diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 9df1bd4..73d1172 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -24,7 +24,7 @@ type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error) // TccGlobalTransaction begin a tcc global transaction // dtm dtm server address -// gid 全局事务id +// gid global transaction ID // tccFunc tcc事务函数,里面会定义全局事务的分支 func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { tcc := &Tcc{TransBase: *dtmimp.NewTransBase(gid, "tcc", dtm, "")} diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index d0cd912..eb3454d 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -13,7 +13,6 @@ import ( "time" bolt "go.etcd.io/bbolt" - "gorm.io/gorm" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" @@ -298,8 +297,8 @@ func (s *BoltdbStore) FindBranches(gid string) []storage.TransBranchStore { return branches } -func (s *BoltdbStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB { - return nil // not implemented +func (s *BoltdbStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { + return 0, nil // not implemented } func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index f8cef4b..12ff277 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -7,7 +7,6 @@ import ( "time" "github.com/go-redis/redis/v8" - "gorm.io/gorm" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" @@ -88,8 +87,8 @@ func (s *RedisStore) FindBranches(gid string) []storage.TransBranchStore { return branches } -func (s *RedisStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB { - return nil // not implemented +func (s *RedisStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { + return 0, nil // not implemented } type argList struct { diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 7e4e74d..02f85c0 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -63,11 +63,12 @@ func (s *SqlStore) FindBranches(gid string) []storage.TransBranchStore { return branches } -func (s *SqlStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB { - return dbGet().Clauses(clause.OnConflict{ +func (s *SqlStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { + db := dbGet().Clauses(clause.OnConflict{ OnConstraint: "trans_branch_op_pkey", DoUpdates: clause.AssignmentColumns(updates), }).Create(branches) + return int(db.RowsAffected), db.Error } func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go index 54a40f9..1a9da9c 100644 --- a/dtmsvr/storage/store.go +++ b/dtmsvr/storage/store.go @@ -3,11 +3,12 @@ package storage import ( "errors" "time" - - "gorm.io/gorm" ) +// ErrNotFound defines the query item is not found in storage implement. var ErrNotFound = errors.New("storage: NotFound") + +// ErrUniqueConflict defines the item is conflict with unique key in storage implement. var ErrUniqueConflict = errors.New("storage: UniqueKeyConflict") type Store interface { @@ -16,7 +17,7 @@ type Store interface { FindTransGlobalStore(gid string) *TransGlobalStore ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore FindBranches(gid string) []TransBranchStore - UpdateBranchesSql(branches []TransBranchStore, updates []string) *gorm.DB + UpdateBranches(branches []TransBranchStore, updates []string) (int, error) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index f241e18..be40e1c 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -77,13 +77,13 @@ func updateBranchAsync() { } } for len(updates) > 0 { - dbr := GetStore().UpdateBranchesSql(updates, []string{"status", "finish_time", "update_time"}) + rowAffected, err := GetStore().UpdateBranches(updates, []string{"status", "finish_time", "update_time"}) - if dbr.Error != nil { - logger.Errorf("async update branch status error: %v", dbr.Error) + if err != nil { + logger.Errorf("async update branch status error: %v", err) time.Sleep(1 * time.Second) } else { - logger.Infof("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected) + logger.Infof("flushed %d branch status to db. affected: %d", len(updates), rowAffected) updates = []TransBranch{} } } diff --git a/test/store_test.go b/test/store_test.go index 47cd425..486b807 100644 --- a/test/store_test.go +++ b/test/store_test.go @@ -94,7 +94,7 @@ func TestStoreWait(t *testing.T) { func TestUpdateBranchSql(t *testing.T) { if !conf.Store.IsDB() { - r := registry.GetStore().UpdateBranchesSql(nil, nil) - assert.Nil(t, r) + _, err := registry.GetStore().UpdateBranches(nil, nil) + assert.Nil(t, err) } }