Browse Source

Merge pull request #161 from Leizhengzi/main

fix dtmsvr/storage golangci lint
pull/165/head
yedf2 4 years ago
committed by GitHub
parent
commit
77082c47c7
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 64
      dtmsvr/storage/boltdb/boltdb.go
  2. 40
      dtmsvr/storage/redis/redis.go
  3. 9
      dtmsvr/storage/registry/registry.go
  4. 39
      dtmsvr/storage/sql/sql.go
  5. 3
      dtmsvr/storage/store.go
  6. 2
      dtmsvr/storage/trans.go

64
dtmsvr/storage/boltdb/boltdb.go

@ -23,10 +23,11 @@ import (
var conf = &config.Config
type BoltdbStore struct {
// Store implements storage.Store, and storage with boltdb
type Store struct {
}
var boltDb *bolt.DB = nil
var boltDb *bolt.DB
var boltOnce sync.Once
func boltGet() *bolt.DB {
@ -110,7 +111,7 @@ func cleanupGlobalWithGids(t *bolt.Tx, gids map[string]struct{}) {
logger.Debugf("Start to cleanup %d gids", len(gids))
for gid := range gids {
logger.Debugf("Start to delete gid: %s", gid)
bucket.Delete([]byte(gid))
dtmimp.E2P(bucket.Delete([]byte(gid)))
}
}
@ -139,7 +140,7 @@ func cleanupBranchWithGids(t *bolt.Tx, gids map[string]struct{}) {
logger.Debugf("Start to cleanup %d branches", len(branchKeys))
for _, key := range branchKeys {
logger.Debugf("Start to delete branch: %s", key)
bucket.Delete([]byte(key))
dtmimp.E2P(bucket.Delete([]byte(key)))
}
}
@ -165,7 +166,7 @@ func cleanupIndexWithGids(t *bolt.Tx, gids map[string]struct{}) {
logger.Debugf("Start to cleanup %d indexes", len(indexKeys))
for _, key := range indexKeys {
logger.Debugf("Start to delete index: %s", key)
bucket.Delete([]byte(key))
dtmimp.E2P(bucket.Delete([]byte(key)))
}
}
@ -232,19 +233,25 @@ func tPutIndex(t *bolt.Tx, unix int64, gid string) {
dtmimp.E2P(err)
}
func (s *BoltdbStore) Ping() error {
// Ping execs ping cmd to boltdb
func (s *Store) Ping() error {
return nil
}
func (s *BoltdbStore) PopulateData(skipDrop bool) {
// PopulateData populates data to boltdb
func (s *Store) PopulateData(skipDrop bool) {
if !skipDrop {
err := boltGet().Update(func(t *bolt.Tx) error {
t.DeleteBucket(bucketIndex)
t.DeleteBucket(bucketBranches)
t.DeleteBucket(bucketGlobal)
t.CreateBucket(bucketIndex)
t.CreateBucket(bucketBranches)
t.CreateBucket(bucketGlobal)
dtmimp.E2P(t.DeleteBucket(bucketIndex))
dtmimp.E2P(t.DeleteBucket(bucketBranches))
dtmimp.E2P(t.DeleteBucket(bucketGlobal))
_, err := t.CreateBucket(bucketIndex)
dtmimp.E2P(err)
_, err = t.CreateBucket(bucketBranches)
dtmimp.E2P(err)
_, err = t.CreateBucket(bucketGlobal)
dtmimp.E2P(err)
return nil
})
dtmimp.E2P(err)
@ -252,7 +259,8 @@ func (s *BoltdbStore) PopulateData(skipDrop bool) {
}
}
func (s *BoltdbStore) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) {
// FindTransGlobalStore finds GlobalTrans data by gid
func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) {
err := boltGet().View(func(t *bolt.Tx) error {
trans = tGetGlobal(t, gid)
return nil
@ -261,7 +269,8 @@ func (s *BoltdbStore) FindTransGlobalStore(gid string) (trans *storage.TransGlob
return
}
func (s *BoltdbStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
// ScanTransGlobalStores lists GlobalTrans data
func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
globals := []storage.TransGlobalStore{}
err := boltGet().View(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketGlobal).Cursor()
@ -287,8 +296,9 @@ func (s *BoltdbStore) ScanTransGlobalStores(position *string, limit int64) []sto
return globals
}
func (s *BoltdbStore) FindBranches(gid string) []storage.TransBranchStore {
var branches []storage.TransBranchStore = nil
// FindBranches finds Branch data by gid
func (s *Store) FindBranches(gid string) []storage.TransBranchStore {
var branches []storage.TransBranchStore
err := boltGet().View(func(t *bolt.Tx) error {
branches = tGetBranches(t, gid)
return nil
@ -297,11 +307,13 @@ func (s *BoltdbStore) FindBranches(gid string) []storage.TransBranchStore {
return branches
}
func (s *BoltdbStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) {
// UpdateBranches update branches info
func (s *Store) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) {
return 0, nil // not implemented
}
func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
// LockGlobalSaveBranches creates branches
func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
err := boltGet().Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, gid)
if g == nil {
@ -316,7 +328,8 @@ func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches
dtmimp.E2P(err)
}
func (s *BoltdbStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
// MaySaveNewTrans creates a new trans
func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
return boltGet().Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid)
if g != nil {
@ -329,7 +342,8 @@ func (s *BoltdbStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches
})
}
func (s *BoltdbStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
// ChangeGlobalStatus changes global trans status
func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status
global.Status = newStatus
err := boltGet().Update(func(t *bolt.Tx) error {
@ -346,7 +360,8 @@ func (s *BoltdbStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newSt
dtmimp.E2P(err)
}
func (s *BoltdbStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
// TouchCronTime updates cronTime
func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
oldUnix := global.NextCronTime.Unix()
global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
global.UpdateTime = dtmutil.GetNextTime(0)
@ -364,8 +379,9 @@ func (s *BoltdbStore) TouchCronTime(global *storage.TransGlobalStore, nextCronIn
dtmimp.E2P(err)
}
func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
var trans *storage.TransGlobalStore = nil
// LockOneGlobalTrans finds GlobalTrans
func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
var trans *storage.TransGlobalStore
min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix())
next := time.Now().Add(time.Duration(conf.RetryInterval) * time.Second)
err := boltGet().Update(func(t *bolt.Tx) error {

40
dtmsvr/storage/redis/redis.go

@ -2,6 +2,7 @@ package redis
import (
"context"
"errors"
"fmt"
"sync"
"time"
@ -21,16 +22,18 @@ var conf = &config.Config
// TODO: optimize this, all function should have context as first parameter
var ctx = context.Background()
// RedisStore is the storage with redis, all transaction information will bachend with redis
type RedisStore struct {
// Store is the storage with redis, all transaction information will bachend with redis
type Store struct {
}
func (s *RedisStore) Ping() error {
// Ping execs ping cmd to redis
func (s *Store) Ping() error {
_, err := redisGet().Ping(ctx).Result()
return err
}
func (s *RedisStore) PopulateData(skipDrop bool) {
// PopulateData populates data to redis
func (s *Store) PopulateData(skipDrop bool) {
if !skipDrop {
_, err := redisGet().FlushAll(ctx).Result()
logger.Infof("call redis flushall. result: %v", err)
@ -38,7 +41,8 @@ func (s *RedisStore) PopulateData(skipDrop bool) {
}
}
func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
// FindTransGlobalStore finds GlobalTrans data by gid
func (s *Store) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
logger.Debugf("calling FindTransGlobalStore: %s", gid)
r, err := redisGet().Get(ctx, conf.Store.RedisPrefix+"_g_"+gid).Result()
if err == redis.Nil {
@ -50,7 +54,8 @@ func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore
return trans
}
func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
// ScanTransGlobalStores lists GlobalTrans data
func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
logger.Debugf("calling ScanTransGlobalStores: %s %d", *position, limit)
lid := uint64(0)
if *position != "" {
@ -76,7 +81,8 @@ func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []stor
return globals
}
func (s *RedisStore) FindBranches(gid string) []storage.TransBranchStore {
// FindBranches finds Branch data by gid
func (s *Store) FindBranches(gid string) []storage.TransBranchStore {
logger.Debugf("calling FindBranches: %s", gid)
sa, err := redisGet().LRange(ctx, conf.Store.RedisPrefix+"_b_"+gid, 0, -1).Result()
dtmimp.E2P(err)
@ -87,7 +93,8 @@ func (s *RedisStore) FindBranches(gid string) []storage.TransBranchStore {
return branches
}
func (s *RedisStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) {
// UpdateBranches updates branches info
func (s *Store) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) {
return 0, nil // not implemented
}
@ -144,7 +151,8 @@ func callLua(a *argList, lua string) (string, error) {
return handleRedisResult(ret, err)
}
func (s *RedisStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
// MaySaveNewTrans creates a new trans
func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
a := newArgList().
AppendGid(global.Gid).
AppendObject(global).
@ -171,7 +179,8 @@ redis.call('EXPIRE', KEYS[2], ARGV[2])
return err
}
func (s *RedisStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
// LockGlobalSaveBranches creates branches
func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
args := newArgList().
AppendGid(gid).
AppendRaw(status).
@ -195,7 +204,8 @@ redis.call('EXPIRE', KEYS[2], ARGV[2])
dtmimp.E2P(err)
}
func (s *RedisStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
// ChangeGlobalStatus changes global trans status
func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status
global.Status = newStatus
args := newArgList().
@ -219,7 +229,8 @@ end
dtmimp.E2P(err)
}
func (s *RedisStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
// LockOneGlobalTrans finds GlobalTrans
func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
expired := time.Now().Add(expireIn).Unix()
next := time.Now().Add(time.Duration(conf.RetryInterval) * time.Second).Unix()
args := newArgList().AppendGid("").AppendRaw(expired).AppendRaw(next)
@ -238,7 +249,7 @@ return gid
`
for {
r, err := callLua(args, lua)
if err == storage.ErrNotFound {
if errors.Is(err, storage.ErrNotFound) {
return nil
}
dtmimp.E2P(err)
@ -249,7 +260,8 @@ return gid
}
}
func (s *RedisStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
// TouchCronTime updates cronTime
func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
global.UpdateTime = dtmutil.GetNextTime(0)
global.NextCronInterval = nextCronInterval

9
dtmsvr/storage/registry/registry.go

@ -13,12 +13,13 @@ import (
var conf = &config.Config
var stores map[string]storage.Store = map[string]storage.Store{
"redis": &redis.RedisStore{},
"mysql": &sql.SqlStore{},
"postgres": &sql.SqlStore{},
"boltdb": &boltdb.BoltdbStore{},
"redis": &redis.Store{},
"mysql": &sql.Store{},
"postgres": &sql.Store{},
"boltdb": &boltdb.Store{},
}
// GetStore returns storage.Store
func GetStore() storage.Store {
return stores[conf.Store.Driver]
}

39
dtmsvr/storage/sql/sql.go

@ -17,22 +17,26 @@ import (
var conf = &config.Config
type SqlStore struct {
// Store implements storage.Store, and storage with db
type Store struct {
}
func (s *SqlStore) Ping() error {
// Ping execs ping cmd to db
func (s *Store) Ping() error {
db, err := dtmimp.StandaloneDB(conf.Store.GetDBConf())
dtmimp.E2P(err)
_, err = db.Exec("select 1")
return err
}
func (s *SqlStore) PopulateData(skipDrop bool) {
// PopulateData populates data to db
func (s *Store) PopulateData(skipDrop bool) {
file := fmt.Sprintf("%s/dtmsvr.storage.%s.sql", dtmutil.GetSqlDir(), conf.Store.Driver)
dtmutil.RunSQLScript(conf.Store.GetDBConf(), file, skipDrop)
}
func (s *SqlStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
// FindTransGlobalStore finds GlobalTrans data by gid
func (s *Store) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
trans := &storage.TransGlobalStore{}
dbr := dbGet().Model(trans).Where("gid=?", gid).First(trans)
if dbr.Error == gorm.ErrRecordNotFound {
@ -42,7 +46,8 @@ func (s *SqlStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
return trans
}
func (s *SqlStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
// ScanTransGlobalStores lists GlobalTrans data
func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
globals := []storage.TransGlobalStore{}
lid := math.MaxInt64
if *position != "" {
@ -57,13 +62,15 @@ func (s *SqlStore) ScanTransGlobalStores(position *string, limit int64) []storag
return globals
}
func (s *SqlStore) FindBranches(gid string) []storage.TransBranchStore {
// FindBranches finds Branch data by gid
func (s *Store) FindBranches(gid string) []storage.TransBranchStore {
branches := []storage.TransBranchStore{}
dbGet().Must().Where("gid=?", gid).Order("id asc").Find(&branches)
return branches
}
func (s *SqlStore) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) {
// UpdateBranches update branches info
func (s *Store) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) {
db := dbGet().Clauses(clause.OnConflict{
OnConstraint: "trans_branch_op_pkey",
DoUpdates: clause.AssignmentColumns(updates),
@ -71,7 +78,8 @@ func (s *SqlStore) UpdateBranches(branches []storage.TransBranchStore, updates [
return int(db.RowsAffected), db.Error
}
func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
// LockGlobalSaveBranches creates branches
func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
err := dbGet().Transaction(func(tx *gorm.DB) error {
g := &storage.TransGlobalStore{}
dbr := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Model(g).Where("gid=? and status=?", gid, status).First(g)
@ -83,7 +91,8 @@ func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []
dtmimp.E2P(err)
}
func (s *SqlStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
// MaySaveNewTrans creates a new trans
func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
return dbGet().Transaction(func(db1 *gorm.DB) error {
db := &dtmutil.DB{DB: db1}
dbr := db.Must().Clauses(clause.OnConflict{
@ -101,7 +110,8 @@ func (s *SqlStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []
})
}
func (s *SqlStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
// ChangeGlobalStatus changes global trans status
func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status
global.Status = newStatus
dbr := dbGet().Must().Model(global).Where("status=? and gid=?", old, global.Gid).Select(updates).Updates(global)
@ -110,7 +120,8 @@ func (s *SqlStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatu
}
}
func (s *SqlStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
// TouchCronTime updates cronTime
func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
global.UpdateTime = dtmutil.GetNextTime(0)
global.NextCronInterval = nextCronInterval
@ -118,7 +129,8 @@ func (s *SqlStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInter
Select([]string{"next_cron_time", "update_time", "next_cron_interval"}).Updates(global)
}
func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
// LockOneGlobalTrans finds GlobalTrans
func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
db := dbGet()
getTime := func(second int) string {
return map[string]string{
@ -141,10 +153,11 @@ func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlob
if dbr.RowsAffected == 0 {
return nil
}
dbr = db.Must().Where("owner=?", owner).First(global)
db.Must().Where("owner=?", owner).First(global)
return global
}
// SetDBConn sets db conn pool
func SetDBConn(db *gorm.DB) {
sqldb, _ := db.DB()
sqldb.SetMaxOpenConns(int(conf.Store.MaxOpenConns))

3
dtmsvr/storage/store.go

@ -3,7 +3,7 @@
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package storage
import (
@ -17,6 +17,7 @@ var ErrNotFound = errors.New("storage: NotFound")
// ErrUniqueConflict defines the item is conflict with unique key in storage implement.
var ErrUniqueConflict = errors.New("storage: UniqueKeyConflict")
// Store defines storage relevant interface
type Store interface {
Ping() error
PopulateData(skipDrop bool)

2
dtmsvr/storage/trans.go

@ -9,10 +9,12 @@ import (
"github.com/dtm-labs/dtm/dtmutil"
)
// TransGlobalExt defines Header info
type TransGlobalExt struct {
Headers map[string]string `json:"headers,omitempty" gorm:"-"`
}
// TransGlobalStore defines GlobalStore storage info
type TransGlobalStore struct {
dtmutil.ModelBase
Gid string `json:"gid,omitempty"`

Loading…
Cancel
Save