Browse Source

Merge pull request #121 from lsytj0413/refactor-storage

refactor(*): split storage with sql/redis package
pull/125/head
yedf2 4 years ago
committed by GitHub
parent
commit
51eebc472c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      dtmsvr/storage/redis/redis.go
  2. 8
      dtmsvr/storage/registry/registry.go
  3. 56
      dtmsvr/storage/sql/sql.go
  4. 10
      dtmsvr/storage/store.go
  5. 16
      dtmsvr/storage/utils.go

48
dtmsvr/storage/redis.go → dtmsvr/storage/redis/redis.go

@ -1,4 +1,4 @@
package storage package redis
import ( import (
"context" "context"
@ -6,11 +6,15 @@ import (
"time" "time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"gorm.io/gorm"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/dtmimp"
"gorm.io/gorm" "github.com/yedf/dtm/dtmsvr/storage"
) )
var config = &common.Config
var ctx context.Context = context.Background() var ctx context.Context = context.Background()
type RedisStore struct { type RedisStore struct {
@ -26,30 +30,30 @@ func (s *RedisStore) PopulateData(skipDrop bool) {
dtmimp.PanicIf(err != nil, err) dtmimp.PanicIf(err != nil, err)
} }
func (s *RedisStore) FindTransGlobalStore(gid string) *TransGlobalStore { func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
r, err := redisGet().Get(ctx, config.Store.RedisPrefix+"_g_"+gid).Result() r, err := redisGet().Get(ctx, config.Store.RedisPrefix+"_g_"+gid).Result()
if err == redis.Nil { if err == redis.Nil {
return nil return nil
} }
dtmimp.E2P(err) dtmimp.E2P(err)
trans := &TransGlobalStore{} trans := &storage.TransGlobalStore{}
dtmimp.MustUnmarshalString(r, trans) dtmimp.MustUnmarshalString(r, trans)
return trans return trans
} }
func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore { func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
lid := uint64(0) lid := uint64(0)
if *position != "" { if *position != "" {
lid = uint64(dtmimp.MustAtoi(*position)) lid = uint64(dtmimp.MustAtoi(*position))
} }
keys, cursor, err := redisGet().Scan(ctx, lid, config.Store.RedisPrefix+"_g_*", limit).Result() keys, cursor, err := redisGet().Scan(ctx, lid, config.Store.RedisPrefix+"_g_*", limit).Result()
dtmimp.E2P(err) dtmimp.E2P(err)
globals := []TransGlobalStore{} globals := []storage.TransGlobalStore{}
if len(keys) > 0 { if len(keys) > 0 {
values, err := redisGet().MGet(ctx, keys...).Result() values, err := redisGet().MGet(ctx, keys...).Result()
dtmimp.E2P(err) dtmimp.E2P(err)
for _, v := range values { for _, v := range values {
global := TransGlobalStore{} global := storage.TransGlobalStore{}
dtmimp.MustUnmarshalString(v.(string), &global) dtmimp.MustUnmarshalString(v.(string), &global)
globals = append(globals, global) globals = append(globals, global)
} }
@ -62,17 +66,17 @@ func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []Tran
return globals return globals
} }
func (s *RedisStore) FindBranches(gid string) []TransBranchStore { func (s *RedisStore) FindBranches(gid string) []storage.TransBranchStore {
sa, err := redisGet().LRange(ctx, config.Store.RedisPrefix+"_b_"+gid, 0, -1).Result() sa, err := redisGet().LRange(ctx, config.Store.RedisPrefix+"_b_"+gid, 0, -1).Result()
dtmimp.E2P(err) dtmimp.E2P(err)
branches := make([]TransBranchStore, len(sa)) branches := make([]storage.TransBranchStore, len(sa))
for k, v := range sa { for k, v := range sa {
dtmimp.MustUnmarshalString(v, &branches[k]) dtmimp.MustUnmarshalString(v, &branches[k])
} }
return branches return branches
} }
func (s *RedisStore) UpdateBranchesSql(branches []TransBranchStore, updates []string) *gorm.DB { func (s *RedisStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB {
return nil // not implemented return nil // not implemented
} }
@ -102,7 +106,7 @@ func (a *argList) AppendObject(v interface{}) *argList {
return a.AppendRaw(dtmimp.MustMarshalString(v)) return a.AppendRaw(dtmimp.MustMarshalString(v))
} }
func (a *argList) AppendBranches(branches []TransBranchStore) *argList { func (a *argList) AppendBranches(branches []storage.TransBranchStore) *argList {
for _, b := range branches { for _, b := range branches {
a.AppendRaw(dtmimp.MustMarshalString(b)) a.AppendRaw(dtmimp.MustMarshalString(b))
} }
@ -116,8 +120,8 @@ func handleRedisResult(ret interface{}, err error) (string, error) {
} }
s, _ := ret.(string) s, _ := ret.(string)
err = map[string]error{ err = map[string]error{
"NOT_FOUND": ErrNotFound, "NOT_FOUND": storage.ErrNotFound,
"UNIQUE_CONFLICT": ErrUniqueConflict, "UNIQUE_CONFLICT": storage.ErrUniqueConflict,
}[s] }[s]
return s, err return s, err
} }
@ -128,7 +132,7 @@ func callLua(a *argList, lua string) (string, error) {
return handleRedisResult(ret, err) return handleRedisResult(ret, err)
} }
func (s *RedisStore) MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error { func (s *RedisStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
a := newArgList(). a := newArgList().
AppendGid(global.Gid). AppendGid(global.Gid).
AppendObject(global). AppendObject(global).
@ -153,10 +157,10 @@ redis.call('EXPIRE', KEYS[2], ARGV[2])
return err return err
} }
func (s *RedisStore) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) { func (s *RedisStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
args := newArgList(). args := newArgList().
AppendGid(gid). AppendGid(gid).
AppendObject(&TransGlobalStore{Gid: gid, Status: status}). AppendObject(&storage.TransGlobalStore{Gid: gid, Status: status}).
AppendRaw(branchStart). AppendRaw(branchStart).
AppendBranches(branches) AppendBranches(branches)
_, err := callLua(args, ` _, err := callLua(args, `
@ -182,7 +186,7 @@ redis.call('EXPIRE', KEYS[2], ARGV[2])
dtmimp.E2P(err) dtmimp.E2P(err)
} }
func (s *RedisStore) ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) { func (s *RedisStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status old := global.Status
global.Status = newStatus global.Status = newStatus
args := newArgList().AppendGid(global.Gid).AppendObject(global).AppendRaw(old).AppendRaw(finished) args := newArgList().AppendGid(global.Gid).AppendObject(global).AppendRaw(old).AppendRaw(finished)
@ -205,7 +209,7 @@ end
dtmimp.E2P(err) dtmimp.E2P(err)
} }
func (s *RedisStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore { func (s *RedisStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
expired := time.Now().Add(expireIn).Unix() expired := time.Now().Add(expireIn).Unix()
next := time.Now().Add(time.Duration(config.RetryInterval) * time.Second).Unix() next := time.Now().Add(time.Duration(config.RetryInterval) * time.Second).Unix()
args := newArgList().AppendGid("").AppendRaw(expired).AppendRaw(next) args := newArgList().AppendGid("").AppendRaw(expired).AppendRaw(next)
@ -224,7 +228,7 @@ return gid
` `
for { for {
r, err := callLua(args, lua) r, err := callLua(args, lua)
if err == ErrNotFound { if err == storage.ErrNotFound {
return nil return nil
} }
dtmimp.E2P(err) dtmimp.E2P(err)
@ -235,7 +239,7 @@ return gid
} }
} }
func (s *RedisStore) TouchCronTime(global *TransGlobalStore, nextCronInterval int64) { func (s *RedisStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
global.NextCronTime = common.GetNextTime(nextCronInterval) global.NextCronTime = common.GetNextTime(nextCronInterval)
global.UpdateTime = common.GetNextTime(0) global.UpdateTime = common.GetNextTime(0)
global.NextCronInterval = nextCronInterval global.NextCronInterval = nextCronInterval
@ -255,3 +259,7 @@ redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2])
`) `)
dtmimp.E2P(err) dtmimp.E2P(err)
} }
func redisGet() *redis.Client {
return common.RedisGet()
}

