diff --git a/bench/Makefile b/bench/Makefile new file mode 100644 index 0000000..e819a4b --- /dev/null +++ b/bench/Makefile @@ -0,0 +1,28 @@ +# All targets. +default: bench + +bench: /usr/local/bin/go /etc/redis/redis.conf /usr/local/bin/docker-compose main.go + rm -f ../conf.sample.yml + go build -o bench + +go: /usr/local/bin/go + +redis: /etc/redis/redis.conf + +mysql: /usr/local/bin/docker-compose + +/usr/local/bin/go: + wget https://golang.org/dl/go1.17.1.linux-amd64.tar.gz + rm -rf /usr/local/go && tar -C /usr/local -xzf go1.17.1.linux-amd64.tar.gz && cp -f /usr/local/go/bin/go /usr/local/bin/go && rm go1.* + +/etc/redis/redis.conf: + apt update + apt install -y redis redis-tools + +/usr/local/bin/docker-compose: + apt update + apt install -y sysbench apache2-utils mysql-client-core-8.0 + curl -fsSL https://get.docker.com | sh + curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose + chmod +x /usr/local/bin/docker-compose + cd .. && docker-compose -f helper/compose.mysql.yml up -d && cd bench diff --git a/bench/main.go b/bench/main.go index 56542c8..ec2a78e 100644 --- a/bench/main.go +++ b/bench/main.go @@ -13,31 +13,42 @@ import ( "github.com/dtm-labs/dtm/examples" ) -var hint = `To start the bench server, you need to specify the parameters: - -Available commands: - http start bench server +var usage = `bench is a bench test server for dtmf +usage: + redis prepare for redis bench test + db prepare for mysql|postgres bench test + boltdb prepare for boltdb bench test ` +func hintAndExit() { + fmt.Printf(usage) + os.Exit(0) +} + +var conf = &common.Config + func main() { if len(os.Args) <= 1 { - fmt.Printf(hint) - return + hintAndExit() + } + logger.Infof("starting bench server") + common.MustLoadConfig() + logger.InitLog(conf.LogLevel) + if conf.ExamplesDB.Driver != "" { + dtmcli.SetCurrentDBType(conf.ExamplesDB.Driver) + svr.PrepareBenchDB() } - logger.Infof("starting dtm....") - if os.Args[1] == "http" { - fmt.Println("start bench server") - common.MustLoadConfig() - logger.InitLog(common.Config.LogLevel) - dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver) - registry.WaitStoreUp() - dtmsvr.PopulateDB(false) + registry.WaitStoreUp() + dtmsvr.PopulateDB(false) + if os.Args[1] == "db" { examples.PopulateDB(false) - dtmsvr.StartSvr() // 启动dtmsvr的api服务 - go dtmsvr.CronExpiredTrans(-1) // 启动dtmsvr的定时过期查询 - svr.StartSvr() - select {} + } else if os.Args[1] == "redis" || os.Args[1] == "boltdb" { + } else { - fmt.Printf(hint) + hintAndExit() } + dtmsvr.StartSvr() // 启动dtmsvr的api服务 + go dtmsvr.CronExpiredTrans(-1) // 启动dtmsvr的定时过期查询 + svr.StartSvr() // 启动bench服务 + select {} } diff --git a/bench/prepare.sh b/bench/prepare.sh new file mode 100755 index 0000000..2ec3ad1 --- /dev/null +++ b/bench/prepare.sh @@ -0,0 +1,10 @@ +# !/bin/bash +apt update +apt install -y git +git clone https://github.com/dtm-labs/dtm.git && cd dtm && git checkout alpha && cd bench && make + + +echo 'all prepared. you shoud run following commands to test in different terminal' +echo +echo 'cd dtm && go run bench/main.go redis|boltdb|db' +echo 'cd dtm && bench/run-redis|boltdb|mysql.sh' diff --git a/bench/run-mysql.sh b/bench/run-mysql.sh deleted file mode 100755 index 8db27b4..0000000 --- a/bench/run-mysql.sh +++ /dev/null @@ -1,8 +0,0 @@ -# !/bin/bash - -cd /usr/share/sysbench/ -echo 'create database sbtest;' > mysql -h 127.0.0.1 -uroot - -sysbench oltp_write_only.lua --time=60 --mysql-host=127.0.0.1 --mysql-port=3306 --mysql-user=root --mysql-password= --mysql-db=sbtest --table-size=1000000 --tables=10 --threads=10 --events=999999999 --report-interval=10 prepare - -sysbench oltp_write_only.lua --time=60 --mysql-host=127.0.0.1 --mysql-port=3306 --mysql-user=root --mysql-password= --mysql-db=sbtest --table-size=1000000 --tables=10 --threads=10 --events=999999999 --report-interval=10 run diff --git a/bench/run-services.sh b/bench/run-services.sh deleted file mode 100644 index a95f9a3..0000000 --- a/bench/run-services.sh +++ /dev/null @@ -1,5 +0,0 @@ -# !/bin/bash - -# start all services -docker-compose -f helper/compose.mysql.yml up -d -go run app/main.go bench > /dev/nul \ No newline at end of file diff --git a/bench/setup-redis6.sh b/bench/setup-redis6.sh new file mode 100755 index 0000000..ed180f4 --- /dev/null +++ b/bench/setup-redis6.sh @@ -0,0 +1,6 @@ +# !/bin/bash +apt update +apt install -y software-properties-common +add-apt-repository -y ppa:redislabs/redis +apt install -y redis redis-tools + diff --git a/bench/setup.sh b/bench/setup.sh index bd90f1a..985aaa3 100755 --- a/bench/setup.sh +++ b/bench/setup.sh @@ -1,8 +1,9 @@ # !/bin/bash # install all commands needed + apt update -apt install -y git sysbench apache2-utils mysql-client-core-8.0 +apt install -y sysbench apache2-utils mysql-client-core-8.0 redis redis-tools # install docker and docker-compose curl -fsSL https://get.docker.com -o get-docker.sh @@ -13,5 +14,3 @@ chmod +x /usr/local/bin/docker-compose # install go wget https://golang.org/dl/go1.17.1.linux-amd64.tar.gz rm -rf /usr/local/go && tar -C /usr/local -xzf go1.17.1.linux-amd64.tar.gz && cp -f /usr/local/go/bin/go /usr/local/bin/go - -apt install -y redis \ No newline at end of file diff --git a/bench/svr/http.go b/bench/svr/http.go index de03f07..0ff80a8 100644 --- a/bench/svr/http.go +++ b/bench/svr/http.go @@ -9,6 +9,7 @@ package svr import ( "database/sql" "fmt" + "os" "strings" "sync/atomic" "time" @@ -18,7 +19,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmsvr" - "github.com/dtm-labs/dtm/examples" "github.com/gin-gonic/gin" "github.com/lithammer/shortuuid" ) @@ -27,10 +27,10 @@ import ( // service address of the transcation const benchAPI = "/api/busi_bench" -const benchPort = 8083 const total = 200000 -var benchBusi = fmt.Sprintf("http://localhost:%d%s", benchPort, benchAPI) +var benchPort = dtmimp.If(os.Getenv("BENCH_PORT") == "", "8083", os.Getenv("BENCH_PORT")).(string) +var benchBusi = fmt.Sprintf("http://localhost:%s%s", benchPort, benchAPI) func sdbGet() *sql.DB { db, err := dtmimp.PooledDB(common.Config.ExamplesDB) @@ -68,12 +68,7 @@ var uidCounter int32 = 0 var mode string = "" var sqls int = 1 -// StartSvr 1 -func StartSvr() { - app := common.GetGinApp() - benchAddRoute(app) - logger.Debugf("bench listening at %d", benchPort) - go app.Run(fmt.Sprintf(":%d", benchPort)) +func PrepareBenchDB() { db := sdbGet() _, err := dtmimp.DBExec(db, "drop table if exists dtm_busi.user_account_log") logger.FatalIfError(err) @@ -94,6 +89,14 @@ func StartSvr() { logger.FatalIfError(err) } +// StartSvr 1 +func StartSvr() { + app := common.GetGinApp() + benchAddRoute(app) + logger.Debugf("bench listening at %d", benchPort) + go app.Run(fmt.Sprintf(":%s", benchPort)) +} + func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) { if strings.Contains(mode, "empty") || sqls == 0 { return dtmcli.MapSuccess, nil @@ -124,6 +127,7 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) { } func benchAddRoute(app *gin.Engine) { + dtmHttpServer := fmt.Sprintf("http://localhost:%d/api/dtmsvr", common.Config.HttpPort) app.POST(benchAPI+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) { return qsAdjustBalance(dtmimp.MustAtoi(c.Query("uid")), 1, c) })) @@ -154,7 +158,7 @@ func benchAddRoute(app *gin.Engine) { params2 := fmt.Sprintf("?uid=%s", suid2) logger.Debugf("mode: %s contains dtm: %t", mode, strings.Contains(mode, "dtm")) if strings.Contains(mode, "dtm") { - saga := dtmcli.NewSaga(examples.DtmHttpServer, fmt.Sprintf("bench-%d", uid)). + saga := dtmcli.NewSaga(dtmHttpServer, fmt.Sprintf("bench-%d", uid)). Add(benchBusi+"/TransOut"+params, benchBusi+"/TransOutCompensate"+params, req). Add(benchBusi+"/TransIn"+params2, benchBusi+"/TransInCompensate"+params2, req) saga.WaitResult = true @@ -171,9 +175,10 @@ func benchAddRoute(app *gin.Engine) { app.Any(benchAPI+"/benchEmptyUrl", common.WrapHandler(func(c *gin.Context) (interface{}, error) { gid := shortuuid.New() req := gin.H{} - saga := dtmcli.NewSaga(examples.DtmHttpServer, gid). + saga := dtmcli.NewSaga(dtmHttpServer, gid). Add("", "", req). Add("", "", req) + saga.WaitResult = true err := saga.Submit() return nil, err })) diff --git a/bench/test-boltdb.sh b/bench/test-boltdb.sh new file mode 100755 index 0000000..ddd2807 --- /dev/null +++ b/bench/test-boltdb.sh @@ -0,0 +1,5 @@ +# !/bin/bash + +set -x + +ab -n 50000 -c 10 "http://127.0.0.1:8083/api/busi_bench/benchEmptyUrl" diff --git a/bench/run-dtm.sh b/bench/test-mysql.sh similarity index 67% rename from bench/run-dtm.sh rename to bench/test-mysql.sh index 0f8cd2f..9a7f259 100755 --- a/bench/run-dtm.sh +++ b/bench/test-mysql.sh @@ -1,7 +1,14 @@ # !/bin/bash -# go run ../app/main.go set -x + +cd /usr/share/sysbench/ +echo 'create database sbtest;' > mysql -h 127.0.0.1 -uroot + +sysbench oltp_write_only.lua --time=60 --mysql-host=127.0.0.1 --mysql-port=3306 --mysql-user=root --mysql-password= --mysql-db=sbtest --table-size=1000000 --tables=10 --threads=10 --events=999999999 --report-interval=10 prepare + +sysbench oltp_write_only.lua --time=60 --mysql-host=127.0.0.1 --mysql-port=3306 --mysql-user=root --mysql-password= --mysql-db=sbtest --table-size=1000000 --tables=10 --threads=10 --events=999999999 --report-interval=10 run + export TIME=10 export CONCURRENT=20 curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx&sqls=0" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench" @@ -13,9 +20,3 @@ curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_barrier&sqls=1" && a curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx&sqls=1" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench" curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_empty" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench" -# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_empty" && curl "http://127.0.0.1:8083/api/busi_bench/bench" -# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx" && curl "http://127.0.0.1:8083/api/busi_bench/bench" -# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx" && curl "http://127.0.0.1:8083/api/busi_bench/bench" -# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_barrier" && curl "http://127.0.0.1:8083/api/busi_bench/bench" - -ab -n 1000000 -c 10 "http://127.0.0.1:8083/api/busi_bench/benchEmptyUrl" diff --git a/bench/test-redis.sh b/bench/test-redis.sh new file mode 100755 index 0000000..403c1c5 --- /dev/null +++ b/bench/test-redis.sh @@ -0,0 +1,27 @@ +# !/bin/bash + +set -x + +export LOG_LEVEL=warn +export STORE_DRIVER=redis +export STORE_HOST=localhost +export STORE_PORT=6379 +cd .. && bench/bench redis & +echo 'sleeping 3s for dtm bench to run up.' && sleep 3 +ab -n 1000000 -c 10 "http://127.0.0.1:8083/api/busi_bench/benchEmptyUrl" +pkill bench + +redis-benchmark -n 300000 SET 'abcdefg' 'ddddddd' + +redis-benchmark -n 300000 EVAL "redis.call('SET', 'abcdedf', 'ddddddd')" 0 + +redis-benchmark -n 300000 EVAL "redis.call('SET', KEYS[1], ARGV[1])" 1 'aaaaaaaaa' 'bbbbbbbbbb' + +redis-benchmark -n 3000000 -P 50 SET 'abcdefg' 'ddddddd' + +redis-benchmark -n 300000 EVAL "for k=1, 10 do; redis.call('SET', KEYS[1], ARGV[1]);end" 1 'aaaaaaaaa' 'bbbbbbbbbb' + +redis-benchmark -n 300000 -P 50 EVAL "redis.call('SET', KEYS[1], ARGV[1])" 1 'aaaaaaaaa' 'bbbbbbbbbb' + +redis-benchmark -n 300000 EVAL "for k=1,10 do;local c = cjson.decode(ARGV[1]);end" 1 'aaaaaaaaa' '{"aaaaa":"bbbbb","b":1,"t":"2012-01-01 14:00:00"}' + diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 5c61a8d..7fb5673 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -17,22 +17,23 @@ import ( func svcSubmit(t *TransGlobal) (interface{}, error) { t.Status = dtmcli.StatusSubmitted - err := t.saveNew() + branches, err := t.saveNew() if err == storage.ErrUniqueConflict { dbt := GetTransGlobal(t.Gid) if dbt.Status == dtmcli.StatusPrepared { dbt.changeStatus(t.Status) + branches = GetStore().FindBranches(t.Gid) } else if dbt.Status != dtmcli.StatusSubmitted { return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status '%s', cannot sumbmit", dbt.Status)}, nil } } - return t.Process(), nil + return t.Process(branches), nil } func svcPrepare(t *TransGlobal) (interface{}, error) { t.Status = dtmcli.StatusPrepared - err := t.saveNew() + _, err := t.saveNew() if err == storage.ErrUniqueConflict { dbt := GetTransGlobal(t.Gid) if dbt.Status != dtmcli.StatusPrepared { @@ -48,7 +49,8 @@ func svcAbort(t *TransGlobal) (interface{}, error) { return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("trans type: '%s' current status '%s', cannot abort", dbt.TransType, dbt.Status)}, nil } dbt.changeStatus(dtmcli.StatusAborting) - return dbt.Process(), nil + branches := GetStore().FindBranches(t.Gid) + return dbt.Process(branches), nil } func svcRegisterBranch(transType string, branch *TransBranch, data map[string]string) (ret interface{}, rerr error) { diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index c718840..5eec83c 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -31,7 +31,8 @@ func CronTransOnce() (gid string) { } gid = trans.Gid trans.WaitResult = true - trans.Process() + branches := GetStore().FindBranches(gid) + trans.Process(branches) return } diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index b525f7f..60ec8fd 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -35,6 +35,7 @@ func (s *RedisStore) PopulateData(skipDrop bool) { } func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore { + logger.Debugf("calling FindTransGlobalStore: %s", gid) r, err := redisGet().Get(ctx, config.Store.RedisPrefix+"_g_"+gid).Result() if err == redis.Nil { return nil @@ -46,6 +47,7 @@ func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore } func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { + logger.Debugf("calling ScanTransGlobalStores: %s %d", *position, limit) lid := uint64(0) if *position != "" { lid = uint64(dtmimp.MustAtoi(*position)) @@ -71,6 +73,7 @@ func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []stor } func (s *RedisStore) FindBranches(gid string) []storage.TransBranchStore { + logger.Debugf("calling FindBranches: %s", gid) sa, err := redisGet().LRange(ctx, config.Store.RedisPrefix+"_b_"+gid, 0, -1).Result() dtmimp.E2P(err) branches := make([]storage.TransBranchStore, len(sa)) diff --git a/dtmsvr/trans_process.go b/dtmsvr/trans_process.go index 8165e50..cb5a288 100644 --- a/dtmsvr/trans_process.go +++ b/dtmsvr/trans_process.go @@ -16,23 +16,23 @@ import ( ) // Process process global transaction once -func (t *TransGlobal) Process() map[string]interface{} { - r := t.process() +func (t *TransGlobal) Process(branches []TransBranch) map[string]interface{} { + r := t.process(branches) transactionMetrics(t, r["dtm_result"] == dtmcli.ResultSuccess) return r } -func (t *TransGlobal) process() map[string]interface{} { +func (t *TransGlobal) process(branches []TransBranch) map[string]interface{} { if t.Options != "" { dtmimp.MustUnmarshalString(t.Options, &t.TransOptions) } if !t.WaitResult { - go t.processInner() + go t.processInner(branches) return dtmcli.MapSuccess } submitting := t.Status == dtmcli.StatusSubmitted - err := t.processInner() + err := t.processInner(branches) if err != nil { return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": err.Error()} } @@ -42,7 +42,7 @@ func (t *TransGlobal) process() map[string]interface{} { return dtmcli.MapSuccess } -func (t *TransGlobal) processInner() (rerr error) { +func (t *TransGlobal) processInner(branches []TransBranch) (rerr error) { defer handlePanic(&rerr) defer func() { if rerr != nil && rerr != dtmcli.ErrOngoing { @@ -55,14 +55,12 @@ func (t *TransGlobal) processInner() (rerr error) { } }() logger.Debugf("processing: %s status: %s", t.Gid, t.Status) - branches := GetStore().FindBranches(t.Gid) t.lastTouched = time.Now() rerr = t.getProcessor().ProcessOnce(branches) return } -func (t *TransGlobal) saveNew() error { - branches := t.getProcessor().GenBranches() +func (t *TransGlobal) saveNew() ([]TransBranch, error) { t.NextCronInterval = t.getNextCronInterval(cronReset) t.NextCronTime = common.GetNextTime(t.NextCronInterval) t.Options = dtmimp.MustMarshalString(t.TransOptions) @@ -72,8 +70,13 @@ func (t *TransGlobal) saveNew() error { now := time.Now() t.CreateTime = &now t.UpdateTime = &now + branches := t.getProcessor().GenBranches() + for i, _ := range branches { + branches[i].CreateTime = &now + branches[i].UpdateTime = &now + } err := GetStore().MaySaveNewTrans(&t.TransGlobalStore, branches) logger.Infof("MaySaveNewTrans result: %v, global: %v branches: %v", err, t.TransGlobalStore.String(), dtmimp.MustMarshalString(branches)) - return err + return branches, err }