Browse Source

merge main

pull/139/head
yedf2 4 years ago
parent
commit
cbda9a4f7a
  1. 2
      dtmcli/tcc.go
  2. 5
      dtmsvr/storage/boltdb/boltdb.go
  3. 5
      dtmsvr/storage/redis/redis.go
  4. 5
      dtmsvr/storage/sql/sql.go
  5. 7
      dtmsvr/storage/store.go
  6. 8
      dtmsvr/svr.go
  7. 4
      test/store_test.go

2
dtmcli/tcc.go

@ -24,7 +24,7 @@ type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error)
// TccGlobalTransaction begin a tcc global transaction // TccGlobalTransaction begin a tcc global transaction
// dtm dtm server address // dtm dtm server address
// gid 全局事务id // gid global transaction ID
// tccFunc tcc事务函数,里面会定义全局事务的分支 // tccFunc tcc事务函数,里面会定义全局事务的分支
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
tcc := &Tcc{TransBase: *dtmimp.NewTransBase(gid, "tcc", dtm, "")} tcc := &Tcc{TransBase: *dtmimp.NewTransBase(gid, "tcc", dtm, "")}

5
dtmsvr/storage/boltdb/boltdb.go

@ -13,7 +13,6 @@ import (
"time" "time"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
"gorm.io/gorm"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmcli/logger"
@ -298,8 +297,8 @@ func (s *BoltdbStore) FindBranches(gid string) []storage.TransBranchStore {
return branches return branches
} }
func (s *BoltdbStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB { func (s *BoltdbStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) {
return nil // not implemented return 0, nil // not implemented
} }
func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {

5
dtmsvr/storage/redis/redis.go

@ -7,7 +7,6 @@ import (
"time" "time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"gorm.io/gorm"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmcli/logger"
@ -88,8 +87,8 @@ func (s *RedisStore) FindBranches(gid string) []storage.TransBranchStore {
return branches return branches
} }
func (s *RedisStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB { func (s *RedisStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) {
return nil // not implemented return 0, nil // not implemented
} }
type argList struct { type argList struct {

5
dtmsvr/storage/sql/sql.go

@ -63,11 +63,12 @@ func (s *SqlStore) FindBranches(gid string) []storage.TransBranchStore {
return branches return branches
} }
func (s *SqlStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB { func (s *SqlStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) {
return dbGet().Clauses(clause.OnConflict{ db := dbGet().Clauses(clause.OnConflict{
OnConstraint: "trans_branch_op_pkey", OnConstraint: "trans_branch_op_pkey",
DoUpdates: clause.AssignmentColumns(updates), DoUpdates: clause.AssignmentColumns(updates),
}).Create(branches) }).Create(branches)
return int(db.RowsAffected), db.Error
} }
func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {

7
dtmsvr/storage/store.go

@ -3,11 +3,12 @@ package storage
import ( import (
"errors" "errors"
"time" "time"
"gorm.io/gorm"
) )
// ErrNotFound defines the query item is not found in storage implement.
var ErrNotFound = errors.New("storage: NotFound") var ErrNotFound = errors.New("storage: NotFound")
// ErrUniqueConflict defines the item is conflict with unique key in storage implement.
var ErrUniqueConflict = errors.New("storage: UniqueKeyConflict") var ErrUniqueConflict = errors.New("storage: UniqueKeyConflict")
type Store interface { type Store interface {
@ -16,7 +17,7 @@ type Store interface {
FindTransGlobalStore(gid string) *TransGlobalStore FindTransGlobalStore(gid string) *TransGlobalStore
ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore
FindBranches(gid string) []TransBranchStore FindBranches(gid string) []TransBranchStore
UpdateBranchesSql(branches []TransBranchStore, updates []string) *gorm.DB UpdateBranches(branches []TransBranchStore, updates []string) (int, error)
LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int)
MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error
ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool)

8
dtmsvr/svr.go

@ -77,13 +77,13 @@ func updateBranchAsync() {
} }
} }
for len(updates) > 0 { for len(updates) > 0 {
dbr := GetStore().UpdateBranchesSql(updates, []string{"status", "finish_time", "update_time"}) rowAffected, err := GetStore().UpdateBranches(updates, []string{"status", "finish_time", "update_time"})
if dbr.Error != nil { if err != nil {
logger.Errorf("async update branch status error: %v", dbr.Error) logger.Errorf("async update branch status error: %v", err)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} else { } else {
logger.Infof("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected) logger.Infof("flushed %d branch status to db. affected: %d", len(updates), rowAffected)
updates = []TransBranch{} updates = []TransBranch{}
} }
} }

4
test/store_test.go

@ -94,7 +94,7 @@ func TestStoreWait(t *testing.T) {
func TestUpdateBranchSql(t *testing.T) { func TestUpdateBranchSql(t *testing.T) {
if !conf.Store.IsDB() { if !conf.Store.IsDB() {
r := registry.GetStore().UpdateBranchesSql(nil, nil) _, err := registry.GetStore().UpdateBranches(nil, nil)
assert.Nil(t, r) assert.Nil(t, err)
} }
} }

Loading…
Cancel
Save