From 8eda41849dae7fbf31e6d407b3430b8c3e4619e3 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 6 Oct 2021 14:07:50 +0800 Subject: [PATCH] ready to run bench --- .travis.yml | 2 +- app/main.go | 3 +- bench/http.go | 70 ++++++++++++++++++++++++++++++----------- bench/run.sh | 17 +++++----- common/types.go | 4 +-- dtmsvr/dtmsvr.go | 38 +++++++++++++++++++++- dtmsvr/dtmsvr.mysql.sql | 2 +- dtmsvr/trans.go | 18 ++++++----- dtmsvr/utils.go | 6 ++++ test/dtmsvr_test.go | 44 +++++++++----------------- test/main_test.go | 41 ++++++++++++++++++++++++ 11 files changed, 178 insertions(+), 67 deletions(-) mode change 100644 => 100755 bench/run.sh create mode 100644 test/main_test.go diff --git a/.travis.yml b/.travis.yml index a0a9192..94d45aa 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,4 +14,4 @@ before_install: - go get -t -v ./... - go get github.com/mattn/goveralls script: - - $GOPATH/bin/goveralls -service=travis-ci -ignore="examples/*,dtmgrpc/*.pb.go" + - $GOPATH/bin/goveralls -service=travis-ci -ignore="examples/*,dtmgrpc/*.pb.go,bench/*" diff --git a/app/main.go b/app/main.go index f3ab1ae..a4e04bd 100644 --- a/app/main.go +++ b/app/main.go @@ -18,7 +18,8 @@ usage: Available commands: dtmsvr run dtm as a server - dev create all needed table and run dtm as a server + dev create all needed table and run dtm as a server + bench start bench server quick_start run quick start example (dtm will create needed table) qs same as quick_start diff --git a/bench/http.go b/bench/http.go index 4b780df..172e89a 100644 --- a/bench/http.go +++ b/bench/http.go @@ -18,7 +18,7 @@ import ( // 事务参与者的服务地址 const benchAPI = "/api/busi_bench" const benchPort = 8083 -const total = 1000000 +const total = 200000 var benchBusi = fmt.Sprintf("http://localhost:%d%s", benchPort, benchAPI) @@ -38,41 +38,71 @@ func txGet() *sql.Tx { func reloadData() { began := time.Now() db := sdbGet() - tables := []string{"dtm_busi.user_account", "dtm.trans_global", "dtm.trans_branch"} + _, err := dtmcli.DBExec(db, `create table if not exists dtm_busi.user_account_log ( + id INT(11) AUTO_INCREMENT PRIMARY KEY, + user_id INT(11) NOT NULL, + delta DECIMAL(11, 2) not null, + gid varchar(45) not null, + branch_id varchar(45) not null, + reason varchar(45), + create_time datetime not null default now(), + update_time datetime not null default now(), + key(user_id), + key(create_time) +) +`) + dtmcli.FatalIfError(err) + tables := []string{"dtm_busi.user_account", "dtm_busi.user_account_log", "dtm.trans_global", "dtm.trans_branch", "dtm_barrier.barrier"} for _, t := range tables { - dtmcli.DBExec(db, fmt.Sprintf("truncate %s", t)) + _, err = dtmcli.DBExec(db, fmt.Sprintf("truncate %s", t)) + dtmcli.FatalIfError(err) } s := "insert ignore into dtm_busi.user_account(user_id, balance) values " ss := []string{} for i := 1; i <= total; i++ { ss = append(ss, fmt.Sprintf("(%d, 1000000)", i)) } - db.Exec(s + strings.Join(ss, ",")) + _, err = db.Exec(s + strings.Join(ss, ",")) + dtmcli.FatalIfError(err) dtmcli.Logf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds()) } var uidCounter int32 = 0 var mode string = "" +var sqls int = 1 // StartSvr 1 func StartSvr() { app := common.GetGinApp() benchAddRoute(app) dtmcli.Logf("bench listening at %d", benchPort) - reloadData() go app.Run(fmt.Sprintf(":%d", benchPort)) - time.Sleep(100 * time.Millisecond) + reloadData() + time.Sleep(1100 * time.Millisecond) // sleep 1 second for async branch status update to finish } -func qsAdjustBalance(uid int, amount int) (interface{}, error) { +func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) { if strings.Contains(mode, "empty") { return dtmcli.MapSuccess, nil - } else { - tx := txGet() - for i := 0; i < 5; i++ { - _, err := dtmcli.DBExec(tx, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid) + } + tb := dtmcli.TransBaseFromQuery(c.Request.URL.Query()) + f := func(tx dtmcli.DB) error { + for i := 0; i < sqls; i++ { + _, err := dtmcli.DBExec(tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, branch_type, reason) values(?,?,?,?,?,?)", + uid, amount, tb.Gid, c.Query("branch_id"), tb.TransType, fmt.Sprintf("inserted by dtm transaction %s %s", tb.Gid, c.Query("branch_id"))) + dtmcli.FatalIfError(err) + _, err = dtmcli.DBExec(tx, "update dtm_busi.user_account set balance = balance + ?, update_time = now() where user_id = ?", amount, uid) dtmcli.FatalIfError(err) } + return nil + } + if strings.Contains(mode, "barrier") { + barrier, err := dtmcli.BarrierFromQuery(c.Request.URL.Query()) + dtmcli.FatalIfError(err) + barrier.Call(txGet(), f) + } else { + tx := txGet() + f(tx) err := tx.Commit() dtmcli.FatalIfError(err) } @@ -82,20 +112,24 @@ func qsAdjustBalance(uid int, amount int) (interface{}, error) { func benchAddRoute(app *gin.Engine) { app.POST(benchAPI+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) { - return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), 1) + return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), 1, c) })) app.POST(benchAPI+"/TransInCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) { - return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), -1) + return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), -1, c) })) app.POST(benchAPI+"/TransOut", common.WrapHandler(func(c *gin.Context) (interface{}, error) { - return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), -1) + return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), -1, c) })) app.POST(benchAPI+"/TransOutCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) { - return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), 30) + return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), 30, c) })) app.Any(benchAPI+"/reloadData", common.WrapHandler(func(c *gin.Context) (interface{}, error) { reloadData() mode = c.Query("m") + s := c.Query("sqls") + if s != "" { + sqls = dtmcli.MustAtoi(s) + } return nil, nil })) app.Any(benchAPI+"/bench", common.WrapHandler(func(c *gin.Context) (interface{}, error) { @@ -112,12 +146,12 @@ func benchAddRoute(app *gin.Engine) { Add(benchBusi+"/TransIn"+params2, benchBusi+"/TransInCompensate"+params2, req) saga.WaitResult = true err := saga.Submit() - dtmcli.FatalIfError(err) + dtmcli.E2P(err) } else { _, err := dtmcli.RestyClient.R().SetBody(gin.H{}).SetQueryParam("uid", suid2).Post(benchBusi + "/TransOut") - dtmcli.FatalIfError(err) + dtmcli.E2P(err) _, err = dtmcli.RestyClient.R().SetBody(gin.H{}).SetQueryParam("uid", suid).Post(benchBusi + "/TransIn") - dtmcli.FatalIfError(err) + dtmcli.E2P(err) } return nil, nil })) diff --git a/bench/run.sh b/bench/run.sh old mode 100644 new mode 100755 index cdd1129..695cc0d --- a/bench/run.sh +++ b/bench/run.sh @@ -2,14 +2,17 @@ # go run ../app/main.go set -x -TIME=1 -CURRENT=10 -curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_empty" && ab -t $TIME -c $CURRENT "http://127.0.0.1:8083/api/busi_bench/bench" -curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx" && ab -t $TIME -c $CURRENT "http://127.0.0.1:8083/api/busi_bench/bench" -curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_empty" && ab -t $TIME -c $CURRENT "http://127.0.0.1:8083/api/busi_bench/bench" -curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx" && ab -t $TIME -c $CURRENT "http://127.0.0.1:8083/api/busi_bench/bench" +export TIME=10 +export CONCURRENT=20 +curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx&sqls=5" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench" +curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_barrier&sqls=5" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench" +curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx&sqls=5" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench" +curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx&sqls=1" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench" +curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_barrier&sqls=1" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench" +curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx&sqls=1" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench" +curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_empty" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench" # curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_empty" && curl "http://127.0.0.1:8083/api/busi_bench/bench" # curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx" && curl "http://127.0.0.1:8083/api/busi_bench/bench" -# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_empty" && curl "http://127.0.0.1:8083/api/busi_bench/bench" # curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx" && curl "http://127.0.0.1:8083/api/busi_bench/bench" +# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_barrier" && curl "http://127.0.0.1:8083/api/busi_bench/bench" diff --git a/common/types.go b/common/types.go index 60e404c..de04394 100644 --- a/common/types.go +++ b/common/types.go @@ -123,7 +123,7 @@ type dtmConfigType struct { TransCronInterval int64 `yaml:"TransCronInterval"` // 单位秒 当事务等待这个时间之后,还没有变化,则进行一轮处理,包括prepared中的任务和committed的任务 DB map[string]string `yaml:"DB"` DisableLocalhost int64 `yaml:"DisableLocalhost"` - RetryLimit int64 `yaml:"RetryLimit"` + UpdateBranchSync int64 `yaml:"UpdateBranchSync"` } // DtmConfig 配置 @@ -146,7 +146,7 @@ func init() { "password": os.Getenv("DB_PASSWORD"), } DtmConfig.DisableLocalhost = getIntEnv("DISABLE_LOCALHOST", "0") - DtmConfig.RetryLimit = getIntEnv("RETRY_LIMIT", "2000000000") + DtmConfig.UpdateBranchSync = getIntEnv("UPDATE_BRANCH_SYNC", "0") cont := []byte{} for d := MustGetwd(); d != "" && d != "/"; d = filepath.Dir(d) { cont1, err := ioutil.ReadFile(d + "/conf.yml") diff --git a/dtmsvr/dtmsvr.go b/dtmsvr/dtmsvr.go index 91073aa..ff15414 100644 --- a/dtmsvr/dtmsvr.go +++ b/dtmsvr/dtmsvr.go @@ -8,8 +8,9 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmgrpc" + "gorm.io/gorm/clause" - "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "google.golang.org/grpc" "github.com/yedf/dtm/examples" @@ -40,6 +41,7 @@ func StartSvr() { err := s.Serve(lis) dtmcli.FatalIfError(err) }() + go updateBranchAsync() // prometheus exporter dtmcli.Logf("prometheus exporter listen at: %d", metricsPort) @@ -52,3 +54,37 @@ func PopulateDB(skipDrop bool) { file := fmt.Sprintf("%s/dtmsvr.%s.sql", common.GetCallerCodeDir(), config.DB["driver"]) examples.RunSQLScript(config.DB, file, skipDrop) } + +// UpdateBranchAsyncInterval unit millisecond +var UpdateBranchAsyncInterval time.Duration = 1000 +var updateBranchAsyncChan chan branchStatus = make(chan branchStatus, 1000) + +func updateBranchAsync() { + for { // flush branches every second + updates := []TransBranch{} + started := time.Now() + for time.Since(started) < UpdateBranchAsyncInterval*time.Millisecond { + select { + case updateBranch := <-updateBranchAsyncChan: + updates = append(updates, TransBranch{ + ModelBase: common.ModelBase{ID: updateBranch.id}, + Status: updateBranch.status, + FinishTime: updateBranch.finish_time, + }) + case <-time.After(50 * time.Millisecond): + } + } + for len(updates) > 0 { + dbr := dbGet().Clauses(clause.OnConflict{ + DoUpdates: clause.AssignmentColumns([]string{"status", "finish_time"}), + }).Create(updates) + dtmcli.Logf("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected) + if dbr.Error != nil { + dtmcli.LogRedf("async update branch status error: %v", dbr.Error) + time.Sleep(1 * time.Second) + } else { + updates = []TransBranch{} + } + } + } +} diff --git a/dtmsvr/dtmsvr.mysql.sql b/dtmsvr/dtmsvr.mysql.sql index 1fe1015..93b6614 100644 --- a/dtmsvr/dtmsvr.mysql.sql +++ b/dtmsvr/dtmsvr.mysql.sql @@ -5,7 +5,7 @@ CREATE TABLE if not EXISTS dtm.trans_global ( `id` int(11) NOT NULL AUTO_INCREMENT, `gid` varchar(128) NOT NULL COMMENT '事务全局id', `trans_type` varchar(45) not null COMMENT '事务类型: saga | xa | tcc | msg', - `data` TEXT COMMENT '事务携带的数据', + -- `data` TEXT COMMENT '事务携带的数据', -- 影响性能,不必要存储 `status` varchar(45) NOT NULL COMMENT '全局事务的状态 prepared | submitted | finished | rollbacked', `query_prepared` varchar(128) NOT NULL COMMENT 'prepared状态事务的查询api', `protocol` varchar(45) not null comment '通信协议 http | grpc', diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 7187603..1948beb 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -25,7 +25,7 @@ type TransGlobal struct { common.ModelBase Gid string `json:"gid"` TransType string `json:"trans_type"` - Data string `json:"data"` + Data string `json:"data" gorm:"-"` Status string `json:"status"` QueryPrepared string `json:"query_prepared"` Protocol string `json:"protocol"` @@ -92,13 +92,17 @@ func (*TransBranch) TableName() string { func (t *TransBranch) changeStatus(db *common.DB, status string) *gorm.DB { writeTransLog(t.Gid, "branch change", status, t.BranchID, "") - dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(M{ - "status": status, - "finish_time": time.Now(), - }) - checkAffected(dbr) + if common.DtmConfig.UpdateBranchSync > 0 { + dbr := db.Must().Model(t).Updates(M{ + "status": status, + "finish_time": time.Now(), + }) + checkAffected(dbr) + } else { // 为了性能优化,把branch的status更新异步化 + updateBranchAsyncChan <- branchStatus{id: t.ID, status: status} + } t.Status = status - return dbr + return db.DB } func checkAffected(db1 *gorm.DB) { diff --git a/dtmsvr/utils.go b/dtmsvr/utils.go index e59c59d..ee07452 100644 --- a/dtmsvr/utils.go +++ b/dtmsvr/utils.go @@ -15,6 +15,12 @@ import ( // M a short name type M = map[string]interface{} +type branchStatus struct { + id uint + status string + finish_time *time.Time +} + var p2e = dtmcli.P2E var e2p = dtmcli.E2P diff --git a/test/dtmsvr_test.go b/test/dtmsvr_test.go index 040ad7e..4eeb154 100644 --- a/test/dtmsvr_test.go +++ b/test/dtmsvr_test.go @@ -26,35 +26,6 @@ type BarrierModel struct { // TableName gorm table name func (BarrierModel) TableName() string { return "dtm_barrier.barrier" } -func resetXaData() { - if config.DB["driver"] != "mysql" { - return - } - db := dbGet() - type XaRow struct { - Data string - } - xas := []XaRow{} - db.Must().Raw("xa recover").Scan(&xas) - for _, xa := range xas { - db.Must().Exec(fmt.Sprintf("xa rollback '%s'", xa.Data)) - } -} - -func TestMain(m *testing.M) { - dtmsvr.TransProcessedTestChan = make(chan string, 1) - dtmsvr.CronForwardDuration = 60 * time.Second - dtmsvr.PopulateDB(false) - examples.PopulateDB(false) - // 启动组件 - go dtmsvr.StartSvr() - examples.GrpcStartup() - app = examples.BaseAppStartup() - - resetXaData() - m.Run() -} - func getTransStatus(gid string) string { sm := TransGlobal{} dbr := dbGet().Model(&sm).Where("gid=?", gid).First(&sm) @@ -144,3 +115,18 @@ func TestSqlDB(t *testing.T) { dbr = db.Model(&BarrierModel{}).Where("gid=?", "gid2").Find(&[]BarrierModel{}) asserts.Equal(dbr.RowsAffected, int64(1)) } + +func TestUpdateBranchAsync(t *testing.T) { + common.DtmConfig.UpdateBranchSync = 0 + dtmsvr.UpdateBranchAsyncInterval = 50 + saga := genSaga("gid-update-branch-async", false, false) + saga.WaitResult = true + err := saga.Submit() + assert.Nil(t, err) + WaitTransProcessed(saga.Gid) + time.Sleep(100 * time.Millisecond) + assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid)) + assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid)) + common.DtmConfig.UpdateBranchSync = 1 + dtmsvr.UpdateBranchAsyncInterval = 1000 +} diff --git a/test/main_test.go b/test/main_test.go new file mode 100644 index 0000000..cf3bf77 --- /dev/null +++ b/test/main_test.go @@ -0,0 +1,41 @@ +package test + +import ( + "fmt" + "testing" + "time" + + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmsvr" + "github.com/yedf/dtm/examples" +) + +func TestMain(m *testing.M) { + dtmsvr.TransProcessedTestChan = make(chan string, 1) + dtmsvr.CronForwardDuration = 60 * time.Second + common.DtmConfig.UpdateBranchSync = 1 + dtmsvr.PopulateDB(false) + examples.PopulateDB(false) + // 启动组件 + go dtmsvr.StartSvr() + examples.GrpcStartup() + app = examples.BaseAppStartup() + + resetXaData() + m.Run() +} + +func resetXaData() { + if config.DB["driver"] != "mysql" { + return + } + db := dbGet() + type XaRow struct { + Data string + } + xas := []XaRow{} + db.Must().Raw("xa recover").Scan(&xas) + for _, xa := range xas { + db.Must().Exec(fmt.Sprintf("xa rollback '%s'", xa.Data)) + } +}