Browse Source

Merge pull request #127 from dtm-labs/alpha

Redis optimized
pull/128/head v1.7.4
yedf2 4 years ago
committed by GitHub
parent
commit
e252d39823
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      bench/main.go
  2. 2
      bench/run-dtm.sh
  3. 4
      bench/setup.sh
  4. 14
      bench/svr/http.go
  5. 60
      dtmsvr/storage/redis/redis.go
  6. 4
      dtmsvr/storage/sql/sql.go
  7. 5
      dtmsvr/utils.go
  8. 2
      go.mod
  9. 7
      go.sum
  10. 2
      test/main_test.go

10
bench/main.go

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"github.com/dtm-labs/dtm/bench/svr"
"github.com/dtm-labs/dtm/common"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/logger"
@ -23,17 +24,18 @@ func main() {
fmt.Printf(hint)
return
}
logger.Debugf("starting dtm....")
logger.Infof("starting dtm....")
if os.Args[1] == "http" {
fmt.Println("start bench server")
common.MustLoadConfig()
logger.InitLog(common.Config.LogLevel)
dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver)
registry.WaitStoreUp()
dtmsvr.PopulateDB(true)
examples.PopulateDB(true)
dtmsvr.PopulateDB(false)
examples.PopulateDB(false)
dtmsvr.StartSvr() // 启动dtmsvr的api服务
go dtmsvr.CronExpiredTrans(-1) // 启动dtmsvr的定时过期查询
StartSvr()
svr.StartSvr()
select {}
} else {
fmt.Printf(hint)

2
bench/run-dtm.sh

@ -17,3 +17,5 @@ curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_empty" && ab -t $TIM
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx" && curl "http://127.0.0.1:8083/api/busi_bench/bench"
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx" && curl "http://127.0.0.1:8083/api/busi_bench/bench"
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_barrier" && curl "http://127.0.0.1:8083/api/busi_bench/bench"
ab -n 1000000 -c 10 "http://127.0.0.1:8083/api/busi_bench/benchEmptyUrl"

4
bench/setup.sh

@ -12,4 +12,6 @@ chmod +x /usr/local/bin/docker-compose
# install go
wget https://golang.org/dl/go1.17.1.linux-amd64.tar.gz
rm -rf /usr/local/go && tar -C /usr/local -xzf go1.17.1.linux-amd64.tar.gz && cp -f /usr/local/bin/go /usr/local/go/bin/go
rm -rf /usr/local/go && tar -C /usr/local -xzf go1.17.1.linux-amd64.tar.gz && cp -f /usr/local/go/bin/go /usr/local/bin/go
apt install -y redis

14
bench/http.go → bench/svr/http.go

@ -4,7 +4,7 @@
* license that can be found in the LICENSE file.
*/
package main
package svr
import (
"database/sql"
@ -20,6 +20,7 @@ import (
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/examples"
"github.com/gin-gonic/gin"
"github.com/lithammer/shortuuid"
)
// launch command:go run app/main.go qs
@ -32,7 +33,7 @@ const total = 200000
var benchBusi = fmt.Sprintf("http://localhost:%d%s", benchPort, benchAPI)
func sdbGet() *sql.DB {
db, err := dtmimp.PooledDB(common.Config.Store.GetDBConf())
db, err := dtmimp.PooledDB(common.Config.ExamplesDB)
logger.FatalIfError(err)
return db
}
@ -167,4 +168,13 @@ func benchAddRoute(app *gin.Engine) {
}
return nil, nil
}))
app.Any(benchAPI+"/benchEmptyUrl", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
gid := shortuuid.New()
req := gin.H{}
saga := dtmcli.NewSaga(examples.DtmHttpServer, gid).
Add("", "", req).
Add("", "", req)
err := saga.Submit()
return nil, err
}))
}

60
dtmsvr/storage/redis/redis.go

