From 0aafcaa138ddac16c2beaaf1b74b986154a8b891 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 8 Jun 2022 23:20:49 +0800 Subject: [PATCH] add postgres test --- dtmcli/barrier.go | 2 + dtmcli/dtmimp/db_special.go | 3 + dtmsvr/api_http.go | 3 +- dtmsvr/storage/boltdb/boltdb.go | 8 +-- dtmsvr/storage/redis/redis.go | 8 +-- dtmsvr/storage/registry/registry.go | 2 +- dtmsvr/storage/sql/sql.go | 87 +++++++++++++---------------- dtmsvr/storage/store.go | 2 +- dtmutil/db.go | 2 +- helper/compose.cloud.yml | 28 ---------- helper/compose.mysql.yml | 16 ------ helper/compose.postgres.yml | 13 ----- helper/compose.store.yml | 1 + helper/test-cover.sh | 2 +- sqls/dtmsvr.storage.postgres.sql | 27 ++++----- test/base_test.go | 4 +- test/main_test.go | 11 ++++ test/store_test.go | 44 +++++++-------- test/xa_cover_test.go | 3 + 19 files changed, 109 insertions(+), 157 deletions(-) delete mode 100644 helper/compose.cloud.yml delete mode 100644 helper/compose.mysql.yml delete mode 100644 helper/compose.postgres.yml diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 2235471..0b136cb 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -109,6 +109,8 @@ func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error { var reason string if err == nil { sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName) + sql = dtmimp.GetDBSpecial(bb.DBType).GetPlaceHoldSQL(sql) + logger.Debugf("queryrow: %s", sql, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1) err = db.QueryRow(sql, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1).Scan(&reason) } if reason == dtmimp.OpRollback { diff --git a/dtmcli/dtmimp/db_special.go b/dtmcli/dtmimp/db_special.go index fed04f6..2f20f17 100644 --- a/dtmcli/dtmimp/db_special.go +++ b/dtmcli/dtmimp/db_special.go @@ -76,6 +76,9 @@ func init() { // GetDBSpecial get DBSpecial for currentDBType func GetDBSpecial(dbType string) DBSpecial { + if dbType == "" { + dbType = currentDBType + } return dbSpecials[dbType] } diff --git a/dtmsvr/api_http.go b/dtmsvr/api_http.go index 9cb6304..19aa7c6 100644 --- a/dtmsvr/api_http.go +++ b/dtmsvr/api_http.go @@ -92,8 +92,7 @@ func all(c *gin.Context) interface{} { return map[string]interface{}{"transactions": globals, "next_position": position} } -// resetCronTime rest nextCronTime -// Prevent multiple backoff from causing NextCronTime to be too long +// unfinished transactions need to be retried as soon as possible after business downtime is recovered func resetCronTime(c *gin.Context) interface{} { sTimeoutSecond := dtmimp.OrString(c.Query("timeout"), strconv.FormatInt(3*conf.TimeoutToFail, 10)) sLimit := dtmimp.OrString(c.Query("limit"), "100") diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index cee6c29..bccc401 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -413,12 +413,12 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS return trans } -// ResetCronTime rest nextCronTime -// Prevent multiple backoff from causing NextCronTime to be too long -func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { +// ResetCronTime reset nextCronTime +// unfinished transactions need to be retried as soon as possible after business downtime is recovered +func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { next := time.Now() var trans *storage.TransGlobalStore - min := fmt.Sprintf("%d", time.Now().Add(timeout).Unix()) + min := fmt.Sprintf("%d", time.Now().Add(after).Unix()) err = s.boltDb.Update(func(t *bolt.Tx) error { cursor := t.Bucket(bucketIndex).Cursor() succeedCount = 0 diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index b529c25..d24d658 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -270,11 +270,11 @@ return gid } } -// ResetCronTime rest nextCronTime -// Prevent multiple backoff from causing NextCronTime to be too long -func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { +// ResetCronTime reset nextCronTime +// unfinished transactions need to be retried as soon as possible after business downtime is recovered +func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { next := time.Now().Unix() - timeoutTimestamp := time.Now().Add(timeout).Unix() + timeoutTimestamp := time.Now().Add(after).Unix() args := newArgList().AppendGid("").AppendRaw(timeoutTimestamp).AppendRaw(next).AppendRaw(limit) lua := `-- ResetCronTime local r = redis.call('ZRANGEBYSCORE', KEYS[3], ARGV[3], '+inf', 'LIMIT', 0, ARGV[5]+1) diff --git a/dtmsvr/storage/registry/registry.go b/dtmsvr/storage/registry/registry.go index 35df62a..f09692f 100644 --- a/dtmsvr/storage/registry/registry.go +++ b/dtmsvr/storage/registry/registry.go @@ -49,7 +49,7 @@ func GetStore() storage.Store { // WaitStoreUp wait for db to go up func WaitStoreUp() { for err := GetStore().Ping(); err != nil; err = GetStore().Ping() { - logger.Infof("wait store up") + logger.Infof("wait store up: %v", err) time.Sleep(3 * time.Second) } } diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index c15c7fc..1a8e657 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -137,61 +137,42 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval // LockOneGlobalTrans finds GlobalTrans func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { db := dbGet() - getTime := func(second int) string { - return map[string]string{ - "mysql": fmt.Sprintf("date_add(now(), interval %d second)", second), - "postgres": fmt.Sprintf("current_timestamp + interval '%d second'", second), - }[conf.Store.Driver] - } - expire := int(expireIn / time.Second) - whereTime := fmt.Sprintf("next_cron_time < %s", getTime(expire)) owner := shortuuid.New() - global := &storage.TransGlobalStore{} - dbr := db.Must().Model(global). - Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')"). - Limit(1). - Select([]string{"owner", "next_cron_time"}). - Updates(&storage.TransGlobalStore{ - Owner: owner, - NextCronTime: dtmutil.GetNextTime(conf.RetryInterval), - }) - if dbr.RowsAffected == 0 { + where := map[string]string{ + dtmimp.DBTypeMysql: fmt.Sprintf(`next_cron_time < date_add(now(), interval %d second) and status in ('prepared', 'aborting', 'submitted') limit 1`, int(expireIn/time.Second)), + dtmimp.DBTypePostgres: fmt.Sprintf(`id in (select id from trans_global where next_cron_time < current_timestamp + interval '%d second' and status in ('prepared', 'aborting', 'submitted') limit 1 )`, int(expireIn/time.Second)), + }[conf.Store.Driver] + + sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s', owner='%s' WHERE %s`, + getTimeStr(0), + getTimeStr(conf.RetryInterval), + owner, + where) + affected, err := dtmimp.DBExec(conf.Store.Driver, db.ToSQLDB(), sql) + + dtmimp.PanicIf(err != nil, err) + if affected == 0 { return nil } + global := &storage.TransGlobalStore{} db.Must().Where("owner=?", owner).First(global) return global } -// ResetCronTime rest nextCronTime -// Prevent multiple backoff from causing NextCronTime to be too long -func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { - db := dbGet() - getTime := func(second int) string { - return map[string]string{ - "mysql": fmt.Sprintf("date_add(now(), interval %d second)", second), - "postgres": fmt.Sprintf("current_timestamp + interval '%d second'", second), - }[conf.Store.Driver] - } - timeoutSecond := int(timeout / time.Second) - whereTime := fmt.Sprintf("next_cron_time > %s", getTime(timeoutSecond)) - global := &storage.TransGlobalStore{} - dbr := db.Must().Model(global). - Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')"). - Limit(int(limit)). - Select([]string{"next_cron_time"}). - Updates(&storage.TransGlobalStore{ - NextCronTime: dtmutil.GetNextTime(0), - }) - succeedCount = dbr.RowsAffected - if succeedCount == limit { - var count int64 - db.Must().Model(global).Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')").Limit(1).Count(&count) - if count > 0 { - hasRemaining = true - } - } +// ResetCronTime reset nextCronTime +// unfinished transactions need to be retried as soon as possible after business downtime is recovered +func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { + where := map[string]string{ + dtmimp.DBTypeMysql: fmt.Sprintf(`next_cron_time > date_add(now(), interval %d second) and status in ('prepared', 'aborting', 'submitted') limit %d`, int(after/time.Second), limit), + dtmimp.DBTypePostgres: fmt.Sprintf(`id in (select id from trans_global where next_cron_time > current_timestamp + interval '%d second' and status in ('prepared', 'aborting', 'submitted') limit %d )`, int(after/time.Second), limit), + }[conf.Store.Driver] - return succeedCount, hasRemaining, dbr.Error + sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE %s`, + getTimeStr(0), + getTimeStr(0), + where) + affected, err := dtmimp.DBExec(conf.Store.Driver, dbGet().ToSQLDB(), sql) + return affected, affected == limit, err } // SetDBConn sets db conn pool @@ -213,3 +194,15 @@ func wrapError(err error) error { dtmimp.E2P(err) return err } + +func getTime(after time.Duration) string { + second := int64(after / time.Second) + return map[string]string{ + "mysql": fmt.Sprintf("date_add(now(), interval %d second)", second), + "postgres": fmt.Sprintf("current_timestamp + interval '%d second'", second), + }[conf.Store.Driver] +} + +func getTimeStr(afterSecond int64) string { + return dtmutil.GetNextTime(afterSecond).Format("2006-01-02 15:04:05") +} diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go index 391c4dd..bd2b400 100644 --- a/dtmsvr/storage/store.go +++ b/dtmsvr/storage/store.go @@ -30,5 +30,5 @@ type Store interface { ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore - ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) + ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) } diff --git a/dtmutil/db.go b/dtmutil/db.go index 1862870..043f7f9 100644 --- a/dtmutil/db.go +++ b/dtmutil/db.go @@ -100,7 +100,7 @@ func DbGet(conf dtmcli.DBConf, ops ...func(*gorm.DB)) *DB { dsn := dtmimp.GetDsn(conf) db, ok := dbs.Load(dsn) if !ok { - logger.Infof("connecting '%s' '%s' '%s' '%d'", conf.Driver, conf.Host, conf.User, conf.Port) + logger.Infof("connecting '%s' '%s' '%s' '%d' '%s'", conf.Driver, conf.Host, conf.User, conf.Port, conf.Db) db1, err := gorm.Open(getGormDialetor(conf.Driver, dsn), &gorm.Config{ SkipDefaultTransaction: true, }) diff --git a/helper/compose.cloud.yml b/helper/compose.cloud.yml deleted file mode 100644 index 641adf9..0000000 --- a/helper/compose.cloud.yml +++ /dev/null @@ -1,28 +0,0 @@ -version: '3.3' -services: - api: - build: .. - volumes: - - /etc/localtime:/etc/localtime:ro - - /etc/timezone:/etc/timezone:ro - - ..:/app/dtm - extra_hosts: - - 'host.docker.internal:host-gateway' - environment: - IS_DOCKER: 1 - ports: - - '9080:8080' - mysql: - image: 'mysql:5.7' - volumes: - - /etc/localtime:/etc/localtime:ro - - /etc/timezone:/etc/timezone:ro - environment: - MYSQL_ALLOW_EMPTY_PASSWORD: 1 - command: - [ - '--character-set-server=utf8mb4', - '--collation-server=utf8mb4_unicode_ci', - ] - ports: - - '3306:3306' diff --git a/helper/compose.mysql.yml b/helper/compose.mysql.yml deleted file mode 100644 index 0c5ec71..0000000 --- a/helper/compose.mysql.yml +++ /dev/null @@ -1,16 +0,0 @@ -version: '3.3' -services: - mysql: - image: 'mysql:5.7' - volumes: - - /etc/localtime:/etc/localtime:ro - - /etc/timezone:/etc/timezone:ro - environment: - MYSQL_ALLOW_EMPTY_PASSWORD: 1 - command: - [ - '--character-set-server=utf8mb4', - '--collation-server=utf8mb4_unicode_ci', - ] - ports: - - '3306:3306' diff --git a/helper/compose.postgres.yml b/helper/compose.postgres.yml deleted file mode 100644 index dc80e61..0000000 --- a/helper/compose.postgres.yml +++ /dev/null @@ -1,13 +0,0 @@ -version: '3.3' -services: - postgres: - image: 'postgres:13' - command: postgres --max_prepared_transactions=1000 - volumes: - - /etc/localtime:/etc/localtime:ro - - /etc/timezone:/etc/timezone:ro - environment: - POSTGRES_PASSWORD: mysecretpassword - - ports: - - '5432:5432' diff --git a/helper/compose.store.yml b/helper/compose.store.yml index 4053807..0fb4707 100644 --- a/helper/compose.store.yml +++ b/helper/compose.store.yml @@ -22,6 +22,7 @@ services: - /etc/timezone:/etc/timezone:ro environment: POSTGRES_PASSWORD: mysecretpassword + POSTGRES_DB: dtm ports: - '5432:5432' diff --git a/helper/test-cover.sh b/helper/test-cover.sh index 13bd654..e490032 100755 --- a/helper/test-cover.sh +++ b/helper/test-cover.sh @@ -1,6 +1,6 @@ set -x echo "" > coverage.txt -for store in redis mysql boltdb; do +for store in redis mysql boltdb postgres; do for d in $(go list ./... | grep -v vendor); do TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/dtm-labs/dtm/dtmcli,github.com/dtm-labs/dtm/dtmcli/dtmimp,github.com/dtm-labs/dtm/dtmcli/logger,github.com/dtm-labs/dtm/dtmgrpc,github.com/dtm-labs/dtm/dtmgrpc/dtmgimp,github.com/dtm-labs/dtm/dtmsvr,github.com/dtm-labs/dtm/dtmsvr/config,github.com/dtm-labs/dtm/dtmsvr/storage,github.com/dtm-labs/dtm/dtmsvr/storage/boltdb,github.com/dtm-labs/dtm/dtmsvr/storage/redis,github.com/dtm-labs/dtm/dtmsvr/storage/registry,github.com/dtm-labs/dtm/dtmsvr/storage/sql,github.com/dtm-labs/dtm/dtmutil -gcflags=-l $d || exit 1 if [ -f profile.out ]; then diff --git a/sqls/dtmsvr.storage.postgres.sql b/sqls/dtmsvr.storage.postgres.sql index 32af57f..2a1cf67 100644 --- a/sqls/dtmsvr.storage.postgres.sql +++ b/sqls/dtmsvr.storage.postgres.sql @@ -1,11 +1,8 @@ -CREATE SCHEMA if not EXISTS dtm -/* SQLINES DEMO *** RACTER SET utf8mb4 */ -; -drop table IF EXISTS dtm.trans_global; --- SQLINES LICENSE FOR EVALUATION USE ONLY -CREATE SEQUENCE if not EXISTS dtm.trans_global_seq; -CREATE TABLE if not EXISTS dtm.trans_global ( - id bigint NOT NULL DEFAULT NEXTVAL ('dtm.trans_global_seq'), +drop table IF EXISTS trans_global; + +CREATE SEQUENCE if not EXISTS trans_global_seq; +CREATE TABLE if not EXISTS trans_global ( + id bigint NOT NULL DEFAULT NEXTVAL ('trans_global_seq'), gid varchar(128) NOT NULL, trans_type varchar(45) not null, status varchar(45) NOT NULL, @@ -24,13 +21,13 @@ CREATE TABLE if not EXISTS dtm.trans_global ( PRIMARY KEY (id), CONSTRAINT gid UNIQUE (gid) ); -create index if not EXISTS owner on dtm.trans_global(owner); -create index if not EXISTS status_next_cron_time on dtm.trans_global (status, next_cron_time); -drop table IF EXISTS dtm.trans_branch_op; --- SQLINES LICENSE FOR EVALUATION USE ONLY -CREATE SEQUENCE if not EXISTS dtm.trans_branch_op_seq; -CREATE TABLE IF NOT EXISTS dtm.trans_branch_op ( - id bigint NOT NULL DEFAULT NEXTVAL ('dtm.trans_branch_op_seq'), +create index if not EXISTS owner on trans_global(owner); +create index if not EXISTS status_next_cron_time on trans_global (status, next_cron_time); +drop table IF EXISTS trans_branch_op; + +CREATE SEQUENCE if not EXISTS trans_branch_op_seq; +CREATE TABLE IF NOT EXISTS trans_branch_op ( + id bigint NOT NULL DEFAULT NEXTVAL ('trans_branch_op_seq'), gid varchar(128) NOT NULL, url varchar(1024) NOT NULL, data TEXT, diff --git a/test/base_test.go b/test/base_test.go index 68ea359..3fbd05c 100644 --- a/test/base_test.go +++ b/test/base_test.go @@ -26,7 +26,7 @@ type BarrierModel struct { } // TableName gorm table name -func (BarrierModel) TableName() string { return "dtm_barrier.barrier" } +func (BarrierModel) TableName() string { return dtmimp.BarrierTableName } func TestBaseSqlDB(t *testing.T) { asserts := assert.New(t) @@ -38,7 +38,7 @@ func TestBaseSqlDB(t *testing.T) { Op: dtmimp.OpAction, BarrierID: 1, } - db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values('saga', 'gid1', 'branch_id1', 'action', '01', 'saga')") + db.Must().Exec(fmt.Sprintf("insert into %s(trans_type, gid, branch_id, op, barrier_id, reason) values('saga', 'gid1', 'branch_id1', 'action', '01', 'saga')", dtmimp.BarrierTableName)) tx, err := db.ToSQLDB().Begin() asserts.Nil(err) err = barrier.Call(tx, func(tx *sql.Tx) error { diff --git a/test/main_test.go b/test/main_test.go index ec431bc..a8b7fb1 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -48,6 +48,12 @@ func TestMain(m *testing.M) { conf.Store.Port = 3306 conf.Store.User = "root" conf.Store.Password = "" + } else if tenv == "postgres" { + conf.Store.Driver = "postgres" + conf.Store.Host = "localhost" + conf.Store.Port = 5432 + conf.Store.User = "postgres" + conf.Store.Password = "mysecretpassword" } else { conf.Store.Driver = "redis" conf.Store.Host = "localhost" @@ -55,10 +61,15 @@ func TestMain(m *testing.M) { conf.Store.Password = "" conf.Store.Port = 6379 } + conf.Store.Db = "" registry.WaitStoreUp() dtmsvr.PopulateDB(false) conf.Store.Db = "dtm" // after populateDB, set current db to dtm + if tenv == "postgres" { + busi.BusiConf = conf.Store.GetDBConf() + dtmcli.SetCurrentDBType(tenv) + } go dtmsvr.StartSvr() busi.PopulateDB(false) diff --git a/test/store_test.go b/test/store_test.go index b9bb31e..4cc3c00 100644 --- a/test/store_test.go +++ b/test/store_test.go @@ -100,73 +100,73 @@ func TestStoreResetCronTime(t *testing.T) { }) } -func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func(expire int64, limit int64) (int64, bool, error)) { +func testStoreResetCronTime(t *testing.T, funcName string, resetCronHandler func(expire int64, limit int64) (int64, bool, error)) { s := registry.GetStore() - var restTimeTimeout, lockExpireIn, limit, i int64 - restTimeTimeout = 100 //The time that will be ResetCronTime - lockExpireIn = 2 //The time that will be LockOneGlobalTrans - limit = 10 // rest limit + var afterSeconds, lockExpireIn, limit, i int64 + afterSeconds = 100 + lockExpireIn = 2 + limit = 10 // Will be reset for i = 0; i < limit; i++ { gid := funcName + fmt.Sprintf("%d", i) - _, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(restTimeTimeout+10)*time.Second)) + _, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(afterSeconds+10)*time.Second)) } // Will not be reset gid := funcName + fmt.Sprintf("%d", 10) - _, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(restTimeTimeout-10)*time.Second)) + _, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(afterSeconds-10)*time.Second)) - // Not Fount + // Not Found g := s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.Nil(t, g) - // Rest limit-1 count - succeedCount, hasRemaining, err := restCronHandler(restTimeTimeout, limit-1) + // Reset limit-1 count + succeedCount, hasRemaining, err := resetCronHandler(afterSeconds, limit-1) assert.Equal(t, hasRemaining, true) assert.Equal(t, succeedCount, limit-1) assert.Nil(t, err) - // Fount limit-1 count + // Found limit-1 count for i = 0; i < limit-1; i++ { g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.NotNil(t, g) s.ChangeGlobalStatus(g, "succeed", []string{}, true) } - // Not Fount + // Not Found g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.Nil(t, g) - // Rest 1 count - succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout, limit) + // Reset 1 count + succeedCount, hasRemaining, err = resetCronHandler(afterSeconds, limit) assert.Equal(t, hasRemaining, false) assert.Equal(t, succeedCount, int64(1)) assert.Nil(t, err) - // Fount 1 count + // Found 1 count g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.NotNil(t, g) s.ChangeGlobalStatus(g, "succeed", []string{}, true) - // Not Fount + // Not Found g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.Nil(t, g) - // reduce the restTimeTimeout, Rest 1 count - succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout-12, limit) + // reduce the resetTimeTimeout, Reset 1 count + succeedCount, hasRemaining, err = resetCronHandler(afterSeconds-12, limit) assert.Equal(t, hasRemaining, false) assert.Equal(t, succeedCount, int64(1)) assert.Nil(t, err) - // Fount 1 count + // Found 1 count g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.NotNil(t, g) s.ChangeGlobalStatus(g, "succeed", []string{}, true) - // Not Fount + // Not Found g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second) assert.Nil(t, g) - // Not Fount - succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout-12, limit) + // Not Found + succeedCount, hasRemaining, err = resetCronHandler(afterSeconds-12, limit) assert.Equal(t, hasRemaining, false) assert.Equal(t, succeedCount, int64(0)) assert.Nil(t, err) diff --git a/test/xa_cover_test.go b/test/xa_cover_test.go index 6836582..8c602ae 100644 --- a/test/xa_cover_test.go +++ b/test/xa_cover_test.go @@ -39,6 +39,9 @@ func TestXaCoverDTMError(t *testing.T) { } func TestXaCoverGidError(t *testing.T) { + if dtmimp.GetCurrentDBType() != dtmimp.DBTypeMysql { + return + } gid := dtmimp.GetFuncName() + "-' '" err := dtmcli.XaGlobalTransaction(DtmServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := busi.GenTransReq(30, false, false)