Browse Source
Merge pull request #136 from lsytj0413/optimize-store
refactor(*): rename UpdateBranchesSql -> UpdateBranches
pull/138/head
yedf2
4 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with
17 additions and
17 deletions
-
dtmsvr/storage/boltdb/boltdb.go
-
dtmsvr/storage/redis/redis.go
-
dtmsvr/storage/sql/sql.go
-
dtmsvr/storage/store.go
-
dtmsvr/svr.go
-
test/store_test.go
|
|
|
@ -13,7 +13,6 @@ import ( |
|
|
|
"time" |
|
|
|
|
|
|
|
bolt "go.etcd.io/bbolt" |
|
|
|
"gorm.io/gorm" |
|
|
|
|
|
|
|
"github.com/dtm-labs/dtm/common" |
|
|
|
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|
|
|
@ -297,8 +296,8 @@ func (s *BoltdbStore) FindBranches(gid string) []storage.TransBranchStore { |
|
|
|
return branches |
|
|
|
} |
|
|
|
|
|
|
|
func (s *BoltdbStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB { |
|
|
|
return nil // not implemented
|
|
|
|
func (s *BoltdbStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { |
|
|
|
return 0, nil // not implemented
|
|
|
|
} |
|
|
|
|
|
|
|
func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { |
|
|
|
|
|
|
|
@ -7,7 +7,6 @@ import ( |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/go-redis/redis/v8" |
|
|
|
"gorm.io/gorm" |
|
|
|
|
|
|
|
"github.com/dtm-labs/dtm/common" |
|
|
|
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|
|
|
@ -87,8 +86,8 @@ func (s *RedisStore) FindBranches(gid string) []storage.TransBranchStore { |
|
|
|
return branches |
|
|
|
} |
|
|
|
|
|
|
|
func (s *RedisStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB { |
|
|
|
return nil // not implemented
|
|
|
|
func (s *RedisStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { |
|
|
|
return 0, nil // not implemented
|
|
|
|
} |
|
|
|
|
|
|
|
type argList struct { |
|
|
|
|
|
|
|
@ -62,11 +62,12 @@ func (s *SqlStore) FindBranches(gid string) []storage.TransBranchStore { |
|
|
|
return branches |
|
|
|
} |
|
|
|
|
|
|
|
func (s *SqlStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB { |
|
|
|
return dbGet().Clauses(clause.OnConflict{ |
|
|
|
func (s *SqlStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { |
|
|
|
db := dbGet().Clauses(clause.OnConflict{ |
|
|
|
OnConstraint: "trans_branch_op_pkey", |
|
|
|
DoUpdates: clause.AssignmentColumns(updates), |
|
|
|
}).Create(branches) |
|
|
|
return int(db.RowsAffected), db.Error |
|
|
|
} |
|
|
|
|
|
|
|
func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { |
|
|
|
|
|
|
|
@ -3,11 +3,12 @@ package storage |
|
|
|
import ( |
|
|
|
"errors" |
|
|
|
"time" |
|
|
|
|
|
|
|
"gorm.io/gorm" |
|
|
|
) |
|
|
|
|
|
|
|
// ErrNotFound defines the query item is not found in storage implement.
|
|
|
|
var ErrNotFound = errors.New("storage: NotFound") |
|
|
|
|
|
|
|
// ErrUniqueConflict defines the item is conflict with unique key in storage implement.
|
|
|
|
var ErrUniqueConflict = errors.New("storage: UniqueKeyConflict") |
|
|
|
|
|
|
|
type Store interface { |
|
|
|
@ -16,7 +17,7 @@ type Store interface { |
|
|
|
FindTransGlobalStore(gid string) *TransGlobalStore |
|
|
|
ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore |
|
|
|
FindBranches(gid string) []TransBranchStore |
|
|
|
UpdateBranchesSql(branches []TransBranchStore, updates []string) *gorm.DB |
|
|
|
UpdateBranches(branches []TransBranchStore, updates []string) (int, error) |
|
|
|
LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) |
|
|
|
MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error |
|
|
|
ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) |
|
|
|
|
|
|
|
@ -77,13 +77,13 @@ func updateBranchAsync() { |
|
|
|
} |
|
|
|
} |
|
|
|
for len(updates) > 0 { |
|
|
|
dbr := GetStore().UpdateBranchesSql(updates, []string{"status", "finish_time", "update_time"}) |
|
|
|
rowAffected, err := GetStore().UpdateBranches(updates, []string{"status", "finish_time", "update_time"}) |
|
|
|
|
|
|
|
if dbr.Error != nil { |
|
|
|
logger.Errorf("async update branch status error: %v", dbr.Error) |
|
|
|
if err != nil { |
|
|
|
logger.Errorf("async update branch status error: %v", err) |
|
|
|
time.Sleep(1 * time.Second) |
|
|
|
} else { |
|
|
|
logger.Infof("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected) |
|
|
|
logger.Infof("flushed %d branch status to db. affected: %d", len(updates), rowAffected) |
|
|
|
updates = []TransBranch{} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -94,7 +94,7 @@ func TestStoreWait(t *testing.T) { |
|
|
|
|
|
|
|
func TestUpdateBranchSql(t *testing.T) { |
|
|
|
if !config.Store.IsDB() { |
|
|
|
r := registry.GetStore().UpdateBranchesSql(nil, nil) |
|
|
|
assert.Nil(t, r) |
|
|
|
_, err := registry.GetStore().UpdateBranches(nil, nil) |
|
|
|
assert.Nil(t, err) |
|
|
|
} |
|
|
|
} |
|
|
|
|