@ -98,6 +98,7 @@ func (a *argList) AppendGid(gid string) *argList {
a.Keys = append(a.Keys, config.Store.RedisPrefix+"_g_"+gid)
a.Keys = append(a.Keys, config.Store.RedisPrefix+"_b_"+gid)
a.Keys = append(a.Keys, config.Store.RedisPrefix+"_u")
a.Keys = append(a.Keys, config.Store.RedisPrefix+"_s_"+gid)
return a
}
@ -141,19 +142,21 @@ func (s *RedisStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches
AppendGid(global.Gid).
AppendObject(global).
AppendRaw(global.NextCronTime.Unix()).
AppendRaw(global.Gid).
AppendRaw(global.Status).
AppendBranches(branches)
global.Steps = nil
global.Payloads = nil
_, err := callLua(a, `-- MaySaveNewTrans
local gs = cjson.decode(ARGV[3])
local g = redis.call('GET', KEYS[1])
if g ~= false then
return 'UNIQUE_CONFLICT'
end
redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2])
redis.call('ZADD', KEYS[3], ARGV[4], gs.gid)
for k = 5, table.getn(ARGV) do
redis.call('SET', KEYS[4], ARGV[6], 'EX', ARGV[2])
redis.call('ZADD', KEYS[3], ARGV[4], ARGV[5])
for k = 7, table.getn(ARGV) do
redis.call('RPUSH', KEYS[2], ARGV[k])
end
redis.call('EXPIRE', KEYS[2], ARGV[2])
@ -164,17 +167,12 @@ redis.call('EXPIRE', KEYS[2], ARGV[2])
func (s *RedisStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
args := newArgList().
AppendGid(gid).
AppendObject(&storage.TransGlobalStore{Gid: gid, Status: status}).
AppendRaw(status).
AppendRaw(branchStart).
AppendBranches(branches)
_, err := callLua(args, `
local gs = cjson.decode(ARGV[3])
local g = redis.call('GET', KEYS[1])
if (g == false) then
return 'NOT_FOUND'
end
local js = cjson.decode(g)
if js.status ~= gs.status then
_, err := callLua(args, `-- LockGlobalSaveBranches
local old = redis.call('GET', KEYS[4])
if old ~= ARGV[3] then
return 'NOT_FOUND'
end
local start = ARGV[4]
@ -193,20 +191,22 @@ redis.call('EXPIRE', KEYS[2], ARGV[2])
func (s *RedisStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status
global.Status = newStatus
args := newArgList().AppendGid(global.Gid).AppendObject(global).AppendRaw(old).AppendRaw(finished)
args := newArgList().
AppendGid(global.Gid).
AppendObject(global).
AppendRaw(old).
AppendRaw(finished).
AppendRaw(global.Gid).
AppendRaw(newStatus)
_, err := callLua(args, `-- ChangeGlobalStatus
local gs = cjson.decode(ARGV[3])
local old = redis.call('GET', KEYS[1])
if old == false then
return 'NOT_FOUND'
end
local os = cjson.decode(old)
if os.status ~= ARGV[4] then
local old = redis.call('GET', KEYS[4])
if old ~= ARGV[4] then
return 'NOT_FOUND'
end
redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2])
redis.call('SET', KEYS[4], ARGV[7], 'EX', ARGV[2])
if ARGV[5] == '1' then
redis.call('ZREM', KEYS[3], gs.gid)
redis.call('ZREM', KEYS[3], ARGV[6])
end
`)
dtmimp.E2P(err)
@ -246,18 +246,18 @@ func (s *RedisStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInt
global.NextCronTime = common.GetNextTime(nextCronInterval)
global.UpdateTime = common.GetNextTime(0)
global.NextCronInterval = nextCronInterval
args := newArgList().AppendGid(global.Gid).AppendObject(global).AppendRaw(global.NextCronTime.Unix())
args := newArgList().
AppendGid(global.Gid).
AppendObject(global).
AppendRaw(global.NextCronTime.Unix()).
AppendRaw(global.Status).
AppendRaw(global.Gid)
_, err := callLua(args, `-- TouchCronTime
local g = cjson.decode(ARGV[3])
local old = redis.call('GET', KEYS[1])
if old == false then
local old = redis.call('GET', KEYS[4])
if old ~= ARGV[5] then
return 'NOT_FOUND'
end
local os = cjson.decode(old)
if os.status ~= g.status then
return 'NOT_FOUND'
end
redis.call('ZADD', KEYS[3], ARGV[4], g.gid)
redis.call('ZADD', KEYS[3], ARGV[4], ARGV[6])
redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2])
`)
dtmimp.E2P(err)

4
dtmsvr/storage/sql/sql.go

@ -5,7 +5,7 @@ import (
"math"
"time"
"github.com/google/uuid"
"github.com/lithammer/shortuuid/v3"
"gorm.io/gorm"
"gorm.io/gorm/clause"
@ -126,7 +126,7 @@ func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlob
}
expire := int(expireIn / time.Second)
whereTime := fmt.Sprintf("next_cron_time < %s", getTime(expire))
owner := uuid.NewString()
owner := shortuuid.New()
global := &storage.TransGlobalStore{}
dbr := db.Must().Model(global).
Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')").

5
dtmsvr/utils.go

@ -10,12 +10,11 @@ import (
"fmt"
"time"
"github.com/google/uuid"
"github.com/dtm-labs/dtm/common"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmsvr/storage/registry"
"github.com/lithammer/shortuuid/v3"
)
type branchStatus struct {
@ -38,7 +37,7 @@ var TransProcessedTestChan chan string = nil
// GenGid generate gid, use uuid
func GenGid() string {
return uuid.NewString()
return shortuuid.New()
}
// GetTransGlobal construct trans from db

2
go.mod

@ -15,6 +15,8 @@ require (
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/lib/pq v1.10.3
github.com/lithammer/shortuuid v2.0.3+incompatible
github.com/lithammer/shortuuid/v3 v3.0.7
github.com/onsi/gomega v1.16.0
github.com/polarismesh/grpc-go-polaris v0.0.0-20211128162137-1a59cd7b5733 // indirect
github.com/prometheus/client_golang v1.11.0

7
go.sum

@ -241,6 +241,7 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
@ -369,6 +370,11 @@ github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg=
github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lithammer/shortuuid v1.0.0 h1:kdcbvjGVEgqeVeDIRtnANOi/F6ftbKrtbxY+cjQmK1Q=
github.com/lithammer/shortuuid v2.0.3+incompatible h1:ao1r3cQ9AUX+c6dZXwbCM/ELGf10EoO4SyqqxBXTyHc=
github.com/lithammer/shortuuid v2.0.3+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
@ -467,6 +473,7 @@ github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OK
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=

2
test/main_test.go

@ -13,6 +13,7 @@ import (
"github.com/dtm-labs/dtm/common"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/examples"
"github.com/gin-gonic/gin"
@ -26,6 +27,7 @@ func exitIf(code int) {
func TestMain(m *testing.M) {
common.MustLoadConfig()
logger.InitLog(config.LogLevel)
dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver)
dtmsvr.TransProcessedTestChan = make(chan string, 1)
dtmsvr.NowForwardDuration = 0 * time.Second

Loading…
Cancel
Save