8
dtmsvr/storage/registry/registry.go

@ -6,14 +6,16 @@ import (
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmsvr/storage" "github.com/yedf/dtm/dtmsvr/storage"
"github.com/yedf/dtm/dtmsvr/storage/boltdb" "github.com/yedf/dtm/dtmsvr/storage/boltdb"
"github.com/yedf/dtm/dtmsvr/storage/redis"
"github.com/yedf/dtm/dtmsvr/storage/sql"
) )
var config = &common.Config var config = &common.Config
var stores map[string]storage.Store = map[string]storage.Store{ var stores map[string]storage.Store = map[string]storage.Store{
"redis": &storage.RedisStore{}, "redis": &redis.RedisStore{},
"mysql": &storage.SqlStore{}, "mysql": &sql.SqlStore{},
"postgres": &storage.SqlStore{}, "postgres": &sql.SqlStore{},
"boltdb": &boltdb.BoltdbStore{}, "boltdb": &boltdb.BoltdbStore{},
} }

56
dtmsvr/storage/sql.go → dtmsvr/storage/sql/sql.go

@ -1,4 +1,4 @@
package storage package sql
import ( import (
"fmt" "fmt"
@ -6,12 +6,16 @@ import (
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause" "gorm.io/gorm/clause"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr/storage"
) )
var config = &common.Config
type SqlStore struct { type SqlStore struct {
} }
@ -27,8 +31,8 @@ func (s *SqlStore) PopulateData(skipDrop bool) {
common.RunSQLScript(config.Store.GetDBConf(), file, skipDrop) common.RunSQLScript(config.Store.GetDBConf(), file, skipDrop)
} }
func (s *SqlStore) FindTransGlobalStore(gid string) *TransGlobalStore { func (s *SqlStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
trans := &TransGlobalStore{} trans := &storage.TransGlobalStore{}
dbr := dbGet().Model(trans).Where("gid=?", gid).First(trans) dbr := dbGet().Model(trans).Where("gid=?", gid).First(trans)
if dbr.Error == gorm.ErrRecordNotFound { if dbr.Error == gorm.ErrRecordNotFound {
return nil return nil
@ -37,8 +41,8 @@ func (s *SqlStore) FindTransGlobalStore(gid string) *TransGlobalStore {
return trans return trans
} }
func (s *SqlStore) ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore { func (s *SqlStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
globals := []TransGlobalStore{} globals := []storage.TransGlobalStore{}
lid := math.MaxInt64 lid := math.MaxInt64
if *position != "" { if *position != "" {
lid = dtmimp.MustAtoi(*position) lid = dtmimp.MustAtoi(*position)
@ -52,22 +56,22 @@ func (s *SqlStore) ScanTransGlobalStores(position *string, limit int64) []TransG
return globals return globals
} }
func (s *SqlStore) FindBranches(gid string) []TransBranchStore { func (s *SqlStore) FindBranches(gid string) []storage.TransBranchStore {
branches := []TransBranchStore{} branches := []storage.TransBranchStore{}
dbGet().Must().Where("gid=?", gid).Order("id asc").Find(&branches) dbGet().Must().Where("gid=?", gid).Order("id asc").Find(&branches)
return branches return branches
} }
func (s *SqlStore) UpdateBranchesSql(branches []TransBranchStore, updates []string) *gorm.DB { func (s *SqlStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB {
return dbGet().Clauses(clause.OnConflict{ return dbGet().Clauses(clause.OnConflict{
OnConstraint: "trans_branch_op_pkey", OnConstraint: "trans_branch_op_pkey",
DoUpdates: clause.AssignmentColumns(updates), DoUpdates: clause.AssignmentColumns(updates),
}).Create(branches) }).Create(branches)
} }
func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) { func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
err := dbGet().Transaction(func(tx *gorm.DB) error { err := dbGet().Transaction(func(tx *gorm.DB) error {
g := &TransGlobalStore{} g := &storage.TransGlobalStore{}
dbr := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Model(g).Where("gid=? and status=?", gid, status).First(g) dbr := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Model(g).Where("gid=? and status=?", gid, status).First(g)
if dbr.Error == nil { if dbr.Error == nil {
dbr = tx.Save(branches) dbr = tx.Save(branches)
@ -77,14 +81,14 @@ func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []
dtmimp.E2P(err) dtmimp.E2P(err)
} }
func (s *SqlStore) MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error { func (s *SqlStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
return dbGet().Transaction(func(db1 *gorm.DB) error { return dbGet().Transaction(func(db1 *gorm.DB) error {
db := &common.DB{DB: db1} db := &common.DB{DB: db1}
dbr := db.Must().Clauses(clause.OnConflict{ dbr := db.Must().Clauses(clause.OnConflict{
DoNothing: true, DoNothing: true,
}).Create(global) }).Create(global)
if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误 if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误
return ErrUniqueConflict return storage.ErrUniqueConflict
} }
if len(branches) > 0 { if len(branches) > 0 {
db.Must().Clauses(clause.OnConflict{ db.Must().Clauses(clause.OnConflict{
@ -95,16 +99,16 @@ func (s *SqlStore) MaySaveNewTrans(global *TransGlobalStore, branches []TransBra
}) })
} }
func (s *SqlStore) ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) { func (s *SqlStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status old := global.Status
global.Status = newStatus global.Status = newStatus
dbr := dbGet().Must().Model(global).Where("status=? and gid=?", old, global.Gid).Select(updates).Updates(global) dbr := dbGet().Must().Model(global).Where("status=? and gid=?", old, global.Gid).Select(updates).Updates(global)
if dbr.RowsAffected == 0 { if dbr.RowsAffected == 0 {
dtmimp.E2P(ErrNotFound) dtmimp.E2P(storage.ErrNotFound)
} }
} }
func (s *SqlStore) TouchCronTime(global *TransGlobalStore, nextCronInterval int64) { func (s *SqlStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
global.NextCronTime = common.GetNextTime(nextCronInterval) global.NextCronTime = common.GetNextTime(nextCronInterval)
global.UpdateTime = common.GetNextTime(0) global.UpdateTime = common.GetNextTime(0)
global.NextCronInterval = nextCronInterval global.NextCronInterval = nextCronInterval
@ -112,7 +116,7 @@ func (s *SqlStore) TouchCronTime(global *TransGlobalStore, nextCronInterval int6
Select([]string{"next_cron_time", "update_time", "next_cron_interval"}).Updates(global) Select([]string{"next_cron_time", "update_time", "next_cron_interval"}).Updates(global)
} }
func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore { func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
db := dbGet() db := dbGet()
getTime := func(second int) string { getTime := func(second int) string {
return map[string]string{ return map[string]string{
@ -123,12 +127,12 @@ func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
expire := int(expireIn / time.Second) expire := int(expireIn / time.Second)
whereTime := fmt.Sprintf("next_cron_time < %s", getTime(expire)) whereTime := fmt.Sprintf("next_cron_time < %s", getTime(expire))
owner := uuid.NewString() owner := uuid.NewString()
global := &TransGlobalStore{} global := &storage.TransGlobalStore{}
dbr := db.Must().Model(global). dbr := db.Must().Model(global).
Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')"). Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')").
Limit(1). Limit(1).
Select([]string{"owner", "next_cron_time"}). Select([]string{"owner", "next_cron_time"}).
Updates(&TransGlobalStore{ Updates(&storage.TransGlobalStore{
Owner: owner, Owner: owner,
NextCronTime: common.GetNextTime(common.Config.RetryInterval), NextCronTime: common.GetNextTime(common.Config.RetryInterval),
}) })
@ -138,3 +142,15 @@ func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
dbr = db.Must().Where("owner=?", owner).First(global) dbr = db.Must().Where("owner=?", owner).First(global)
return global return global
} }
func dbGet() *common.DB {
return common.DbGet(config.Store.GetDBConf())
}
func wrapError(err error) error {
if err == gorm.ErrRecordNotFound {
return storage.ErrNotFound
}
dtmimp.E2P(err)
return err
}

10
dtmsvr/storage/store.go

@ -4,8 +4,6 @@ import (
"errors" "errors"
"time" "time"
"github.com/go-redis/redis/v8"
"github.com/yedf/dtm/dtmcli/dtmimp"
"gorm.io/gorm" "gorm.io/gorm"
) )
@ -25,11 +23,3 @@ type Store interface {
TouchCronTime(global *TransGlobalStore, nextCronInterval int64) TouchCronTime(global *TransGlobalStore, nextCronInterval int64)
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
} }
func wrapError(err error) error {
if err == gorm.ErrRecordNotFound || err == redis.Nil {
return ErrNotFound
}
dtmimp.E2P(err)
return err
}

16
dtmsvr/storage/utils.go

@ -1,16 +0,0 @@
package storage
import (
"github.com/go-redis/redis/v8"
"github.com/yedf/dtm/common"
)
var config = &common.Config
func dbGet() *common.DB {
return common.DbGet(config.Store.GetDBConf())
}
func redisGet() *redis.Client {
return common.RedisGet()
}
Loading…
Cancel
Save