Browse Source

add postgres test

pull/304/head
yedf2 4 years ago
parent
commit
0aafcaa138
  1. 2
      dtmcli/barrier.go
  2. 3
      dtmcli/dtmimp/db_special.go
  3. 3
      dtmsvr/api_http.go
  4. 8
      dtmsvr/storage/boltdb/boltdb.go
  5. 8
      dtmsvr/storage/redis/redis.go
  6. 2
      dtmsvr/storage/registry/registry.go
  7. 87
      dtmsvr/storage/sql/sql.go
  8. 2
      dtmsvr/storage/store.go
  9. 2
      dtmutil/db.go
  10. 28
      helper/compose.cloud.yml
  11. 16
      helper/compose.mysql.yml
  12. 13
      helper/compose.postgres.yml
  13. 1
      helper/compose.store.yml
  14. 2
      helper/test-cover.sh
  15. 27
      sqls/dtmsvr.storage.postgres.sql
  16. 4
      test/base_test.go
  17. 11
      test/main_test.go
  18. 44
      test/store_test.go
  19. 3
      test/xa_cover_test.go

2
dtmcli/barrier.go

@ -109,6 +109,8 @@ func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error {
var reason string
if err == nil {
sql := fmt.Sprintf("select reason from %s where gid=? and branch_id=? and op=? and barrier_id=?", dtmimp.BarrierTableName)
sql = dtmimp.GetDBSpecial(bb.DBType).GetPlaceHoldSQL(sql)
logger.Debugf("queryrow: %s", sql, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1)
err = db.QueryRow(sql, bb.Gid, dtmimp.MsgDoBranch0, dtmimp.MsgDoOp, dtmimp.MsgDoBarrier1).Scan(&reason)
}
if reason == dtmimp.OpRollback {

3
dtmcli/dtmimp/db_special.go

@ -76,6 +76,9 @@ func init() {
// GetDBSpecial get DBSpecial for currentDBType
func GetDBSpecial(dbType string) DBSpecial {
if dbType == "" {
dbType = currentDBType
}
return dbSpecials[dbType]
}

3
dtmsvr/api_http.go

@ -92,8 +92,7 @@ func all(c *gin.Context) interface{} {
return map[string]interface{}{"transactions": globals, "next_position": position}
}
// resetCronTime rest nextCronTime
// Prevent multiple backoff from causing NextCronTime to be too long
// unfinished transactions need to be retried as soon as possible after business downtime is recovered
func resetCronTime(c *gin.Context) interface{} {
sTimeoutSecond := dtmimp.OrString(c.Query("timeout"), strconv.FormatInt(3*conf.TimeoutToFail, 10))
sLimit := dtmimp.OrString(c.Query("limit"), "100")

8
dtmsvr/storage/boltdb/boltdb.go

@ -413,12 +413,12 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS
return trans
}
// ResetCronTime rest nextCronTime
// Prevent multiple backoff from causing NextCronTime to be too long
func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) {
// ResetCronTime reset nextCronTime
// unfinished transactions need to be retried as soon as possible after business downtime is recovered
func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) {
next := time.Now()
var trans *storage.TransGlobalStore
min := fmt.Sprintf("%d", time.Now().Add(timeout).Unix())
min := fmt.Sprintf("%d", time.Now().Add(after).Unix())
err = s.boltDb.Update(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketIndex).Cursor()
succeedCount = 0

8
dtmsvr/storage/redis/redis.go

@ -270,11 +270,11 @@ return gid
}
}
// ResetCronTime rest nextCronTime
// Prevent multiple backoff from causing NextCronTime to be too long
func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) {
// ResetCronTime reset nextCronTime
// unfinished transactions need to be retried as soon as possible after business downtime is recovered
func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) {
next := time.Now().Unix()
timeoutTimestamp := time.Now().Add(timeout).Unix()
timeoutTimestamp := time.Now().Add(after).Unix()
args := newArgList().AppendGid("").AppendRaw(timeoutTimestamp).AppendRaw(next).AppendRaw(limit)
lua := `-- ResetCronTime
local r = redis.call('ZRANGEBYSCORE', KEYS[3], ARGV[3], '+inf', 'LIMIT', 0, ARGV[5]+1)

2
dtmsvr/storage/registry/registry.go

@ -49,7 +49,7 @@ func GetStore() storage.Store {
// WaitStoreUp wait for db to go up
func WaitStoreUp() {
for err := GetStore().Ping(); err != nil; err = GetStore().Ping() {
logger.Infof("wait store up")
logger.Infof("wait store up: %v", err)
time.Sleep(3 * time.Second)
}
}

87
dtmsvr/storage/sql/sql.go

@ -137,61 +137,42 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval
// LockOneGlobalTrans finds GlobalTrans
func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
db := dbGet()
getTime := func(second int) string {
return map[string]string{
"mysql": fmt.Sprintf("date_add(now(), interval %d second)", second),
"postgres": fmt.Sprintf("current_timestamp + interval '%d second'", second),
}[conf.Store.Driver]
}
expire := int(expireIn / time.Second)
whereTime := fmt.Sprintf("next_cron_time < %s", getTime(expire))
owner := shortuuid.New()
global := &storage.TransGlobalStore{}
dbr := db.Must().Model(global).
Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')").
Limit(1).
Select([]string{"owner", "next_cron_time"}).
Updates(&storage.TransGlobalStore{
Owner: owner,
NextCronTime: dtmutil.GetNextTime(conf.RetryInterval),
})
if dbr.RowsAffected == 0 {
where := map[string]string{
dtmimp.DBTypeMysql: fmt.Sprintf(`next_cron_time < date_add(now(), interval %d second) and status in ('prepared', 'aborting', 'submitted') limit 1`, int(expireIn/time.Second)),
dtmimp.DBTypePostgres: fmt.Sprintf(`id in (select id from trans_global where next_cron_time < current_timestamp + interval '%d second' and status in ('prepared', 'aborting', 'submitted') limit 1 )`, int(expireIn/time.Second)),
}[conf.Store.Driver]
sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s', owner='%s' WHERE %s`,
getTimeStr(0),
getTimeStr(conf.RetryInterval),
owner,
where)
affected, err := dtmimp.DBExec(conf.Store.Driver, db.ToSQLDB(), sql)
dtmimp.PanicIf(err != nil, err)
if affected == 0 {
return nil
}
global := &storage.TransGlobalStore{}
db.Must().Where("owner=?", owner).First(global)
return global
}
// ResetCronTime rest nextCronTime
// Prevent multiple backoff from causing NextCronTime to be too long
func (s *Store) ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) {
db := dbGet()
getTime := func(second int) string {
return map[string]string{
"mysql": fmt.Sprintf("date_add(now(), interval %d second)", second),
"postgres": fmt.Sprintf("current_timestamp + interval '%d second'", second),
}[conf.Store.Driver]
}
timeoutSecond := int(timeout / time.Second)
whereTime := fmt.Sprintf("next_cron_time > %s", getTime(timeoutSecond))
global := &storage.TransGlobalStore{}
dbr := db.Must().Model(global).
Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')").
Limit(int(limit)).
Select([]string{"next_cron_time"}).
Updates(&storage.TransGlobalStore{
NextCronTime: dtmutil.GetNextTime(0),
})
succeedCount = dbr.RowsAffected
if succeedCount == limit {
var count int64
db.Must().Model(global).Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')").Limit(1).Count(&count)
if count > 0 {
hasRemaining = true
}
}
// ResetCronTime reset nextCronTime
// unfinished transactions need to be retried as soon as possible after business downtime is recovered
func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) {
where := map[string]string{
dtmimp.DBTypeMysql: fmt.Sprintf(`next_cron_time > date_add(now(), interval %d second) and status in ('prepared', 'aborting', 'submitted') limit %d`, int(after/time.Second), limit),
dtmimp.DBTypePostgres: fmt.Sprintf(`id in (select id from trans_global where next_cron_time > current_timestamp + interval '%d second' and status in ('prepared', 'aborting', 'submitted') limit %d )`, int(after/time.Second), limit),
}[conf.Store.Driver]
return succeedCount, hasRemaining, dbr.Error
sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE %s`,
getTimeStr(0),
getTimeStr(0),
where)
affected, err := dtmimp.DBExec(conf.Store.Driver, dbGet().ToSQLDB(), sql)
return affected, affected == limit, err
}
// SetDBConn sets db conn pool
@ -213,3 +194,15 @@ func wrapError(err error) error {
dtmimp.E2P(err)
return err
}
func getTime(after time.Duration) string {
second := int64(after / time.Second)
return map[string]string{
"mysql": fmt.Sprintf("date_add(now(), interval %d second)", second),
"postgres": fmt.Sprintf("current_timestamp + interval '%d second'", second),
}[conf.Store.Driver]
}
func getTimeStr(afterSecond int64) string {
return dtmutil.GetNextTime(afterSecond).Format("2006-01-02 15:04:05")
}

2
dtmsvr/storage/store.go

@ -30,5 +30,5 @@ type Store interface {
ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool)
TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time)
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
ResetCronTime(timeout time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error)
ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error)
}

2
dtmutil/db.go

@ -100,7 +100,7 @@ func DbGet(conf dtmcli.DBConf, ops ...func(*gorm.DB)) *DB {
dsn := dtmimp.GetDsn(conf)
db, ok := dbs.Load(dsn)
if !ok {
logger.Infof("connecting '%s' '%s' '%s' '%d'", conf.Driver, conf.Host, conf.User, conf.Port)
logger.Infof("connecting '%s' '%s' '%s' '%d' '%s'", conf.Driver, conf.Host, conf.User, conf.Port, conf.Db)
db1, err := gorm.Open(getGormDialetor(conf.Driver, dsn), &gorm.Config{
SkipDefaultTransaction: true,
})

28
helper/compose.cloud.yml

@ -1,28 +0,0 @@
version: '3.3'
services:
api:
build: ..
volumes:
- /etc/localtime:/etc/localtime:ro
- /etc/timezone:/etc/timezone:ro
- ..:/app/dtm
extra_hosts:
- 'host.docker.internal:host-gateway'
environment:
IS_DOCKER: 1
ports:
- '9080:8080'
mysql:
image: 'mysql:5.7'
volumes:
- /etc/localtime:/etc/localtime:ro
- /etc/timezone:/etc/timezone:ro
environment:
MYSQL_ALLOW_EMPTY_PASSWORD: 1
command:
[
'--character-set-server=utf8mb4',
'--collation-server=utf8mb4_unicode_ci',
]
ports:
- '3306:3306'

16
helper/compose.mysql.yml

@ -1,16 +0,0 @@
version: '3.3'
services:
mysql:
image: 'mysql:5.7'
volumes:
- /etc/localtime:/etc/localtime:ro
- /etc/timezone:/etc/timezone:ro
environment:
MYSQL_ALLOW_EMPTY_PASSWORD: 1
command:
[
'--character-set-server=utf8mb4',
'--collation-server=utf8mb4_unicode_ci',
]
ports:
- '3306:3306'

13
helper/compose.postgres.yml

@ -1,13 +0,0 @@
version: '3.3'
services:
postgres:
image: 'postgres:13'
command: postgres --max_prepared_transactions=1000
volumes:
- /etc/localtime:/etc/localtime:ro
- /etc/timezone:/etc/timezone:ro
environment:
POSTGRES_PASSWORD: mysecretpassword
ports:
- '5432:5432'

1
helper/compose.store.yml

@ -22,6 +22,7 @@ services:
- /etc/timezone:/etc/timezone:ro
environment:
POSTGRES_PASSWORD: mysecretpassword
POSTGRES_DB: dtm
ports:
- '5432:5432'

2
helper/test-cover.sh

@ -1,6 +1,6 @@
set -x
echo "" > coverage.txt
for store in redis mysql boltdb; do
for store in redis mysql boltdb postgres; do
for d in $(go list ./... | grep -v vendor); do
TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/dtm-labs/dtm/dtmcli,github.com/dtm-labs/dtm/dtmcli/dtmimp,github.com/dtm-labs/dtm/dtmcli/logger,github.com/dtm-labs/dtm/dtmgrpc,github.com/dtm-labs/dtm/dtmgrpc/dtmgimp,github.com/dtm-labs/dtm/dtmsvr,github.com/dtm-labs/dtm/dtmsvr/config,github.com/dtm-labs/dtm/dtmsvr/storage,github.com/dtm-labs/dtm/dtmsvr/storage/boltdb,github.com/dtm-labs/dtm/dtmsvr/storage/redis,github.com/dtm-labs/dtm/dtmsvr/storage/registry,github.com/dtm-labs/dtm/dtmsvr/storage/sql,github.com/dtm-labs/dtm/dtmutil -gcflags=-l $d || exit 1
if [ -f profile.out ]; then

27
sqls/dtmsvr.storage.postgres.sql

@ -1,11 +1,8 @@
CREATE SCHEMA if not EXISTS dtm
/* SQLINES DEMO *** RACTER SET utf8mb4 */
;
drop table IF EXISTS dtm.trans_global;
-- SQLINES LICENSE FOR EVALUATION USE ONLY
CREATE SEQUENCE if not EXISTS dtm.trans_global_seq;
CREATE TABLE if not EXISTS dtm.trans_global (
id bigint NOT NULL DEFAULT NEXTVAL ('dtm.trans_global_seq'),
drop table IF EXISTS trans_global;
CREATE SEQUENCE if not EXISTS trans_global_seq;
CREATE TABLE if not EXISTS trans_global (
id bigint NOT NULL DEFAULT NEXTVAL ('trans_global_seq'),
gid varchar(128) NOT NULL,
trans_type varchar(45) not null,
status varchar(45) NOT NULL,
@ -24,13 +21,13 @@ CREATE TABLE if not EXISTS dtm.trans_global (
PRIMARY KEY (id),
CONSTRAINT gid UNIQUE (gid)
);
create index if not EXISTS owner on dtm.trans_global(owner);
create index if not EXISTS status_next_cron_time on dtm.trans_global (status, next_cron_time);
drop table IF EXISTS dtm.trans_branch_op;
-- SQLINES LICENSE FOR EVALUATION USE ONLY
CREATE SEQUENCE if not EXISTS dtm.trans_branch_op_seq;
CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (
id bigint NOT NULL DEFAULT NEXTVAL ('dtm.trans_branch_op_seq'),
create index if not EXISTS owner on trans_global(owner);
create index if not EXISTS status_next_cron_time on trans_global (status, next_cron_time);
drop table IF EXISTS trans_branch_op;
CREATE SEQUENCE if not EXISTS trans_branch_op_seq;
CREATE TABLE IF NOT EXISTS trans_branch_op (
id bigint NOT NULL DEFAULT NEXTVAL ('trans_branch_op_seq'),
gid varchar(128) NOT NULL,
url varchar(1024) NOT NULL,
data TEXT,

4
test/base_test.go

@ -26,7 +26,7 @@ type BarrierModel struct {
}
// TableName gorm table name
func (BarrierModel) TableName() string { return "dtm_barrier.barrier" }
func (BarrierModel) TableName() string { return dtmimp.BarrierTableName }
func TestBaseSqlDB(t *testing.T) {
asserts := assert.New(t)
@ -38,7 +38,7 @@ func TestBaseSqlDB(t *testing.T) {
Op: dtmimp.OpAction,
BarrierID: 1,
}
db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values('saga', 'gid1', 'branch_id1', 'action', '01', 'saga')")
db.Must().Exec(fmt.Sprintf("insert into %s(trans_type, gid, branch_id, op, barrier_id, reason) values('saga', 'gid1', 'branch_id1', 'action', '01', 'saga')", dtmimp.BarrierTableName))
tx, err := db.ToSQLDB().Begin()
asserts.Nil(err)
err = barrier.Call(tx, func(tx *sql.Tx) error {

11
test/main_test.go

@ -48,6 +48,12 @@ func TestMain(m *testing.M) {
conf.Store.Port = 3306
conf.Store.User = "root"
conf.Store.Password = ""
} else if tenv == "postgres" {
conf.Store.Driver = "postgres"
conf.Store.Host = "localhost"
conf.Store.Port = 5432
conf.Store.User = "postgres"
conf.Store.Password = "mysecretpassword"
} else {
conf.Store.Driver = "redis"
conf.Store.Host = "localhost"
@ -55,10 +61,15 @@ func TestMain(m *testing.M) {
conf.Store.Password = ""
conf.Store.Port = 6379
}
conf.Store.Db = ""
registry.WaitStoreUp()
dtmsvr.PopulateDB(false)
conf.Store.Db = "dtm" // after populateDB, set current db to dtm
if tenv == "postgres" {
busi.BusiConf = conf.Store.GetDBConf()
dtmcli.SetCurrentDBType(tenv)
}
go dtmsvr.StartSvr()
busi.PopulateDB(false)

44
test/store_test.go

@ -100,73 +100,73 @@ func TestStoreResetCronTime(t *testing.T) {
})
}
func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func(expire int64, limit int64) (int64, bool, error)) {
func testStoreResetCronTime(t *testing.T, funcName string, resetCronHandler func(expire int64, limit int64) (int64, bool, error)) {
s := registry.GetStore()
var restTimeTimeout, lockExpireIn, limit, i int64
restTimeTimeout = 100 //The time that will be ResetCronTime
lockExpireIn = 2 //The time that will be LockOneGlobalTrans
limit = 10 // rest limit
var afterSeconds, lockExpireIn, limit, i int64
afterSeconds = 100
lockExpireIn = 2
limit = 10
// Will be reset
for i = 0; i < limit; i++ {
gid := funcName + fmt.Sprintf("%d", i)
_, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(restTimeTimeout+10)*time.Second))
_, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(afterSeconds+10)*time.Second))
}
// Will not be reset
gid := funcName + fmt.Sprintf("%d", 10)
_, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(restTimeTimeout-10)*time.Second))
_, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(afterSeconds-10)*time.Second))
// Not Fount
// Not Found
g := s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.Nil(t, g)
// Rest limit-1 count
succeedCount, hasRemaining, err := restCronHandler(restTimeTimeout, limit-1)
// Reset limit-1 count
succeedCount, hasRemaining, err := resetCronHandler(afterSeconds, limit-1)
assert.Equal(t, hasRemaining, true)
assert.Equal(t, succeedCount, limit-1)
assert.Nil(t, err)
// Fount limit-1 count
// Found limit-1 count
for i = 0; i < limit-1; i++ {
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.NotNil(t, g)
s.ChangeGlobalStatus(g, "succeed", []string{}, true)
}
// Not Fount
// Not Found
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.Nil(t, g)
// Rest 1 count
succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout, limit)
// Reset 1 count
succeedCount, hasRemaining, err = resetCronHandler(afterSeconds, limit)
assert.Equal(t, hasRemaining, false)
assert.Equal(t, succeedCount, int64(1))
assert.Nil(t, err)
// Fount 1 count
// Found 1 count
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.NotNil(t, g)
s.ChangeGlobalStatus(g, "succeed", []string{}, true)
// Not Fount
// Not Found
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.Nil(t, g)
// reduce the restTimeTimeout, Rest 1 count
succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout-12, limit)
// reduce the resetTimeTimeout, Reset 1 count
succeedCount, hasRemaining, err = resetCronHandler(afterSeconds-12, limit)
assert.Equal(t, hasRemaining, false)
assert.Equal(t, succeedCount, int64(1))
assert.Nil(t, err)
// Fount 1 count
// Found 1 count
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.NotNil(t, g)
s.ChangeGlobalStatus(g, "succeed", []string{}, true)
// Not Fount
// Not Found
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.Nil(t, g)
// Not Fount
succeedCount, hasRemaining, err = restCronHandler(restTimeTimeout-12, limit)
// Not Found
succeedCount, hasRemaining, err = resetCronHandler(afterSeconds-12, limit)
assert.Equal(t, hasRemaining, false)
assert.Equal(t, succeedCount, int64(0))
assert.Nil(t, err)

3
test/xa_cover_test.go

@ -39,6 +39,9 @@ func TestXaCoverDTMError(t *testing.T) {
}
func TestXaCoverGidError(t *testing.T) {
if dtmimp.GetCurrentDBType() != dtmimp.DBTypeMysql {
return
}
gid := dtmimp.GetFuncName() + "-' '"
err := dtmcli.XaGlobalTransaction(DtmServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
req := busi.GenTransReq(30, false, false)

Loading…
Cancel
Save