From 73fff1876ecae1ceab64342e6d051770775c06ef Mon Sep 17 00:00:00 2001 From: lsytj0413 <511121939@qq.com> Date: Fri, 24 Dec 2021 10:27:57 +0800 Subject: [PATCH] refactor(*): split storage with sql/redis package --- dtmsvr/storage/{ => redis}/redis.go | 48 ++++++++++++++----------- dtmsvr/storage/registry/registry.go | 8 +++-- dtmsvr/storage/{ => sql}/sql.go | 56 ++++++++++++++++++----------- dtmsvr/storage/store.go | 10 ------ dtmsvr/storage/utils.go | 16 --------- 5 files changed, 69 insertions(+), 69 deletions(-) rename dtmsvr/storage/{ => redis}/redis.go (81%) rename dtmsvr/storage/{ => sql}/sql.go (71%) delete mode 100644 dtmsvr/storage/utils.go diff --git a/dtmsvr/storage/redis.go b/dtmsvr/storage/redis/redis.go similarity index 81% rename from dtmsvr/storage/redis.go rename to dtmsvr/storage/redis/redis.go index dcaed8a..348e572 100644 --- a/dtmsvr/storage/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -1,4 +1,4 @@ -package storage +package redis import ( "context" @@ -6,11 +6,15 @@ import ( "time" "github.com/go-redis/redis/v8" + "gorm.io/gorm" + "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli/dtmimp" - "gorm.io/gorm" + "github.com/yedf/dtm/dtmsvr/storage" ) +var config = &common.Config + var ctx context.Context = context.Background() type RedisStore struct { @@ -26,30 +30,30 @@ func (s *RedisStore) PopulateData(skipDrop bool) { dtmimp.PanicIf(err != nil, err) } -func (s *RedisStore) FindTransGlobalStore(gid string) *TransGlobalStore { +func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore { r, err := redisGet().Get(ctx, config.Store.RedisPrefix+"_g_"+gid).Result() if err == redis.Nil { return nil } dtmimp.E2P(err) - trans := &TransGlobalStore{} + trans := &storage.TransGlobalStore{} dtmimp.MustUnmarshalString(r, trans) return trans } -func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore { +func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { lid := uint64(0) if *position != "" { lid = uint64(dtmimp.MustAtoi(*position)) } keys, cursor, err := redisGet().Scan(ctx, lid, config.Store.RedisPrefix+"_g_*", limit).Result() dtmimp.E2P(err) - globals := []TransGlobalStore{} + globals := []storage.TransGlobalStore{} if len(keys) > 0 { values, err := redisGet().MGet(ctx, keys...).Result() dtmimp.E2P(err) for _, v := range values { - global := TransGlobalStore{} + global := storage.TransGlobalStore{} dtmimp.MustUnmarshalString(v.(string), &global) globals = append(globals, global) } @@ -62,17 +66,17 @@ func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []Tran return globals } -func (s *RedisStore) FindBranches(gid string) []TransBranchStore { +func (s *RedisStore) FindBranches(gid string) []storage.TransBranchStore { sa, err := redisGet().LRange(ctx, config.Store.RedisPrefix+"_b_"+gid, 0, -1).Result() dtmimp.E2P(err) - branches := make([]TransBranchStore, len(sa)) + branches := make([]storage.TransBranchStore, len(sa)) for k, v := range sa { dtmimp.MustUnmarshalString(v, &branches[k]) } return branches } -func (s *RedisStore) UpdateBranchesSql(branches []TransBranchStore, updates []string) *gorm.DB { +func (s *RedisStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB { return nil // not implemented } @@ -102,7 +106,7 @@ func (a *argList) AppendObject(v interface{}) *argList { return a.AppendRaw(dtmimp.MustMarshalString(v)) } -func (a *argList) AppendBranches(branches []TransBranchStore) *argList { +func (a *argList) AppendBranches(branches []storage.TransBranchStore) *argList { for _, b := range branches { a.AppendRaw(dtmimp.MustMarshalString(b)) } @@ -116,8 +120,8 @@ func handleRedisResult(ret interface{}, err error) (string, error) { } s, _ := ret.(string) err = map[string]error{ - "NOT_FOUND": ErrNotFound, - "UNIQUE_CONFLICT": ErrUniqueConflict, + "NOT_FOUND": storage.ErrNotFound, + "UNIQUE_CONFLICT": storage.ErrUniqueConflict, }[s] return s, err } @@ -128,7 +132,7 @@ func callLua(a *argList, lua string) (string, error) { return handleRedisResult(ret, err) } -func (s *RedisStore) MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error { +func (s *RedisStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error { a := newArgList(). AppendGid(global.Gid). AppendObject(global). @@ -153,10 +157,10 @@ redis.call('EXPIRE', KEYS[2], ARGV[2]) return err } -func (s *RedisStore) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) { +func (s *RedisStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { args := newArgList(). AppendGid(gid). - AppendObject(&TransGlobalStore{Gid: gid, Status: status}). + AppendObject(&storage.TransGlobalStore{Gid: gid, Status: status}). AppendRaw(branchStart). AppendBranches(branches) _, err := callLua(args, ` @@ -182,7 +186,7 @@ redis.call('EXPIRE', KEYS[2], ARGV[2]) dtmimp.E2P(err) } -func (s *RedisStore) ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) { +func (s *RedisStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) { old := global.Status global.Status = newStatus args := newArgList().AppendGid(global.Gid).AppendObject(global).AppendRaw(old).AppendRaw(finished) @@ -205,7 +209,7 @@ end dtmimp.E2P(err) } -func (s *RedisStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore { +func (s *RedisStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { expired := time.Now().Add(expireIn).Unix() next := time.Now().Add(time.Duration(config.RetryInterval) * time.Second).Unix() args := newArgList().AppendGid("").AppendRaw(expired).AppendRaw(next) @@ -224,7 +228,7 @@ return gid ` for { r, err := callLua(args, lua) - if err == ErrNotFound { + if err == storage.ErrNotFound { return nil } dtmimp.E2P(err) @@ -235,7 +239,7 @@ return gid } } -func (s *RedisStore) TouchCronTime(global *TransGlobalStore, nextCronInterval int64) { +func (s *RedisStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) { global.NextCronTime = common.GetNextTime(nextCronInterval) global.UpdateTime = common.GetNextTime(0) global.NextCronInterval = nextCronInterval @@ -255,3 +259,7 @@ redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2]) `) dtmimp.E2P(err) } + +func redisGet() *redis.Client { + return common.RedisGet() +} diff --git a/dtmsvr/storage/registry/registry.go b/dtmsvr/storage/registry/registry.go index 250ffe2..e0fbe93 100644 --- a/dtmsvr/storage/registry/registry.go +++ b/dtmsvr/storage/registry/registry.go @@ -6,14 +6,16 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmsvr/storage" "github.com/yedf/dtm/dtmsvr/storage/boltdb" + "github.com/yedf/dtm/dtmsvr/storage/redis" + "github.com/yedf/dtm/dtmsvr/storage/sql" ) var config = &common.Config var stores map[string]storage.Store = map[string]storage.Store{ - "redis": &storage.RedisStore{}, - "mysql": &storage.SqlStore{}, - "postgres": &storage.SqlStore{}, + "redis": &redis.RedisStore{}, + "mysql": &sql.SqlStore{}, + "postgres": &sql.SqlStore{}, "boltdb": &boltdb.BoltdbStore{}, } diff --git a/dtmsvr/storage/sql.go b/dtmsvr/storage/sql/sql.go similarity index 71% rename from dtmsvr/storage/sql.go rename to dtmsvr/storage/sql/sql.go index 40a399f..db4ad26 100644 --- a/dtmsvr/storage/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -1,4 +1,4 @@ -package storage +package sql import ( "fmt" @@ -6,12 +6,16 @@ import ( "time" "github.com/google/uuid" - "github.com/yedf/dtm/common" - "github.com/yedf/dtm/dtmcli/dtmimp" "gorm.io/gorm" "gorm.io/gorm/clause" + + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmsvr/storage" ) +var config = &common.Config + type SqlStore struct { } @@ -27,8 +31,8 @@ func (s *SqlStore) PopulateData(skipDrop bool) { common.RunSQLScript(config.Store.GetDBConf(), file, skipDrop) } -func (s *SqlStore) FindTransGlobalStore(gid string) *TransGlobalStore { - trans := &TransGlobalStore{} +func (s *SqlStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore { + trans := &storage.TransGlobalStore{} dbr := dbGet().Model(trans).Where("gid=?", gid).First(trans) if dbr.Error == gorm.ErrRecordNotFound { return nil @@ -37,8 +41,8 @@ func (s *SqlStore) FindTransGlobalStore(gid string) *TransGlobalStore { return trans } -func (s *SqlStore) ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore { - globals := []TransGlobalStore{} +func (s *SqlStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { + globals := []storage.TransGlobalStore{} lid := math.MaxInt64 if *position != "" { lid = dtmimp.MustAtoi(*position) @@ -52,22 +56,22 @@ func (s *SqlStore) ScanTransGlobalStores(position *string, limit int64) []TransG return globals } -func (s *SqlStore) FindBranches(gid string) []TransBranchStore { - branches := []TransBranchStore{} +func (s *SqlStore) FindBranches(gid string) []storage.TransBranchStore { + branches := []storage.TransBranchStore{} dbGet().Must().Where("gid=?", gid).Order("id asc").Find(&branches) return branches } -func (s *SqlStore) UpdateBranchesSql(branches []TransBranchStore, updates []string) *gorm.DB { +func (s *SqlStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB { return dbGet().Clauses(clause.OnConflict{ OnConstraint: "trans_branch_op_pkey", DoUpdates: clause.AssignmentColumns(updates), }).Create(branches) } -func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) { +func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { err := dbGet().Transaction(func(tx *gorm.DB) error { - g := &TransGlobalStore{} + g := &storage.TransGlobalStore{} dbr := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Model(g).Where("gid=? and status=?", gid, status).First(g) if dbr.Error == nil { dbr = tx.Save(branches) @@ -77,14 +81,14 @@ func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches [] dtmimp.E2P(err) } -func (s *SqlStore) MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error { +func (s *SqlStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error { return dbGet().Transaction(func(db1 *gorm.DB) error { db := &common.DB{DB: db1} dbr := db.Must().Clauses(clause.OnConflict{ DoNothing: true, }).Create(global) if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误 - return ErrUniqueConflict + return storage.ErrUniqueConflict } if len(branches) > 0 { db.Must().Clauses(clause.OnConflict{ @@ -95,16 +99,16 @@ func (s *SqlStore) MaySaveNewTrans(global *TransGlobalStore, branches []TransBra }) } -func (s *SqlStore) ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) { +func (s *SqlStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) { old := global.Status global.Status = newStatus dbr := dbGet().Must().Model(global).Where("status=? and gid=?", old, global.Gid).Select(updates).Updates(global) if dbr.RowsAffected == 0 { - dtmimp.E2P(ErrNotFound) + dtmimp.E2P(storage.ErrNotFound) } } -func (s *SqlStore) TouchCronTime(global *TransGlobalStore, nextCronInterval int64) { +func (s *SqlStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) { global.NextCronTime = common.GetNextTime(nextCronInterval) global.UpdateTime = common.GetNextTime(0) global.NextCronInterval = nextCronInterval @@ -112,7 +116,7 @@ func (s *SqlStore) TouchCronTime(global *TransGlobalStore, nextCronInterval int6 Select([]string{"next_cron_time", "update_time", "next_cron_interval"}).Updates(global) } -func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore { +func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { db := dbGet() getTime := func(second int) string { return map[string]string{ @@ -123,12 +127,12 @@ func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore expire := int(expireIn / time.Second) whereTime := fmt.Sprintf("next_cron_time < %s", getTime(expire)) owner := uuid.NewString() - global := &TransGlobalStore{} + global := &storage.TransGlobalStore{} dbr := db.Must().Model(global). Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')"). Limit(1). Select([]string{"owner", "next_cron_time"}). - Updates(&TransGlobalStore{ + Updates(&storage.TransGlobalStore{ Owner: owner, NextCronTime: common.GetNextTime(common.Config.RetryInterval), }) @@ -138,3 +142,15 @@ func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore dbr = db.Must().Where("owner=?", owner).First(global) return global } + +func dbGet() *common.DB { + return common.DbGet(config.Store.GetDBConf()) +} + +func wrapError(err error) error { + if err == gorm.ErrRecordNotFound { + return storage.ErrNotFound + } + dtmimp.E2P(err) + return err +} diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go index d68b1a4..54a40f9 100644 --- a/dtmsvr/storage/store.go +++ b/dtmsvr/storage/store.go @@ -4,8 +4,6 @@ import ( "errors" "time" - "github.com/go-redis/redis/v8" - "github.com/yedf/dtm/dtmcli/dtmimp" "gorm.io/gorm" ) @@ -25,11 +23,3 @@ type Store interface { TouchCronTime(global *TransGlobalStore, nextCronInterval int64) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore } - -func wrapError(err error) error { - if err == gorm.ErrRecordNotFound || err == redis.Nil { - return ErrNotFound - } - dtmimp.E2P(err) - return err -} diff --git a/dtmsvr/storage/utils.go b/dtmsvr/storage/utils.go deleted file mode 100644 index d2d73d1..0000000 --- a/dtmsvr/storage/utils.go +++ /dev/null @@ -1,16 +0,0 @@ -package storage - -import ( - "github.com/go-redis/redis/v8" - "github.com/yedf/dtm/common" -) - -var config = &common.Config - -func dbGet() *common.DB { - return common.DbGet(config.Store.GetDBConf()) -} - -func redisGet() *redis.Client { - return common.RedisGet() -}