Browse Source

refactor(*): split storage with sql/redis package

pull/121/head
lsytj0413 4 years ago
parent
commit
73fff1876e
  1. 48
      dtmsvr/storage/redis/redis.go
  2. 8
      dtmsvr/storage/registry/registry.go
  3. 56
      dtmsvr/storage/sql/sql.go
  4. 10
      dtmsvr/storage/store.go
  5. 16
      dtmsvr/storage/utils.go

48
dtmsvr/storage/redis.go → dtmsvr/storage/redis/redis.go

@ -1,4 +1,4 @@
package storage
package redis
import (
"context"
@ -6,11 +6,15 @@ import (
"time"
"github.com/go-redis/redis/v8"
"gorm.io/gorm"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"gorm.io/gorm"
"github.com/yedf/dtm/dtmsvr/storage"
)
var config = &common.Config
var ctx context.Context = context.Background()
type RedisStore struct {
@ -26,30 +30,30 @@ func (s *RedisStore) PopulateData(skipDrop bool) {
dtmimp.PanicIf(err != nil, err)
}
func (s *RedisStore) FindTransGlobalStore(gid string) *TransGlobalStore {
func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
r, err := redisGet().Get(ctx, config.Store.RedisPrefix+"_g_"+gid).Result()
if err == redis.Nil {
return nil
}
dtmimp.E2P(err)
trans := &TransGlobalStore{}
trans := &storage.TransGlobalStore{}
dtmimp.MustUnmarshalString(r, trans)
return trans
}
func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore {
func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
lid := uint64(0)
if *position != "" {
lid = uint64(dtmimp.MustAtoi(*position))
}
keys, cursor, err := redisGet().Scan(ctx, lid, config.Store.RedisPrefix+"_g_*", limit).Result()
dtmimp.E2P(err)
globals := []TransGlobalStore{}
globals := []storage.TransGlobalStore{}
if len(keys) > 0 {
values, err := redisGet().MGet(ctx, keys...).Result()
dtmimp.E2P(err)
for _, v := range values {
global := TransGlobalStore{}
global := storage.TransGlobalStore{}
dtmimp.MustUnmarshalString(v.(string), &global)
globals = append(globals, global)
}
@ -62,17 +66,17 @@ func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []Tran
return globals
}
func (s *RedisStore) FindBranches(gid string) []TransBranchStore {
func (s *RedisStore) FindBranches(gid string) []storage.TransBranchStore {
sa, err := redisGet().LRange(ctx, config.Store.RedisPrefix+"_b_"+gid, 0, -1).Result()
dtmimp.E2P(err)
branches := make([]TransBranchStore, len(sa))
branches := make([]storage.TransBranchStore, len(sa))
for k, v := range sa {
dtmimp.MustUnmarshalString(v, &branches[k])
}
return branches
}
func (s *RedisStore) UpdateBranchesSql(branches []TransBranchStore, updates []string) *gorm.DB {
func (s *RedisStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB {
return nil // not implemented
}
@ -102,7 +106,7 @@ func (a *argList) AppendObject(v interface{}) *argList {
return a.AppendRaw(dtmimp.MustMarshalString(v))
}
func (a *argList) AppendBranches(branches []TransBranchStore) *argList {
func (a *argList) AppendBranches(branches []storage.TransBranchStore) *argList {
for _, b := range branches {
a.AppendRaw(dtmimp.MustMarshalString(b))
}
@ -116,8 +120,8 @@ func handleRedisResult(ret interface{}, err error) (string, error) {
}
s, _ := ret.(string)
err = map[string]error{
"NOT_FOUND": ErrNotFound,
"UNIQUE_CONFLICT": ErrUniqueConflict,
"NOT_FOUND": storage.ErrNotFound,
"UNIQUE_CONFLICT": storage.ErrUniqueConflict,
}[s]
return s, err
}
@ -128,7 +132,7 @@ func callLua(a *argList, lua string) (string, error) {
return handleRedisResult(ret, err)
}
func (s *RedisStore) MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error {
func (s *RedisStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
a := newArgList().
AppendGid(global.Gid).
AppendObject(global).
@ -153,10 +157,10 @@ redis.call('EXPIRE', KEYS[2], ARGV[2])
return err
}
func (s *RedisStore) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) {
func (s *RedisStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
args := newArgList().
AppendGid(gid).
AppendObject(&TransGlobalStore{Gid: gid, Status: status}).
AppendObject(&storage.TransGlobalStore{Gid: gid, Status: status}).
AppendRaw(branchStart).
AppendBranches(branches)
_, err := callLua(args, `
@ -182,7 +186,7 @@ redis.call('EXPIRE', KEYS[2], ARGV[2])
dtmimp.E2P(err)
}
func (s *RedisStore) ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) {
func (s *RedisStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status
global.Status = newStatus
args := newArgList().AppendGid(global.Gid).AppendObject(global).AppendRaw(old).AppendRaw(finished)
@ -205,7 +209,7 @@ end
dtmimp.E2P(err)
}
func (s *RedisStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore {
func (s *RedisStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
expired := time.Now().Add(expireIn).Unix()
next := time.Now().Add(time.Duration(config.RetryInterval) * time.Second).Unix()
args := newArgList().AppendGid("").AppendRaw(expired).AppendRaw(next)
@ -224,7 +228,7 @@ return gid
`
for {
r, err := callLua(args, lua)
if err == ErrNotFound {
if err == storage.ErrNotFound {
return nil
}
dtmimp.E2P(err)
@ -235,7 +239,7 @@ return gid
}
}
func (s *RedisStore) TouchCronTime(global *TransGlobalStore, nextCronInterval int64) {
func (s *RedisStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
global.NextCronTime = common.GetNextTime(nextCronInterval)
global.UpdateTime = common.GetNextTime(0)
global.NextCronInterval = nextCronInterval
@ -255,3 +259,7 @@ redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2])
`)
dtmimp.E2P(err)
}
func redisGet() *redis.Client {
return common.RedisGet()
}

8
dtmsvr/storage/registry/registry.go

@ -6,14 +6,16 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmsvr/storage"
"github.com/yedf/dtm/dtmsvr/storage/boltdb"
"github.com/yedf/dtm/dtmsvr/storage/redis"
"github.com/yedf/dtm/dtmsvr/storage/sql"
)
var config = &common.Config
var stores map[string]storage.Store = map[string]storage.Store{
"redis": &storage.RedisStore{},
"mysql": &storage.SqlStore{},
"postgres": &storage.SqlStore{},
"redis": &redis.RedisStore{},
"mysql": &sql.SqlStore{},
"postgres": &sql.SqlStore{},
"boltdb": &boltdb.BoltdbStore{},
}

56
dtmsvr/storage/sql.go → dtmsvr/storage/sql/sql.go

@ -1,4 +1,4 @@
package storage
package sql
import (
"fmt"
@ -6,12 +6,16 @@ import (
"time"
"github.com/google/uuid"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr/storage"
)
var config = &common.Config
type SqlStore struct {
}
@ -27,8 +31,8 @@ func (s *SqlStore) PopulateData(skipDrop bool) {
common.RunSQLScript(config.Store.GetDBConf(), file, skipDrop)
}
func (s *SqlStore) FindTransGlobalStore(gid string) *TransGlobalStore {
trans := &TransGlobalStore{}
func (s *SqlStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
trans := &storage.TransGlobalStore{}
dbr := dbGet().Model(trans).Where("gid=?", gid).First(trans)
if dbr.Error == gorm.ErrRecordNotFound {
return nil
@ -37,8 +41,8 @@ func (s *SqlStore) FindTransGlobalStore(gid string) *TransGlobalStore {
return trans
}
func (s *SqlStore) ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore {
globals := []TransGlobalStore{}
func (s *SqlStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
globals := []storage.TransGlobalStore{}
lid := math.MaxInt64
if *position != "" {
lid = dtmimp.MustAtoi(*position)
@ -52,22 +56,22 @@ func (s *SqlStore) ScanTransGlobalStores(position *string, limit int64) []TransG
return globals
}
func (s *SqlStore) FindBranches(gid string) []TransBranchStore {
branches := []TransBranchStore{}
func (s *SqlStore) FindBranches(gid string) []storage.TransBranchStore {
branches := []storage.TransBranchStore{}
dbGet().Must().Where("gid=?", gid).Order("id asc").Find(&branches)
return branches
}
func (s *SqlStore) UpdateBranchesSql(branches []TransBranchStore, updates []string) *gorm.DB {
func (s *SqlStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB {
return dbGet().Clauses(clause.OnConflict{
OnConstraint: "trans_branch_op_pkey",
DoUpdates: clause.AssignmentColumns(updates),
}).Create(branches)
}
func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) {
func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
err := dbGet().Transaction(func(tx *gorm.DB) error {
g := &TransGlobalStore{}
g := &storage.TransGlobalStore{}
dbr := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Model(g).Where("gid=? and status=?", gid, status).First(g)
if dbr.Error == nil {
dbr = tx.Save(branches)
@ -77,14 +81,14 @@ func (s *SqlStore) LockGlobalSaveBranches(gid string, status string, branches []
dtmimp.E2P(err)
}
func (s *SqlStore) MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error {
func (s *SqlStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
return dbGet().Transaction(func(db1 *gorm.DB) error {
db := &common.DB{DB: db1}
dbr := db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(global)
if dbr.RowsAffected <= 0 { // 如果这个不是新事务,返回错误
return ErrUniqueConflict
return storage.ErrUniqueConflict
}
if len(branches) > 0 {
db.Must().Clauses(clause.OnConflict{
@ -95,16 +99,16 @@ func (s *SqlStore) MaySaveNewTrans(global *TransGlobalStore, branches []TransBra
})
}
func (s *SqlStore) ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) {
func (s *SqlStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status
global.Status = newStatus
dbr := dbGet().Must().Model(global).Where("status=? and gid=?", old, global.Gid).Select(updates).Updates(global)
if dbr.RowsAffected == 0 {
dtmimp.E2P(ErrNotFound)
dtmimp.E2P(storage.ErrNotFound)
}
}
func (s *SqlStore) TouchCronTime(global *TransGlobalStore, nextCronInterval int64) {
func (s *SqlStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
global.NextCronTime = common.GetNextTime(nextCronInterval)
global.UpdateTime = common.GetNextTime(0)
global.NextCronInterval = nextCronInterval
@ -112,7 +116,7 @@ func (s *SqlStore) TouchCronTime(global *TransGlobalStore, nextCronInterval int6
Select([]string{"next_cron_time", "update_time", "next_cron_interval"}).Updates(global)
}
func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore {
func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
db := dbGet()
getTime := func(second int) string {
return map[string]string{
@ -123,12 +127,12 @@ func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
expire := int(expireIn / time.Second)
whereTime := fmt.Sprintf("next_cron_time < %s", getTime(expire))
owner := uuid.NewString()
global := &TransGlobalStore{}
global := &storage.TransGlobalStore{}
dbr := db.Must().Model(global).
Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')").
Limit(1).
Select([]string{"owner", "next_cron_time"}).
Updates(&TransGlobalStore{
Updates(&storage.TransGlobalStore{
Owner: owner,
NextCronTime: common.GetNextTime(common.Config.RetryInterval),
})
@ -138,3 +142,15 @@ func (s *SqlStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
dbr = db.Must().Where("owner=?", owner).First(global)
return global
}
func dbGet() *common.DB {
return common.DbGet(config.Store.GetDBConf())
}
func wrapError(err error) error {
if err == gorm.ErrRecordNotFound {
return storage.ErrNotFound
}
dtmimp.E2P(err)
return err
}

10
dtmsvr/storage/store.go

@ -4,8 +4,6 @@ import (
"errors"
"time"
"github.com/go-redis/redis/v8"
"github.com/yedf/dtm/dtmcli/dtmimp"
"gorm.io/gorm"
)
@ -25,11 +23,3 @@ type Store interface {
TouchCronTime(global *TransGlobalStore, nextCronInterval int64)
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
}
func wrapError(err error) error {
if err == gorm.ErrRecordNotFound || err == redis.Nil {
return ErrNotFound
}
dtmimp.E2P(err)
return err
}

16
dtmsvr/storage/utils.go

@ -1,16 +0,0 @@
package storage
import (
"github.com/go-redis/redis/v8"
"github.com/yedf/dtm/common"
)
var config = &common.Config
func dbGet() *common.DB {
return common.DbGet(config.Store.GetDBConf())
}
func redisGet() *redis.Client {
return common.RedisGet()
}
Loading…
Cancel
Save