From cd243881ef6bfb548b3c91a362aa63b5f94b2856 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=98=8A=E8=BE=BE?= <2082881702@qq.com> Date: Mon, 6 Jun 2022 14:36:13 +0800 Subject: [PATCH 01/12] Fix a syntax error in a code comment --- dtmutil/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dtmutil/utils.go b/dtmutil/utils.go index 06b39a7..ffb533f 100644 --- a/dtmutil/utils.go +++ b/dtmutil/utils.go @@ -64,7 +64,7 @@ func WrapHandler(fn func(*gin.Context) interface{}) gin.HandlerFunc { } } -// WrapHandler2 wrap a function te bo the handler of gin request +// WrapHandler2 wrap a function to be the handler of gin request // used by dtmsvr func WrapHandler2(fn func(*gin.Context) interface{}) gin.HandlerFunc { return func(c *gin.Context) { From 17cc5d468e2332200a2a38444e28551f94966171 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 8 Jun 2022 10:25:06 +0800 Subject: [PATCH 02/12] for support multiple dbtype --- dtmcli/barrier.go | 18 ++++++++++-------- dtmcli/dtmimp/db_special.go | 4 ++-- dtmcli/dtmimp/db_special_test.go | 4 ++-- dtmcli/dtmimp/trans_xa_base.go | 12 ++++++------ dtmcli/dtmimp/utils.go | 16 +++++++++++----- dtmutil/utils.go | 2 +- test/busi/busi.go | 9 ++++----- test/common_test.go | 6 +++--- test/xa_test.go | 4 ++-- 9 files changed, 41 insertions(+), 34 deletions(-) diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 1fcf61f..2235471 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -20,11 +20,13 @@ type BarrierBusiFunc func(tx *sql.Tx) error // BranchBarrier every branch info type BranchBarrier struct { - TransType string - Gid string - BranchID string - Op string - BarrierID int + TransType string + Gid string + BranchID string + Op string + BarrierID int + DBType string // DBTypeMysql | DBTypePostgres + BarrierTableName string } func (bb *BranchBarrier) String() string { @@ -70,8 +72,8 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) dtmimp.OpCompensate: dtmimp.OpAction, }[bb.Op] - originAffected, oerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op) - currentAffected, rerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op) + originAffected, oerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op, bb.DBType, bb.BarrierTableName) + currentAffected, rerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op, bb.DBType, bb.BarrierTableName) logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if rerr == nil && bb.Op == dtmimp.MsgDoOp && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected. @@ -103,7 +105,7 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error // QueryPrepared queries prepared data func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error { - _, err := dtmimp.InsertBarrier(db, bb.TransType, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1, dtmimp.OpRollback) + _, err := dtmimp.InsertBarrier(db, bb.TransType, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1, dtmimp.OpRollback, bb.DBType, bb.BarrierTableName) var reason string if err == nil { sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName) diff --git a/dtmcli/dtmimp/db_special.go b/dtmcli/dtmimp/db_special.go index b4db7fc..fed04f6 100644 --- a/dtmcli/dtmimp/db_special.go +++ b/dtmcli/dtmimp/db_special.go @@ -75,8 +75,8 @@ func init() { } // GetDBSpecial get DBSpecial for currentDBType -func GetDBSpecial() DBSpecial { - return dbSpecials[currentDBType] +func GetDBSpecial(dbType string) DBSpecial { + return dbSpecials[dbType] } // SetCurrentDBType set currentDBType diff --git a/dtmcli/dtmimp/db_special_test.go b/dtmcli/dtmimp/db_special_test.go index 3966cd2..7003109 100644 --- a/dtmcli/dtmimp/db_special_test.go +++ b/dtmcli/dtmimp/db_special_test.go @@ -18,13 +18,13 @@ func TestDBSpecial(t *testing.T) { SetCurrentDBType("no-driver") })) SetCurrentDBType(DBTypeMysql) - sp := GetDBSpecial() + sp := GetDBSpecial(DBTypeMysql) assert.Equal(t, "? ?", sp.GetPlaceHoldSQL("? ?")) assert.Equal(t, "xa start 'xa1'", sp.GetXaSQL("start", "xa1")) assert.Equal(t, "insert ignore into a(f) values(?)", sp.GetInsertIgnoreTemplate("a(f) values(?)", "c")) SetCurrentDBType(DBTypePostgres) - sp = GetDBSpecial() + sp = GetDBSpecial(DBTypeMysql) assert.Equal(t, "$1 $2", sp.GetPlaceHoldSQL("? ?")) assert.Equal(t, "begin", sp.GetXaSQL("start", "xa1")) assert.Equal(t, "insert into a(f) values(?) on conflict ON CONSTRAINT c do nothing", sp.GetInsertIgnoreTemplate("a(f) values(?)", "c")) diff --git a/dtmcli/dtmimp/trans_xa_base.go b/dtmcli/dtmimp/trans_xa_base.go index 50bb9c8..bd15c87 100644 --- a/dtmcli/dtmimp/trans_xa_base.go +++ b/dtmcli/dtmimp/trans_xa_base.go @@ -18,14 +18,14 @@ func XaHandlePhase2(gid string, dbConf DBConf, branchID string, op string) error return err } xaID := gid + "-" + branchID - _, err = DBExec(db, GetDBSpecial().GetXaSQL(op, xaID)) + _, err = DBExec(dbConf.Driver, db, GetDBSpecial(dbConf.Driver).GetXaSQL(op, xaID)) if err != nil && (strings.Contains(err.Error(), "XAER_NOTA") || strings.Contains(err.Error(), "does not exist")) { // Repeat commit/rollback with the same id, report this error, ignore err = nil } if op == OpRollback && err == nil { // rollback insert a row after prepare. no-error means prepare has finished. - _, err = InsertBarrier(db, "xa", gid, branchID, OpAction, XaBarrier1, op) + _, err = InsertBarrier(db, "xa", gid, branchID, OpAction, XaBarrier1, op, dbConf.Driver, "TODO") } return err } @@ -39,20 +39,20 @@ func XaHandleLocalTrans(xa *TransBase, dbConf DBConf, cb func(*sql.DB) error) (r } defer func() { _ = db.Close() }() defer DeferDo(&rerr, func() error { - _, err := DBExec(db, GetDBSpecial().GetXaSQL("prepare", xaBranch)) + _, err := DBExec(dbConf.Driver, db, GetDBSpecial(dbConf.Driver).GetXaSQL("prepare", xaBranch)) return err }, func() error { return nil }) - _, rerr = DBExec(db, GetDBSpecial().GetXaSQL("start", xaBranch)) + _, rerr = DBExec(dbConf.Driver, db, GetDBSpecial(dbConf.Driver).GetXaSQL("start", xaBranch)) if rerr != nil { return } defer func() { - _, _ = DBExec(db, GetDBSpecial().GetXaSQL("end", xaBranch)) + _, _ = DBExec(dbConf.Driver, db, GetDBSpecial(dbConf.Driver).GetXaSQL("end", xaBranch)) }() // prepare and rollback both insert a row - _, rerr = InsertBarrier(db, xa.TransType, xa.Gid, xa.BranchID, OpAction, XaBarrier1, OpAction) + _, rerr = InsertBarrier(db, xa.TransType, xa.Gid, xa.BranchID, OpAction, XaBarrier1, OpAction, dbConf.Driver, "TODO") if rerr == nil { rerr = cb(db) } diff --git a/dtmcli/dtmimp/utils.go b/dtmcli/dtmimp/utils.go index 7220b30..1e5607a 100644 --- a/dtmcli/dtmimp/utils.go +++ b/dtmcli/dtmimp/utils.go @@ -187,12 +187,12 @@ func XaDB(conf DBConf) (*sql.DB, error) { } // DBExec use raw db to exec -func DBExec(db DB, sql string, values ...interface{}) (affected int64, rerr error) { +func DBExec(dbType string, db DB, sql string, values ...interface{}) (affected int64, rerr error) { if sql == "" { return 0, nil } began := time.Now() - sql = GetDBSpecial().GetPlaceHoldSQL(sql) + sql = GetDBSpecial(dbType).GetPlaceHoldSQL(sql) r, rerr := db.Exec(sql, values...) used := time.Since(began) / time.Millisecond if rerr == nil { @@ -262,10 +262,16 @@ func EscapeGet(qs url.Values, key string) string { } // InsertBarrier insert a record to barrier -func InsertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string) (int64, error) { +func InsertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string, dbType string, barrierTableName string) (int64, error) { if op == "" { return 0, nil } - sql := GetDBSpecial().GetInsertIgnoreTemplate(BarrierTableName+"(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier") - return DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason) + if dbType == "" { + dbType = currentDBType + } + if barrierTableName == "" { + barrierTableName = BarrierTableName + } + sql := GetDBSpecial(dbType).GetInsertIgnoreTemplate(barrierTableName+"(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier") + return DBExec(dbType, tx, sql, transType, gid, branchID, op, barrierID, reason) } diff --git a/dtmutil/utils.go b/dtmutil/utils.go index 06b39a7..599419c 100644 --- a/dtmutil/utils.go +++ b/dtmutil/utils.go @@ -168,7 +168,7 @@ func RunSQLScript(conf dtmcli.DBConf, script string, skipDrop bool) { if s == "" || (skipDrop && strings.Contains(s, "drop")) { continue } - _, err = dtmimp.DBExec(con, s) + _, err = dtmimp.DBExec(conf.Driver, con, s) logger.FatalIfError(err) logger.Infof("sql scripts finished: %s", s) } diff --git a/test/busi/busi.go b/test/busi/busi.go index e31360c..7d24985 100644 --- a/test/busi/busi.go +++ b/test/busi/busi.go @@ -66,7 +66,7 @@ func sagaGrpcAdjustBalance(db dtmcli.DB, uid int, amount int64, result string) e if result == dtmcli.ResultFailure { return status.New(codes.Aborted, dtmcli.ResultFailure).Err() } - _, err := dtmimp.DBExec(db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid) + _, err := dtmimp.DBExec(BusiConf.Driver, db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid) return err } @@ -75,7 +75,7 @@ func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error { if strings.Contains(result, dtmcli.ResultFailure) { return dtmcli.ErrFailure } - _, err := dtmimp.DBExec(db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid) + _, err := dtmimp.DBExec(BusiConf.Driver, db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid) return err } @@ -102,11 +102,10 @@ func SagaMongoAdjustBalance(ctx context.Context, mc *mongo.Client, uid int, amou return fmt.Errorf("balance not enough %w", dtmcli.ErrFailure) } return nil - } func tccAdjustTrading(db dtmcli.DB, uid int, amount int) error { - affected, err := dtmimp.DBExec(db, `update dtm_busi.user_account + affected, err := dtmimp.DBExec(BusiConf.Driver, db, `update dtm_busi.user_account set trading_balance=trading_balance+? where user_id=? and trading_balance + ? + balance >= 0`, amount, uid, amount) if err == nil && affected == 0 { @@ -116,7 +115,7 @@ func tccAdjustTrading(db dtmcli.DB, uid int, amount int) error { } func tccAdjustBalance(db dtmcli.DB, uid int, amount int) error { - affected, err := dtmimp.DBExec(db, `update dtm_busi.user_account + affected, err := dtmimp.DBExec(BusiConf.Driver, db, `update dtm_busi.user_account set trading_balance=trading_balance-?, balance=balance+? where user_id=?`, amount, amount, uid) if err == nil && affected == 0 { diff --git a/test/common_test.go b/test/common_test.go index 2a9b917..a28a144 100644 --- a/test/common_test.go +++ b/test/common_test.go @@ -33,12 +33,12 @@ func testSql(t *testing.T) { func testDbAlone(t *testing.T) { db, err := dtmimp.StandaloneDB(conf.Store.GetDBConf()) assert.Nil(t, err) - _, err = dtmimp.DBExec(db, "select 1") + _, err = dtmimp.DBExec(conf.Store.Driver, db, "select 1") assert.Equal(t, nil, err) - _, err = dtmimp.DBExec(db, "") + _, err = dtmimp.DBExec(conf.Store.Driver, db, "") assert.Equal(t, nil, err) db.Close() - _, err = dtmimp.DBExec(db, "select 1") + _, err = dtmimp.DBExec(conf.Store.Driver, db, "select 1") assert.NotEqual(t, nil, err) } diff --git a/test/xa_test.go b/test/xa_test.go index 4980a0a..ff17286 100644 --- a/test/xa_test.go +++ b/test/xa_test.go @@ -43,10 +43,10 @@ func TestXaDuplicate(t *testing.T) { sdb, err := dtmimp.StandaloneDB(busi.BusiConf) assert.Nil(t, err) if dtmcli.GetCurrentDBType() == dtmcli.DBTypeMysql { - _, err = dtmimp.DBExec(sdb, "xa recover") + _, err = dtmimp.DBExec(conf.Store.Driver, sdb, "xa recover") assert.Nil(t, err) } - _, err = dtmimp.DBExec(sdb, dtmimp.GetDBSpecial().GetXaSQL("commit", gid+"-01")) // simulate repeated request + _, err = dtmimp.DBExec(conf.Store.Driver, sdb, dtmimp.GetDBSpecial(conf.Store.Driver).GetXaSQL("commit", gid+"-01")) // simulate repeated request assert.Nil(t, err) return xa.CallBranch(req, busi.Busi+"/TransInXa") }) From 573c880857fc10319b3ae5b1bf1380c33b358290 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 8 Jun 2022 10:49:21 +0800 Subject: [PATCH 03/12] dtmcli barrier ok --- dtmcli/dtmimp/db_special_test.go | 2 +- dtmcli/dtmimp/trans_xa_base.go | 4 ++-- helper/bench/svr/http.go | 14 +++++++------- test/busi/base_http.go | 3 ++- test/xa_test.go | 4 ++-- 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/dtmcli/dtmimp/db_special_test.go b/dtmcli/dtmimp/db_special_test.go index 7003109..74632a1 100644 --- a/dtmcli/dtmimp/db_special_test.go +++ b/dtmcli/dtmimp/db_special_test.go @@ -24,7 +24,7 @@ func TestDBSpecial(t *testing.T) { assert.Equal(t, "xa start 'xa1'", sp.GetXaSQL("start", "xa1")) assert.Equal(t, "insert ignore into a(f) values(?)", sp.GetInsertIgnoreTemplate("a(f) values(?)", "c")) SetCurrentDBType(DBTypePostgres) - sp = GetDBSpecial(DBTypeMysql) + sp = GetDBSpecial(DBTypePostgres) assert.Equal(t, "$1 $2", sp.GetPlaceHoldSQL("? ?")) assert.Equal(t, "begin", sp.GetXaSQL("start", "xa1")) assert.Equal(t, "insert into a(f) values(?) on conflict ON CONSTRAINT c do nothing", sp.GetInsertIgnoreTemplate("a(f) values(?)", "c")) diff --git a/dtmcli/dtmimp/trans_xa_base.go b/dtmcli/dtmimp/trans_xa_base.go index bd15c87..6eedb31 100644 --- a/dtmcli/dtmimp/trans_xa_base.go +++ b/dtmcli/dtmimp/trans_xa_base.go @@ -25,7 +25,7 @@ func XaHandlePhase2(gid string, dbConf DBConf, branchID string, op string) error } if op == OpRollback && err == nil { // rollback insert a row after prepare. no-error means prepare has finished. - _, err = InsertBarrier(db, "xa", gid, branchID, OpAction, XaBarrier1, op, dbConf.Driver, "TODO") + _, err = InsertBarrier(db, "xa", gid, branchID, OpAction, XaBarrier1, op, dbConf.Driver, "") } return err } @@ -52,7 +52,7 @@ func XaHandleLocalTrans(xa *TransBase, dbConf DBConf, cb func(*sql.DB) error) (r _, _ = DBExec(dbConf.Driver, db, GetDBSpecial(dbConf.Driver).GetXaSQL("end", xaBranch)) }() // prepare and rollback both insert a row - _, rerr = InsertBarrier(db, xa.TransType, xa.Gid, xa.BranchID, OpAction, XaBarrier1, OpAction, dbConf.Driver, "TODO") + _, rerr = InsertBarrier(db, xa.TransType, xa.Gid, xa.BranchID, OpAction, XaBarrier1, OpAction, dbConf.Driver, "") if rerr == nil { rerr = cb(db) } diff --git a/helper/bench/svr/http.go b/helper/bench/svr/http.go index 6cac380..5a8b883 100644 --- a/helper/bench/svr/http.go +++ b/helper/bench/svr/http.go @@ -53,7 +53,7 @@ func reloadData() { db := pdbGet() tables := []string{"dtm_busi.user_account", "dtm_busi.user_account_log", "dtm.trans_global", "dtm.trans_branch_op", "dtm_barrier.barrier"} for _, t := range tables { - _, err := dtmimp.DBExec(db, fmt.Sprintf("truncate %s", t)) + _, err := dtmimp.DBExec(busi.BusiConf.Driver, db, fmt.Sprintf("truncate %s", t)) logger.FatalIfError(err) } s := "insert ignore into dtm_busi.user_account(user_id, balance) values " @@ -61,7 +61,7 @@ func reloadData() { for i := 1; i <= total; i++ { ss = append(ss, fmt.Sprintf("(%d, 1000000)", i)) } - _, err := dtmimp.DBExec(db, s+strings.Join(ss, ",")) + _, err := dtmimp.DBExec(busi.BusiConf.Driver, db, s+strings.Join(ss, ",")) logger.FatalIfError(err) logger.Debugf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds()) } @@ -73,11 +73,11 @@ var sqls = 1 // PrepareBenchDB prepares db data for bench func PrepareBenchDB() { db := pdbGet() - _, err := dtmimp.DBExec(db, "CREATE DATABASE if not exists dtm_busi") + _, err := dtmimp.DBExec(busi.BusiConf.Driver, db, "CREATE DATABASE if not exists dtm_busi") logger.FatalIfError(err) - _, err = dtmimp.DBExec(db, "drop table if exists dtm_busi.user_account_log") + _, err = dtmimp.DBExec(busi.BusiConf.Driver, db, "drop table if exists dtm_busi.user_account_log") logger.FatalIfError(err) - _, err = dtmimp.DBExec(db, `create table if not exists dtm_busi.user_account_log ( + _, err = dtmimp.DBExec(busi.BusiConf.Driver, db, `create table if not exists dtm_busi.user_account_log ( id INT(11) AUTO_INCREMENT PRIMARY KEY, user_id INT(11) NOT NULL, delta DECIMAL(11, 2) not null, @@ -111,10 +111,10 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) error { // nolint: unp tb := dtmimp.TransBaseFromQuery(c.Request.URL.Query()) f := func(tx *sql.Tx) error { for i := 0; i < sqls; i++ { - _, err := dtmimp.DBExec(tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, op, reason) values(?,?,?,?,?,?)", + _, err := dtmimp.DBExec(busi.BusiConf.Driver, tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, op, reason) values(?,?,?,?,?,?)", uid, amount, tb.Gid, c.Query("branch_id"), tb.TransType, fmt.Sprintf("inserted by dtm transaction %s %s", tb.Gid, c.Query("branch_id"))) logger.FatalIfError(err) - _, err = dtmimp.DBExec(tx, "update dtm_busi.user_account set balance = balance + ?, update_time = now() where user_id = ?", amount, uid) + _, err = dtmimp.DBExec(busi.BusiConf.Driver, tx, "update dtm_busi.user_account set balance = balance + ?, update_time = now() where user_id = ?", amount, uid) logger.FatalIfError(err) } return nil diff --git a/test/busi/base_http.go b/test/busi/base_http.go index e2609bf..11bc525 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -69,7 +69,8 @@ func BaseAppStartup() *gin.Engine { } logger.Debugf("Starting busi at: %d", BusiPort) go func() { - _ = app.Run(fmt.Sprintf(":%d", BusiPort)) + err := app.Run(fmt.Sprintf(":%d", BusiPort)) + dtmimp.FatalIfError(err) }() return app } diff --git a/test/xa_test.go b/test/xa_test.go index ff17286..82d10c7 100644 --- a/test/xa_test.go +++ b/test/xa_test.go @@ -43,10 +43,10 @@ func TestXaDuplicate(t *testing.T) { sdb, err := dtmimp.StandaloneDB(busi.BusiConf) assert.Nil(t, err) if dtmcli.GetCurrentDBType() == dtmcli.DBTypeMysql { - _, err = dtmimp.DBExec(conf.Store.Driver, sdb, "xa recover") + _, err = dtmimp.DBExec(busi.BusiConf.Driver, sdb, "xa recover") assert.Nil(t, err) } - _, err = dtmimp.DBExec(conf.Store.Driver, sdb, dtmimp.GetDBSpecial(conf.Store.Driver).GetXaSQL("commit", gid+"-01")) // simulate repeated request + _, err = dtmimp.DBExec(busi.BusiConf.Driver, sdb, dtmimp.GetDBSpecial(busi.BusiConf.Driver).GetXaSQL("commit", gid+"-01")) // simulate repeated request assert.Nil(t, err) return xa.CallBranch(req, busi.Busi+"/TransInXa") }) From fc7a00d2c8448f32bbcd7b301dee033ee8beb994 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 8 Jun 2022 11:19:09 +0800 Subject: [PATCH 04/12] dtmsvr use config db --- conf.sample.yml | 3 +-- dtmsvr/config/config.go | 4 ++-- dtmsvr/storage/trans.go | 5 ++--- test/main_test.go | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/conf.sample.yml b/conf.sample.yml index 1427da2..a256a2a 100644 --- a/conf.sample.yml +++ b/conf.sample.yml @@ -11,6 +11,7 @@ # User: 'root' # Password: '' # Port: 3306 +# Db: 'dtm' # Driver: 'boltdb' # default store engine @@ -30,8 +31,6 @@ # MaxOpenConns: 500 # MaxIdleConns: 500 # ConnMaxLifeTime: 5 # default value is 5 (minutes) -# TransGlobalTable: 'dtm.trans_global' -# TransBranchOpTable: 'dtm.trans_branch_op' ### flollowing config is only for some Driver # DataExpire: 604800 # Trans data will expire in 7 days. only for redis/boltdb. diff --git a/dtmsvr/config/config.go b/dtmsvr/config/config.go index c95f720..0ae4b41 100644 --- a/dtmsvr/config/config.go +++ b/dtmsvr/config/config.go @@ -53,14 +53,13 @@ type Store struct { Port int64 `yaml:"Port"` User string `yaml:"User"` Password string `yaml:"Password"` + Db string `yaml:"Db" default:"dtm"` MaxOpenConns int64 `yaml:"MaxOpenConns" default:"500"` MaxIdleConns int64 `yaml:"MaxIdleConns" default:"500"` ConnMaxLifeTime int64 `yaml:"ConnMaxLifeTime" default:"5"` DataExpire int64 `yaml:"DataExpire" default:"604800"` // Trans data will expire in 7 days. only for redis/boltdb. FinishedDataExpire int64 `yaml:"FinishedDataExpire" default:"86400"` // finished Trans data will expire in 1 days. only for redis. RedisPrefix string `yaml:"RedisPrefix" default:"{a}"` // Redis storage prefix. store data to only one slot in cluster - TransGlobalTable string `yaml:"TransGlobalTable" default:"dtm.trans_global"` - TransBranchOpTable string `yaml:"TransBranchOpTable" default:"dtm.trans_branch_op"` } // IsDB checks config driver is mysql or postgres @@ -76,6 +75,7 @@ func (s *Store) GetDBConf() dtmcli.DBConf { Port: s.Port, User: s.User, Password: s.Password, + Db: s.Db, } } diff --git a/dtmsvr/storage/trans.go b/dtmsvr/storage/trans.go index 4f37122..04b13c4 100644 --- a/dtmsvr/storage/trans.go +++ b/dtmsvr/storage/trans.go @@ -11,7 +11,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" - "github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/dtmutil" ) @@ -45,7 +44,7 @@ type TransGlobalStore struct { // TableName TableName func (g *TransGlobalStore) TableName() string { - return config.Config.Store.TransGlobalTable + return "trans_global" } func (g *TransGlobalStore) String() string { @@ -67,7 +66,7 @@ type TransBranchStore struct { // TableName TableName func (b *TransBranchStore) TableName() string { - return config.Config.Store.TransBranchOpTable + return "trans_branch_op" } func (b *TransBranchStore) String() string { diff --git a/test/main_test.go b/test/main_test.go index 4f17d73..ec431bc 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -30,7 +30,6 @@ func exitIf(code int) { func TestMain(m *testing.M) { config.MustLoadConfig("") logger.InitLog("debug") - dtmcli.SetCurrentDBType(busi.BusiConf.Driver) dtmsvr.TransProcessedTestChan = make(chan string, 1) dtmsvr.NowForwardDuration = 0 * time.Second dtmsvr.CronForwardDuration = 180 * time.Second @@ -59,6 +58,7 @@ func TestMain(m *testing.M) { registry.WaitStoreUp() dtmsvr.PopulateDB(false) + conf.Store.Db = "dtm" // after populateDB, set current db to dtm go dtmsvr.StartSvr() busi.PopulateDB(false) From 0aafcaa138ddac16c2beaaf1b74b986154a8b891 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 8 Jun 2022 23:20:49 +0800 Subject: [PATCH 05/12] add postgres test --- dtmcli/barrier.go | 2 + dtmcli/dtmimp/db_special.go | 3 + dtmsvr/api_http.go | 3 +- dtmsvr/storage/boltdb/boltdb.go | 8 +-- dtmsvr/storage/redis/redis.go | 8 +-- dtmsvr/storage/registry/registry.go | 2 +- dtmsvr/storage/sql/sql.go | 87 +++++++++++++---------------- dtmsvr/storage/store.go | 2 +- dtmutil/db.go | 2 +- helper/compose.cloud.yml | 28 ---------- helper/compose.mysql.yml | 16 ------ helper/compose.postgres.yml | 13 ----- helper/compose.store.yml | 1 + helper/test-cover.sh | 2 +- sqls/dtmsvr.storage.postgres.sql | 27 ++++----- test/base_test.go | 4 +- test/main_test.go | 11 ++++ test/store_test.go | 44 +++++++-------- test/xa_cover_test.go | 3 + 19 files changed, 109 insertions(+), 157 deletions(-) delete mode 100644 helper/compose.cloud.yml delete mode 100644 helper/compose.mysql.yml delete mode 100644 helper/compose.postgres.yml diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 2235471..0b136cb 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -109,6 +109,8 @@ func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error { var reason string if err == nil { sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName) + sql = dtmimp.GetDBSpecial(bb.DBType).GetPlaceHoldSQL(sql) + logger.Debugf("queryrow: %s", sql, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1) err = db.QueryRow(sql, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1).Scan(&reason) } if reason == dtmimp.OpRollback { diff --git a/dtmcli/dtmimp/db_special.go b/dtmcli/dtmimp/db_special.go index fed04f6..2f20f17 100644 --- a/dtmcli/dtmimp/db_special.go +++ b/dtmcli/dtmimp/db_special.go @@ -76,6 +76,9 @@ func init() { // GetDBSpecial get DBSpecial for currentDBType func GetDBSpecial(dbType string) DBSpecial { + if dbType == "" { + dbType = currentDBType + } return dbSpecials[dbType] } diff --git a/dtmsvr/api_http.go b/dtmsvr/api_http.go index 9cb6304..19aa7c6 100644 --- a/dtmsvr/api_http.go +++ b/dtmsvr/api_http.go @@ -92,8 +92,7 @@ func all(c *gin.Context) interface{} { return map[string]interface{}{"transactions": globals, "next_position": position} } -// resetCronTime rest nextCronTime -// Prevent multiple backoff from causing NextCronTime to be too long +// unfinished transactions need to be retried as soon as possible after business downtime is recovered func resetCronTime(c *gin.Context) interface{} { sTimeoutSecond := dtmimp.OrString(c.Query("timeout"), strconv.FormatInt(3*conf.TimeoutToFail, 10)) sLimit := dtmimp.OrString(c.Query("limit"), "100") diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index cee6c29..bccc401 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -413,12 +413,12 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS return trans } -// ResetCronTime rest nextCronTime -// Prevent multiple backoff from causing NextCronTime to be too long -func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { +// ResetCronTime reset nextCronTime +// unfinished transactions need to be retried as soon as possible after business downtime is recovered +func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { next := time.Now() var trans *storage.TransGlobalStore - min := fmt.Sprintf("%d", time.Now().Add(timeout).Unix()) + min := fmt.Sprintf("%d", time.Now().Add(after).Unix()) err = s.boltDb.Update(func(t *bolt.Tx) error { cursor := t.Bucket(bucketIndex).Cursor() succeedCount = 0 diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index b529c25..d24d658 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -270,11 +270,11 @@ return gid } } -// ResetCronTime rest nextCronTime -// Prevent multiple backoff from causing NextCronTime to be too long -func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { +// ResetCronTime reset nextCronTime +// unfinished transactions need to be retried as soon as possible after business downtime is recovered +func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { next := time.Now().Unix() - timeoutTimestamp := time.Now().Add(timeout).Unix() + timeoutTimestamp := time.Now().Add(after).Unix() args := newArgList().AppendGid("").AppendRaw(timeoutTimestamp).AppendRaw(next).AppendRaw(limit) lua := `-- ResetCronTime local r = redis.call('ZRANGEBYSCORE', KEYS[3], ARGV[3], '+inf', 'LIMIT', 0, ARGV[5]+1) diff --git a/dtmsvr/storage/registry/registry.go b/dtmsvr/storage/registry/registry.go index 35df62a..f09692f 100644 --- a/dtmsvr/storage/registry/registry.go +++ b/dtmsvr/storage/registry/registry.go @@ -49,7 +49,7 @@ func GetStore() storage.Store { // WaitStoreUp wait for db to go up func WaitStoreUp() { for err := GetStore().Ping(); err != nil; err = GetStore().Ping() { - logger.Infof("wait store up") + logger.Infof("wait store up: %v", err) time.Sleep(3 * time.Second) } } diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index c15c7fc..1a8e657 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -137,61 +137,42 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval // LockOneGlobalTrans finds GlobalTrans func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { db := dbGet() - getTime := func(second int) string { - return map[string]string{ - "mysql": fmt.Sprintf("date_add(now(), interval %d second)", second), - "postgres": fmt.Sprintf("current_timestamp + interval '%d second'", second), - }[conf.Store.Driver] - } - expire := int(expireIn / time.Second) - whereTime := fmt.Sprintf("next_cron_time < %s", getTime(expire)) owner := shortuuid.New() - global := &storage.TransGlobalStore{} - dbr := db.Must().Model(global). - Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')"). - Limit(1). - Select([]string{"owner", "next_cron_time"}). - Updates(&storage.TransGlobalStore{ - Owner: owner, - NextCronTime: dtmutil.GetNextTime(conf.RetryInterval), - }) - if dbr.RowsAffected == 0 { + where := map[string]string{ + dtmimp.DBTypeMysql: fmt.Sprintf(`next_cron_time < date_add(now(), interval %d second) and status in ('prepared', 'aborting', 'submitted') limit 1`, int(expireIn/time.Second)), + dtmimp.DBTypePostgres: fmt.Sprintf(`id in (select id from trans_global where next_cron_time < current_timestamp + interval '%d second' and status in ('prepared', 'aborting', 'submitted') limit 1 )`, int(expireIn/time.Second)), + }[conf.Store.Driver] + + sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s', owner='%s' WHERE %s`, + getTimeStr(0), + getTimeStr(conf.RetryInterval), + owner, + where) + affected, err := dtmimp.DBExec(conf.Store.Driver, db.ToSQLDB(), sql) + + dtmimp.PanicIf(err != nil, err) + if affected == 0 { return nil } + global := &storage.TransGlobalStore{} db.Must().Where("owner=?", owner).First(global) return global } -// ResetCronTime rest nextCronTime -// Prevent multiple backoff from causing NextCronTime to be too long -func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { - db := dbGet() - getTime := func(second int) string { - return map[string]string{ - "mysql": fmt.Sprintf("date_add(now(), interval %d second)", second), - "postgres": fmt.Sprintf("current_timestamp + interval '%d second'", second), - }[conf.Store.Driver] - } - timeoutSecond := int(timeout / time.Second) - whereTime := fmt.Sprintf("next_cron_time > %s", getTime(timeoutSecond)) - global := &storage.TransGlobalStore{} - dbr := db.Must().Model(global). - Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')"). - Limit(int(limit)). - Select([]string{"next_cron_time"}). - Updates(&storage.TransGlobalStore{ - NextCronTime: dtmutil.GetNextTime(0), - }) - succeedCount = dbr.RowsAffected - if succeedCount == limit { - var count int64 - db.Must().Model(global).Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')").Limit(1).Count(&count) - if count > 0 { - hasRemaining = true - } - } +// ResetCronTime reset nextCronTime +// unfinished transactions need to be retried as soon as possible after business downtime is recovered +func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { + where := map[string]string{ + dtmimp.DBTypeMysql: fmt.Sprintf(`next_cron_time > date_add(now(), interval %d second) and status in ('prepared', 'aborting', 'submitted') limit %d`, int(after/time.Second), limit), + dtmimp.DBTypePostgres: fmt.Sprintf(`id in (select id from trans_global where next_cron_time > current_timestamp + interval '%d second' and status in ('prepared', 'aborting', 'submitted') limit %d )`, int(after/time.Second), limit), + }[conf.Store.Driver] - return succeedCount, hasRemaining, dbr.Error + sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE %s`, + getTimeStr(0), + getTimeStr(0), + where) + affected, err := dtmimp.DBExec(conf.Store.Driver, dbGet().ToSQLDB(), sql) + return affected, affected == limit, err } // SetDBConn sets db conn pool @@ -213,3 +194,15 @@ func wrapError(err error) error { dtmimp.E2P(err) return err } + +func getTime(after time.Duration) string { + second := int64(after / time.Second) + return map[string]string{ + "mysql": fmt.Sprintf("date_add(now(), interval %d second)", second), + "postgres": fmt.Sprintf("current_timestamp + interval '%d second'", second), + }[conf.Store.Driver] +} + +func getTimeStr(afterSecond int64) string { + return dtmutil.GetNextTime(afterSecond).Format("2006-01-02 15:04:05") +} diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go index 391c4dd..bd2b400 100644 --- a/dtmsvr/storage/store.go +++ b/dtmsvr/storage/store.go @@ -30,5 +30,5 @@ type Store interface { ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore - ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) + ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) } diff --git a/dtmutil/db.go b/dtmutil/db.go index 1862870..043f7f9 100644 --- a/dtmutil/db.go +++ b/dtmutil/db.go @@ -100,7 +100,7 @@ func DbGet(conf dtmcli.DBConf, ops ...func(*gorm.DB)) *DB { dsn := dtmimp.GetDsn(conf) db, ok := dbs.Load(dsn) if !ok { - logger.Infof("connecting '%s' '%s' '%s' '%d'", conf.Driver, conf.Host, conf.User, conf.Port) + logger.Infof("connecting '%s' '%s' '%s' '%d' '%s'", conf.Driver, conf.Host, conf.User, conf.Port, conf.Db) db1, err := gorm.Open(getGormDialetor(conf.Driver, dsn), &gorm.Config{ SkipDefaultTransaction: true, }) diff --git a/helper/compose.cloud.yml b/helper/compose.cloud.yml deleted file mode 100644 index 641adf9..0000000 --- a/helper/compose.cloud.yml +++ /dev/null @@ -1,28 +0,0 @@ -version: '3.3' -services: - api: - build: .. - volumes: - - /etc/localtime:/etc/localtime:ro - - /etc/timezone:/etc/timezone:ro - - ..:/app/dtm - extra_hosts: - - 'host.docker.internal:host-gateway' - environment: - IS_DOCKER: 1 - ports: - - '9080:8080' - mysql: - image: 'mysql:5.7' - volumes: - - /etc/localtime:/etc/localtime:ro - - /etc/timezone:/etc/timezone:ro - environment: - MYSQL_ALLOW_EMPTY_PASSWORD: 1 - command: - [ - '--character-set-server=utf8mb4', - '--collation-server=utf8mb4_unicode_ci', - ] - ports: - - '3306:3306' diff --git a/helper/compose.mysql.yml b/helper/compose.mysql.yml deleted file mode 100644 index 0c5ec71..0000000 --- a/helper/compose.mysql.yml +++ /dev/null @@ -1,16 +0,0 @@ -version: '3.3' -services: - mysql: - image: 'mysql:5.7' - volumes: - - /etc/localtime:/etc/localtime:ro - - /etc/timezone:/etc/timezone:ro - environment: - MYSQL_ALLOW_EMPTY_PASSWORD: 1 - command: - [ - '--character-set-server=utf8mb4', - '--collation-server=utf8mb4_unicode_ci', - ] - ports: - - '3306:3306' diff --git a/helper/compose.postgres.yml b/helper/compose.postgres.yml deleted file mode 100644 index dc80e61..0000000 --- a/helper/compose.postgres.yml +++ /dev/null @@ -1,13 +0,0 @@ -version: '3.3' -services: - postgres: - image: 'postgres:13' - command: postgres --max_prepared_transactions=1000 - volumes: - - /etc/localtime:/etc/localtime:ro - - /etc/timezone:/etc/timezone:ro - environment: - POSTGRES_PASSWORD: mysecretpassword - - ports: - - '5432:5432' diff --git a/helper/compose.store.yml b/helper/compose.store.yml index 4053807..0fb4707 100644 --- a/helper/compose.store.yml +++ b/helper/compose.store.yml @@ -22,6 +22,7 @@ services: - /etc/timezone:/etc/timezone:ro environment: POSTGRES_PASSWORD: mysecretpassword + POSTGRES_DB: dtm ports: - '5432:5432' diff --git a/helper/test-cover.sh b/helper/test-cover.sh index 13bd654..e490032 100755 --- a/helper/test-cover.sh +++ b/helper/test-cover.sh @@ -1,6 +1,6 @@ set -x echo "" > coverage.txt -for store in redis mysql boltdb; do +for store in redis mysql boltdb postgres; do for d in $(go list ./... | grep -v vendor); do TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/dtm-labs/dtm/dtmcli,github.com/dtm-labs/dtm/dtmcli/dtmimp,github.com/dtm-labs/dtm/dtmcli/logger,github.com/dtm-labs/dtm/dtmgrpc,github.com/dtm-labs/dtm/dtmgrpc/dtmgimp,github.com/dtm-labs/dtm/dtmsvr,github.com/dtm-labs/dtm/dtmsvr/config,github.com/dtm-labs/dtm/dtmsvr/storage,github.com/dtm-labs/dtm/dtmsvr/storage/boltdb,github.com/dtm-labs/dtm/dtmsvr/storage/redis,github.com/dtm-labs/dtm/dtmsvr/storage/registry,github.com/dtm-labs/dtm/dtmsvr/storage/sql,github.com/dtm-labs/dtm/dtmutil -gcflags=-l $d || exit 1 if [ -f profile.out ]; then diff --git a/sqls/dtmsvr.storage.postgres.sql b/sqls/dtmsvr.storage.postgres.sql index 32af57f..2a1cf67 100644 --- a/sqls/dtmsvr.storage.postgres.sql +++ b/sqls/dtmsvr.storage.postgres.sql @@ -1,11 +1,8 @@ -CREATE SCHEMA if not EXISTS dtm -/* SQLINES DEMO *** RACTER SET utf8mb4 */ -; -drop table IF EXISTS dtm.trans_global; --- SQLINES LICENSE FOR EVALUATION USE ONLY -CREATE SEQUENCE if not EXISTS dtm.trans_global_seq; -CREATE TABLE if not EXISTS dtm.trans_global ( - id bigint NOT NULL DEFAULT NEXTVAL ('dtm.trans_global_seq'), +drop table IF EXISTS trans_global; + +CREATE SEQUENCE if not EXISTS trans_global_seq; +CREATE TABLE if not EXISTS trans_global ( + id bigint NOT NULL DEFAULT NEXTVAL ('trans_global_seq'), gid varchar(128) NOT NULL, trans_type varchar(45) not null, status varchar(45) NOT NULL, @@ -24,13 +21,13 @@ CREATE TABLE if not EXISTS dtm.trans_global ( PRIMARY KEY (id), CONSTRAINT gid UNIQUE (gid) ); -create index if not EXISTS owner on dtm.trans_global(owner); -create index if not EXISTS status_next_cron_time on dtm.trans_global (status, next_cron_time); -drop table IF EXISTS dtm.trans_branch_op; --- SQLINES LICENSE FOR EVALUATION USE ONLY -CREATE SEQUENCE if not EXISTS dtm.trans_branch_op_seq; -CREATE TABLE IF NOT EXISTS dtm.trans_branch_op ( - id bigint NOT NULL DEFAULT NEXTVAL ('dtm.trans_branch_op_seq'), +create index if not EXISTS owner on trans_global(owner); +create index if not EXISTS status_next_cron_time on trans_global (status, next_cron_time); +drop table IF EXISTS trans_branch_op; + +CREATE SEQUENCE if not EXISTS trans_branch_op_seq; +CREATE TABLE IF NOT EXISTS trans_branch_op ( + id bigint NOT NULL DEFAULT NEXTVAL ('trans_branch_op_seq'), gid varchar(128) NOT NULL, url varchar(1024) NOT NULL, data TEXT, diff --git a/test/base_test.go b/test/base_test.go index 68ea359..3fbd05c 100644 --- a/test/base_test.go +++ b/test/base_test.go @@ -26,7 +26,7 @@ type BarrierModel struct { } // TableName gorm table name -func (BarrierModel) TableName() string { return "dtm_barrier.barrier" } +func (BarrierModel) TableName() string { return dtmimp.BarrierTableName } func TestBaseSqlDB(t *testing.T) { asserts := assert.New(t) @@ -38,7 +38,7 @@ func TestBaseSqlDB(t *testing.T) { Op: dtmimp.OpAction, BarrierID: 1, } - db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values('saga', 'gid1', 'branch_id1', 'action', '01', 'saga')") + db.Must().Exec(fmt.Sprintf("insert into %s(trans_type, gid, branch_id, op, barrier_id, reason) values('saga', 'gid1', 'branch_id1', 'action', '01', 'saga')", dtmimp.BarrierTableName)) tx, err := db.ToSQLDB().Begin() asserts.Nil(err) err = barrier.Call(tx, func(tx *sql.Tx) error { diff --git a/test/main_test.go b/test/main_test.go index ec431bc..a8b7fb1 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -48,6 +48,12 @@ func TestMain(m *testing.M) { conf.Store.Port = 3306 conf.Store.User = "root" conf.Store.Password = "" + } else if tenv == "postgres" { + conf.Store.Driver = "postgres" + conf.Store.Host = "localhost" + conf.Store.Port = 5432 + conf.Store.User = "postgres" + conf.Store.Password = "mysecretpassword" } else { conf.Store.Driver = "redis" conf.Store.Host = "localhost" @@ -55,10 +61,15 @@ func TestMain(m *testing.M) { conf.Store.Password = "" conf.Store.Port = 6379 } + conf.Store.Db = "" registry.WaitStoreUp() dtmsvr.PopulateDB(false) conf.Store.Db = "dtm" // after populateDB, set current db to dtm + if tenv == "postgres" { + busi.BusiConf = conf.Store.GetDBConf() + dtmcli.SetCurrentDBType(tenv) + } go dtmsvr.StartSvr() busi.PopulateDB(false) diff --git a/test/store_test.go b/test/store_test.go index b9bb31e..4cc3c00 100644 --- a/test/store_test.go +++ b/test/store_test.go @@ -100,73 +100,73 @@ func TestStoreResetCronTime(t *testing.T) { }) } -func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func(expire int64, limit int64) (int64, bool, error)) { +func testStoreResetCronTime(t *testing.T, funcName string, resetCronHandler func(expire int64, limit int64) (int64, bool, error)) { s := registry.GetStore() - var restTimeTimeout, lockExpireIn, limit, i int64 - restTimeTimeout = 100 //The time that will be ResetCronTime - lockExpireIn = 2 //The time that will be LockOneGlobalTrans - limit = 10 // rest limit + var afterSeconds, lockExpireIn, limit, i int64 + afterSeconds = 100 + lockExpireIn = 2 + limit = 10 // Will be reset for i = 0; i < limit; i++ { gid := funcName + fmt.Sprintf("%d", i) - _, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(restTimeTimeout+10)*time.Second)) + _, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(afterSeconds+10)*time.Second)) } // Will not be reset gid := funcName + fmt.Sprintf("%d", 10) - _, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(restTimeTimeout-10)*time.Second)) + _, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(afterSeconds-10)*time.Second)) - // Not Fount + // Not Found g := s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.Nil(t, g) - // Rest limit-1 count - succeedCount, hasRemaining, err := restCronHandler(restTimeTimeout, limit-1) + // Reset limit-1 count + succeedCount, hasRemaining, err := resetCronHandler(afterSeconds, limit-1) assert.Equal(t, hasRemaining, true) assert.Equal(t, succeedCount, limit-1) assert.Nil(t, err) - // Fount limit-1 count + // Found limit-1 count for i = 0; i < limit-1; i++ { g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.NotNil(t, g) s.ChangeGlobalStatus(g, "succeed", []string{}, true) } - // Not Fount + // Not Found g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.Nil(t, g) - // Rest 1 count - succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout, limit) + // Reset 1 count + succeedCount, hasRemaining, err = resetCronHandler(afterSeconds, limit) assert.Equal(t, hasRemaining, false) assert.Equal(t, succeedCount, int64(1)) assert.Nil(t, err) - // Fount 1 count + // Found 1 count g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.NotNil(t, g) s.ChangeGlobalStatus(g, "succeed", []string{}, true) - // Not Fount + // Not Found g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.Nil(t, g) - // reduce the restTimeTimeout, Rest 1 count - succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout-12, limit) + // reduce the resetTimeTimeout, Reset 1 count + succeedCount, hasRemaining, err = resetCronHandler(afterSeconds-12, limit) assert.Equal(t, hasRemaining, false) assert.Equal(t, succeedCount, int64(1)) assert.Nil(t, err) - // Fount 1 count + // Found 1 count g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.NotNil(t, g) s.ChangeGlobalStatus(g, "succeed", []string{}, true) - // Not Fount + // Not Found g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.Nil(t, g) - // Not Fount - succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout-12, limit) + // Not Found + succeedCount, hasRemaining, err = resetCronHandler(afterSeconds-12, limit) assert.Equal(t, hasRemaining, false) assert.Equal(t, succeedCount, int64(0)) assert.Nil(t, err) diff --git a/test/xa_cover_test.go b/test/xa_cover_test.go index 6836582..8c602ae 100644 --- a/test/xa_cover_test.go +++ b/test/xa_cover_test.go @@ -39,6 +39,9 @@ func TestXaCoverDTMError(t *testing.T) { } func TestXaCoverGidError(t *testing.T) { + if dtmimp.GetCurrentDBType() != dtmimp.DBTypeMysql { + return + } gid := dtmimp.GetFuncName() + "-' '" err := dtmcli.XaGlobalTransaction(DtmServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := busi.GenTransReq(30, false, false) From 2d52f5369ba3c817a91c4b2a040c7caa72a97574 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 10 Jun 2022 16:45:24 +0800 Subject: [PATCH 06/12] fix golint --- dtmsvr/storage/sql/sql.go | 8 -------- test/main_test.go | 15 +++++---------- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 1a8e657..7ad4cbc 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -195,14 +195,6 @@ func wrapError(err error) error { return err } -func getTime(after time.Duration) string { - second := int64(after / time.Second) - return map[string]string{ - "mysql": fmt.Sprintf("date_add(now(), interval %d second)", second), - "postgres": fmt.Sprintf("current_timestamp + interval '%d second'", second), - }[conf.Store.Driver] -} - func getTimeStr(afterSecond int64) string { return dtmutil.GetNextTime(afterSecond).Format("2006-01-02 15:04:05") } diff --git a/test/main_test.go b/test/main_test.go index a8b7fb1..a9f48d8 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -40,23 +40,18 @@ func TestMain(m *testing.M) { dtmcli.GetRestyClient().OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { return nil }) tenv := os.Getenv("TEST_STORE") + conf.Store.Host = "localhost" + conf.Store.Driver = tenv if tenv == "boltdb" { - conf.Store.Driver = "boltdb" - } else if tenv == "mysql" { - conf.Store.Driver = "mysql" - conf.Store.Host = "localhost" + } else if tenv == config.Mysql { conf.Store.Port = 3306 conf.Store.User = "root" conf.Store.Password = "" - } else if tenv == "postgres" { - conf.Store.Driver = "postgres" - conf.Store.Host = "localhost" + } else if tenv == config.Postgres { conf.Store.Port = 5432 conf.Store.User = "postgres" conf.Store.Password = "mysecretpassword" - } else { - conf.Store.Driver = "redis" - conf.Store.Host = "localhost" + } else if tenv == config.Redis { conf.Store.User = "" conf.Store.Password = "" conf.Store.Port = 6379 From 02dbc320c6efc5323c9082f1d36dd674836bdd14 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 10 Jun 2022 17:10:31 +0800 Subject: [PATCH 07/12] update tests.yml --- .github/workflows/tests.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index e71699d..3abf81d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -28,6 +28,17 @@ jobs: - /etc/timezone:/etc/timezone:ro ports: - 6379:6379 + postgres: + image: 'postgres:13' + command: postgres --max_prepared_transactions=1000 + volumes: + - /etc/localtime:/etc/localtime:ro + - /etc/timezone:/etc/timezone:ro + environment: + POSTGRES_PASSWORD: mysecretpassword + POSTGRES_DB: dtm + ports: + - '5432:5432' mongo: image: 'yedf/mongo-rs' volumes: From 906749c179ed3986308d6fc3f3a59cf0e4509fc9 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 10 Jun 2022 17:14:33 +0800 Subject: [PATCH 08/12] fix tests.yml --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 3abf81d..aa1d146 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -34,7 +34,7 @@ jobs: volumes: - /etc/localtime:/etc/localtime:ro - /etc/timezone:/etc/timezone:ro - environment: + env: POSTGRES_PASSWORD: mysecretpassword POSTGRES_DB: dtm ports: From c229b0abb98fe84742e4765a140f68573b37afbe Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 10 Jun 2022 17:21:12 +0800 Subject: [PATCH 09/12] update postgres actions service --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index aa1d146..c0f3d10 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -30,7 +30,7 @@ jobs: - 6379:6379 postgres: image: 'postgres:13' - command: postgres --max_prepared_transactions=1000 + options: --max_prepared_transactions=1000 volumes: - /etc/localtime:/etc/localtime:ro - /etc/timezone:/etc/timezone:ro From 57695fcfe992830cc77aa37a5ca5eda9408e1ac9 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 10 Jun 2022 20:05:34 +0800 Subject: [PATCH 10/12] update to postgres-xa --- helper/compose.store.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/helper/compose.store.yml b/helper/compose.store.yml index 0fb4707..f82ecda 100644 --- a/helper/compose.store.yml +++ b/helper/compose.store.yml @@ -15,8 +15,7 @@ services: ports: - '3306:3306' postgres: - image: 'postgres:13' - command: postgres --max_prepared_transactions=1000 + image: 'yedf/postgres-xa' volumes: - /etc/localtime:/etc/localtime:ro - /etc/timezone:/etc/timezone:ro From 17592b8b3957044eb46d0be0f90678fec51758f9 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 10 Jun 2022 20:08:25 +0800 Subject: [PATCH 11/12] update pg image --- .github/workflows/tests.yml | 3 +-- helper/compose.store.yml | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c0f3d10..d12fe2a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -29,8 +29,7 @@ jobs: ports: - 6379:6379 postgres: - image: 'postgres:13' - options: --max_prepared_transactions=1000 + image: 'yedf/postgres-xa' volumes: - /etc/localtime:/etc/localtime:ro - /etc/timezone:/etc/timezone:ro diff --git a/helper/compose.store.yml b/helper/compose.store.yml index f82ecda..0fb4707 100644 --- a/helper/compose.store.yml +++ b/helper/compose.store.yml @@ -15,7 +15,8 @@ services: ports: - '3306:3306' postgres: - image: 'yedf/postgres-xa' + image: 'postgres:13' + command: postgres --max_prepared_transactions=1000 volumes: - /etc/localtime:/etc/localtime:ro - /etc/timezone:/etc/timezone:ro From 779e17fab9b893ecd3a7a999565f91e4157f9c39 Mon Sep 17 00:00:00 2001 From: phprao Date: Mon, 13 Jun 2022 10:57:10 +0800 Subject: [PATCH 12/12] fix api logic of '/TransOutXa' --- test/busi/base_http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/busi/base_http.go b/test/busi/base_http.go index 11bc525..c2bf076 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -141,7 +141,7 @@ func BaseAddRoute(app *gin.Engine) { })) app.POST(BusiAPI+"/TransOutXa", dtmutil.WrapHandler(func(c *gin.Context) interface{} { return dtmcli.XaLocalTransaction(c.Request.URL.Query(), BusiConf, func(db *sql.DB, xa *dtmcli.Xa) error { - return SagaAdjustBalance(db, TransOutUID, reqFrom(c).Amount, reqFrom(c).TransOutResult) + return SagaAdjustBalance(db, TransOutUID, -reqFrom(c).Amount, reqFrom(c).TransOutResult) }) })) app.POST(BusiAPI+"/TransOutTimeout", dtmutil.WrapHandler(func(c *gin.Context) interface{} {