diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index eb3454d..19f0607 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -23,10 +23,11 @@ import ( var conf = &config.Config -type BoltdbStore struct { +// Store implements storage.Store, and storage with boltdb +type Store struct { } -var boltDb *bolt.DB = nil +var boltDb *bolt.DB var boltOnce sync.Once func boltGet() *bolt.DB { @@ -110,7 +111,7 @@ func cleanupGlobalWithGids(t *bolt.Tx, gids map[string]struct{}) { logger.Debugf("Start to cleanup %d gids", len(gids)) for gid := range gids { logger.Debugf("Start to delete gid: %s", gid) - bucket.Delete([]byte(gid)) + dtmimp.E2P(bucket.Delete([]byte(gid))) } } @@ -139,7 +140,7 @@ func cleanupBranchWithGids(t *bolt.Tx, gids map[string]struct{}) { logger.Debugf("Start to cleanup %d branches", len(branchKeys)) for _, key := range branchKeys { logger.Debugf("Start to delete branch: %s", key) - bucket.Delete([]byte(key)) + dtmimp.E2P(bucket.Delete([]byte(key))) } } @@ -165,7 +166,7 @@ func cleanupIndexWithGids(t *bolt.Tx, gids map[string]struct{}) { logger.Debugf("Start to cleanup %d indexes", len(indexKeys)) for _, key := range indexKeys { logger.Debugf("Start to delete index: %s", key) - bucket.Delete([]byte(key)) + dtmimp.E2P(bucket.Delete([]byte(key))) } } @@ -232,19 +233,25 @@ func tPutIndex(t *bolt.Tx, unix int64, gid string) { dtmimp.E2P(err) } -func (s *BoltdbStore) Ping() error { +// Ping execs ping cmd to boltdb +func (s *Store) Ping() error { return nil } -func (s *BoltdbStore) PopulateData(skipDrop bool) { +// PopulateData populates data to boltdb +func (s *Store) PopulateData(skipDrop bool) { if !skipDrop { err := boltGet().Update(func(t *bolt.Tx) error { - t.DeleteBucket(bucketIndex) - t.DeleteBucket(bucketBranches) - t.DeleteBucket(bucketGlobal) - t.CreateBucket(bucketIndex) - t.CreateBucket(bucketBranches) - t.CreateBucket(bucketGlobal) + dtmimp.E2P(t.DeleteBucket(bucketIndex)) + dtmimp.E2P(t.DeleteBucket(bucketBranches)) + dtmimp.E2P(t.DeleteBucket(bucketGlobal)) + _, err := t.CreateBucket(bucketIndex) + dtmimp.E2P(err) + _, err = t.CreateBucket(bucketBranches) + dtmimp.E2P(err) + _, err = t.CreateBucket(bucketGlobal) + dtmimp.E2P(err) + return nil }) dtmimp.E2P(err) @@ -252,7 +259,8 @@ func (s *BoltdbStore) PopulateData(skipDrop bool) { } } -func (s *BoltdbStore) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) { +// FindTransGlobalStore finds GlobalTrans data by gid +func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) { err := boltGet().View(func(t *bolt.Tx) error { trans = tGetGlobal(t, gid) return nil @@ -261,7 +269,8 @@ func (s *BoltdbStore) FindTransGlobalStore(gid string) (trans *storage.TransGlob return } -func (s *BoltdbStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { +// ScanTransGlobalStores lists GlobalTrans data +func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { globals := []storage.TransGlobalStore{} err := boltGet().View(func(t *bolt.Tx) error { cursor := t.Bucket(bucketGlobal).Cursor() @@ -287,8 +296,9 @@ func (s *BoltdbStore) ScanTransGlobalStores(position *string, limit int64) []sto return globals } -func (s *BoltdbStore) FindBranches(gid string) []storage.TransBranchStore { - var branches []storage.TransBranchStore = nil +// FindBranches finds Branch data by gid +func (s *Store) FindBranches(gid string) []storage.TransBranchStore { + var branches []storage.TransBranchStore err := boltGet().View(func(t *bolt.Tx) error { branches = tGetBranches(t, gid) return nil @@ -297,11 +307,13 @@ func (s *BoltdbStore) FindBranches(gid string) []storage.TransBranchStore { return branches } -func (s *BoltdbStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { +// UpdateBranches update branches info +func (s *Store) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { return 0, nil // not implemented } -func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { +// LockGlobalSaveBranches creates branches +func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { err := boltGet().Update(func(t *bolt.Tx) error { g := tGetGlobal(t, gid) if g == nil { @@ -316,7 +328,8 @@ func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches dtmimp.E2P(err) } -func (s *BoltdbStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error { +// MaySaveNewTrans creates a new trans +func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error { return boltGet().Update(func(t *bolt.Tx) error { g := tGetGlobal(t, global.Gid) if g != nil { @@ -329,7 +342,8 @@ func (s *BoltdbStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches }) } -func (s *BoltdbStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) { +// ChangeGlobalStatus changes global trans status +func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) { old := global.Status global.Status = newStatus err := boltGet().Update(func(t *bolt.Tx) error { @@ -346,7 +360,8 @@ func (s *BoltdbStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newSt dtmimp.E2P(err) } -func (s *BoltdbStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) { +// TouchCronTime updates cronTime +func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) { oldUnix := global.NextCronTime.Unix() global.NextCronTime = dtmutil.GetNextTime(nextCronInterval) global.UpdateTime = dtmutil.GetNextTime(0) @@ -364,8 +379,9 @@ func (s *BoltdbStore) TouchCronTime(global *storage.TransGlobalStore, nextCronIn dtmimp.E2P(err) } -func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { - var trans *storage.TransGlobalStore = nil +// LockOneGlobalTrans finds GlobalTrans +func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { + var trans *storage.TransGlobalStore min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix()) next := time.Now().Add(time.Duration(conf.RetryInterval) * time.Second) err := boltGet().Update(func(t *bolt.Tx) error { diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 12ff277..e0903a5 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -2,6 +2,7 @@ package redis import ( "context" + "errors" "fmt" "sync" "time" @@ -21,16 +22,18 @@ var conf = &config.Config // TODO: optimize this, all function should have context as first parameter var ctx = context.Background() -// RedisStore is the storage with redis, all transaction information will bachend with redis -type RedisStore struct { +// Store is the storage with redis, all transaction information will bachend with redis +type Store struct { } -func (s *RedisStore) Ping() error { +// Ping execs ping cmd to redis +func (s *Store) Ping() error { _, err := redisGet().Ping(ctx).Result() return err } -func (s *RedisStore) PopulateData(skipDrop bool) { +// PopulateData populates data to redis +func (s *Store) PopulateData(skipDrop bool) { if !skipDrop { _, err := redisGet().FlushAll(ctx).Result() logger.Infof("call redis flushall. result: %v", err) @@ -38,7 +41,8 @@ func (s *RedisStore) PopulateData(skipDrop bool) { } } -func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore { +// FindTransGlobalStore finds GlobalTrans data by gid +func (s *Store) FindTransGlobalStore(gid string) *storage.TransGlobalStore { logger.Debugf("calling FindTransGlobalStore: %s", gid) r, err := redisGet().Get(ctx, conf.Store.RedisPrefix+"_g_"+gid).Result() if err == redis.Nil { @@ -50,7 +54,8 @@ func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore return trans } -func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { +// ScanTransGlobalStores lists GlobalTrans data +func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { logger.Debugf("calling ScanTransGlobalStores: %s %d", *position, limit) lid := uint64(0) if *position != "" { @@ -76,7 +81,8 @@ func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []stor return globals } -func (s *RedisStore) FindBranches(gid string) []storage.TransBranchStore { +// FindBranches finds Branch data by gid +func (s *Store) FindBranches(gid string) []storage.TransBranchStore { logger.Debugf("calling FindBranches: %s", gid) sa, err := redisGet().LRange(ctx, conf.Store.RedisPrefix+"_b_"+gid, 0, -1).Result() dtmimp.E2P(err) @@ -87,7 +93,8 @@ func (s *RedisStore) FindBranches(gid string) []storage.TransBranchStore { return branches } -func (s *RedisStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { +// UpdateBranches updates branches info +func (s *Store) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { return 0, nil // not implemented } @@ -144,7 +151,8 @@ func callLua(a *argList, lua string) (string, error) { return handleRedisResult(ret, err) } -func (s *RedisStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error { +// MaySaveNewTrans creates a new trans +func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error { a := newArgList(). AppendGid(global.Gid). AppendObject(global). @@ -171,7 +179,8 @@ redis.call('EXPIRE', KEYS[2], ARGV[2]) return err } -func (s *RedisStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { +// LockGlobalSaveBranches creates branches +func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { args := newArgList(). AppendGid(gid). AppendRaw(status). @@ -195,7 +204,8 @@ redis.call('EXPIRE', KEYS[2], ARGV[2]) dtmimp.E2P(err) } -func (s *RedisStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) { +// ChangeGlobalStatus changes global trans status +func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) { old := global.Status global.Status = newStatus args := newArgList(). @@ -219,7 +229,8 @@ end dtmimp.E2P(err) } -func (s *RedisStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { +// LockOneGlobalTrans finds GlobalTrans +func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { expired := time.Now().Add(expireIn).Unix() next := time.Now().Add(time.Duration(conf.RetryInterval) * time.Second).Unix() args := newArgList().AppendGid("").AppendRaw(expired).AppendRaw(next) @@ -238,7 +249,7 @@ return gid ` for { r, err := callLua(args, lua) - if err == storage.ErrNotFound { + if errors.Is(err, storage.ErrNotFound) { return nil } dtmimp.E2P(err) @@ -249,7 +260,8 @@ return gid } } -func (s *RedisStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) { +// TouchCronTime updates cronTime +func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) { global.NextCronTime = dtmutil.GetNextTime(nextCronInterval) global.UpdateTime = dtmutil.GetNextTime(0) global.NextCronInterval = nextCronInterval diff --git a/dtmsvr/storage/registry/registry.go b/dtmsvr/storage/registry/registry.go index d65f2e1..1788456 100644 --- a/dtmsvr/storage/registry/registry.go +++ b/dtmsvr/storage/registry/registry.go @@ -13,12 +13,13 @@ import ( var conf = &config.Config var stores map[string]storage.Store = map[string]storage.Store{ - "redis": &redis.RedisStore{}, - "mysql": &sql.SqlStore{}, - "postgres": &sql.SqlStore{}, - "boltdb": &boltdb.BoltdbStore{}, + "redis": &redis.Store{}, + "mysql": &sql.Store{}, + "postgres": &sql.Store{}, + "boltdb": &boltdb.Store{}, } +// GetStore returns storage.Store func GetStore() storage.Store { return stores[conf.Store.Driver] } diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 02f85c0..cda8d44 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -17,22 +17,26 @@ import ( var conf = &config.Config -type SqlStore struct { +// Store implements storage.Store, and storage with db +type Store struct { } -func (s *SqlStore) Ping() error { +// Ping execs ping cmd to db +func (s *Store) Ping() error { db, err := dtmimp.StandaloneDB(conf.Store.GetDBConf()) dtmimp.E2P(err) _, err = db.Exec("select 1") return err } -func (s *SqlStore) PopulateData(skipDrop bool) { +// PopulateData populates data to db +func (s *Store) PopulateData(skipDrop bool) { file := fmt.Sprintf("%s/dtmsvr.storage.%s.sql", dtmutil.GetSqlDir(), conf.Store.Driver) dtmutil.RunSQLScript(conf.Store.GetDBConf(), file, skipDrop) } -func (s *SqlStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore { +// FindTransGlobalStore finds GlobalTrans data by gid +func (s *Store) FindTransGlobalStore(gid string) *storage.TransGlobalStore { trans := &storage.TransGlobalStore{} dbr := dbGet().Model(trans).Where("gid=?", gid).First(trans) if dbr.Error == gorm.ErrRecordNotFound { @@ -42,7 +46,8 @@ func (s *SqlStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore { return trans } -func (s *SqlStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { +// ScanTransGlobalStores lists GlobalTrans data +func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { globals := []storage.TransGlobalStore{} lid := math.MaxInt64 if *position != "" { @@ -57,13 +62,15 @@ func (s *SqlStore) ScanTransGlobalStores(position *string, limit int64) []storag return globals } -func (s *SqlStore) FindBranches(gid string) []storage.TransBranchStore { +// FindBranches finds Branch data by gid +func (s *Store) FindBranches(gid string) []storage.TransBranchStore { branches := []storage.TransBranchStore{} dbGet().Must().Where("gid=?", gid).Order("id asc").Find(&branches) return branches } -func (s *SqlStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { +// UpdateBranches update branches info +func (s *Store) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { db := dbGet().Clauses(clause.OnConflict{ OnConstraint: "trans_branch_op_pkey", DoUpdates: clause.AssignmentColumns(updates), @@ -71,7 +78,8 @@ func (s *SqlStore) UpdateBranches(branches []storage.TransBranchStore, updates [ return int(db.RowsAffected), db.Error } -func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { +// LockGlobalSaveBranches creates branches +func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { err := dbGet().Transaction(func(tx *gorm.DB) error { g := &storage.TransGlobalStore{} dbr := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Model(g).Where("gid=? and status=?", gid, status).First(g) @@ -83,7 +91,8 @@ func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches [] dtmimp.E2P(err) } -func (s *SqlStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error { +// MaySaveNewTrans creates a new trans +func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error { return dbGet().Transaction(func(db1 *gorm.DB) error { db := &dtmutil.DB{DB: db1} dbr := db.Must().Clauses(clause.OnConflict{ @@ -101,7 +110,8 @@ func (s *SqlStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches [] }) } -func (s *SqlStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) { +// ChangeGlobalStatus changes global trans status +func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) { old := global.Status global.Status = newStatus dbr := dbGet().Must().Model(global).Where("status=? and gid=?", old, global.Gid).Select(updates).Updates(global) @@ -110,7 +120,8 @@ func (s *SqlStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatu } } -func (s *SqlStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) { +// TouchCronTime updates cronTime +func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) { global.NextCronTime = dtmutil.GetNextTime(nextCronInterval) global.UpdateTime = dtmutil.GetNextTime(0) global.NextCronInterval = nextCronInterval @@ -118,7 +129,8 @@ func (s *SqlStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInter Select([]string{"next_cron_time", "update_time", "next_cron_interval"}).Updates(global) } -func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { +// LockOneGlobalTrans finds GlobalTrans +func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { db := dbGet() getTime := func(second int) string { return map[string]string{ @@ -141,10 +153,11 @@ func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlob if dbr.RowsAffected == 0 { return nil } - dbr = db.Must().Where("owner=?", owner).First(global) + db.Must().Where("owner=?", owner).First(global) return global } +// SetDBConn sets db conn pool func SetDBConn(db *gorm.DB) { sqldb, _ := db.DB() sqldb.SetMaxOpenConns(int(conf.Store.MaxOpenConns)) diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go index 79a9b0e..eca4d38 100644 --- a/dtmsvr/storage/store.go +++ b/dtmsvr/storage/store.go @@ -3,7 +3,7 @@ * Use of this source code is governed by a BSD-style * license that can be found in the LICENSE file. */ - + package storage import ( @@ -17,6 +17,7 @@ var ErrNotFound = errors.New("storage: NotFound") // ErrUniqueConflict defines the item is conflict with unique key in storage implement. var ErrUniqueConflict = errors.New("storage: UniqueKeyConflict") +// Store defines storage relevant interface type Store interface { Ping() error PopulateData(skipDrop bool) diff --git a/dtmsvr/storage/trans.go b/dtmsvr/storage/trans.go index 80663c9..077bd0d 100644 --- a/dtmsvr/storage/trans.go +++ b/dtmsvr/storage/trans.go @@ -9,10 +9,12 @@ import ( "github.com/dtm-labs/dtm/dtmutil" ) +// TransGlobalExt defines Header info type TransGlobalExt struct { Headers map[string]string `json:"headers,omitempty" gorm:"-"` } +// TransGlobalStore defines GlobalStore storage info type TransGlobalStore struct { dtmutil.ModelBase Gid string `json:"gid,omitempty"`