From e27eb90fe875a07418c12da2cf650e0e6b80e88a Mon Sep 17 00:00:00 2001 From: lsytj0413 <511121939@qq.com> Date: Thu, 23 Dec 2021 18:53:36 +0800 Subject: [PATCH] refactor(*): split storage to different package --- app/main.go | 8 +-- bench/main.go | 4 +- dtmsvr/storage/{ => boltdb}/boltdb.go | 68 ++++++++++++---------- dtmsvr/storage/{ => boltdb}/boltdb_test.go | 9 +-- dtmsvr/storage/registry/registry.go | 29 +++++++++ dtmsvr/storage/store.go | 18 ------ dtmsvr/utils.go | 4 +- test/store_test.go | 8 ++- 8 files changed, 84 insertions(+), 64 deletions(-) rename dtmsvr/storage/{ => boltdb}/boltdb.go (82%) rename dtmsvr/storage/{ => boltdb}/boltdb_test.go (97%) create mode 100644 dtmsvr/storage/registry/registry.go diff --git a/app/main.go b/app/main.go index 8a63fdf..aa87722 100644 --- a/app/main.go +++ b/app/main.go @@ -11,14 +11,14 @@ import ( "os" "strings" + _ "go.uber.org/automaxprocs" + "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmsvr" - "github.com/yedf/dtm/dtmsvr/storage" + "github.com/yedf/dtm/dtmsvr/storage/registry" "github.com/yedf/dtm/examples" - - _ "go.uber.org/automaxprocs" ) var Version, Commit, Date string @@ -56,7 +56,7 @@ func main() { dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver) } if os.Args[1] != "dtmsvr" { // 实际线上运行,只启动dtmsvr,不准备table相关的数据 - storage.WaitStoreUp() + registry.WaitStoreUp() dtmsvr.PopulateDB(true) examples.PopulateDB(true) } diff --git a/bench/main.go b/bench/main.go index 02b2931..f757c8c 100644 --- a/bench/main.go +++ b/bench/main.go @@ -8,7 +8,7 @@ import ( "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmsvr" - "github.com/yedf/dtm/dtmsvr/storage" + "github.com/yedf/dtm/dtmsvr/storage/registry" "github.com/yedf/dtm/examples" ) @@ -28,7 +28,7 @@ func main() { fmt.Println("start bench server") common.MustLoadConfig() dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver) - storage.WaitStoreUp() + registry.WaitStoreUp() dtmsvr.PopulateDB(true) examples.PopulateDB(true) dtmsvr.StartSvr() // 启动dtmsvr的api服务 diff --git a/dtmsvr/storage/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go similarity index 82% rename from dtmsvr/storage/boltdb.go rename to dtmsvr/storage/boltdb/boltdb.go index 58f30e5..f5b9670 100644 --- a/dtmsvr/storage/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -1,4 +1,4 @@ -package storage +package boltdb import ( "fmt" @@ -6,12 +6,16 @@ import ( "sync" "time" - "github.com/yedf/dtm/common" - "github.com/yedf/dtm/dtmcli/dtmimp" bolt "go.etcd.io/bbolt" "gorm.io/gorm" + + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmsvr/storage" ) +var config = &common.Config + type BoltdbStore struct { } @@ -71,7 +75,7 @@ func cleanupExpiredData(expiredSeconds time.Duration, db *bolt.DB) error { expiredGids := map[string]struct{}{} cursor := globalBucket.Cursor() for k, v := cursor.First(); k != nil; k, v = cursor.Next() { - trans := TransGlobalStore{} + trans := storage.TransGlobalStore{} dtmimp.MustUnmarshal(v, &trans) transDoneTime := trans.FinishTime @@ -115,7 +119,7 @@ func cleanupBranchWithGids(t *bolt.Tx, gids map[string]struct{}) { for gid := range gids { cursor := bucket.Cursor() for k, v := cursor.Seek([]byte(gid)); k != nil; k, v = cursor.Next() { - b := TransBranchStore{} + b := storage.TransBranchStore{} dtmimp.MustUnmarshal(v, &b) if b.Gid != gid { break @@ -167,8 +171,8 @@ var allBuckets = [][]byte{ bucketIndex, } -func tGetGlobal(t *bolt.Tx, gid string) *TransGlobalStore { - trans := TransGlobalStore{} +func tGetGlobal(t *bolt.Tx, gid string) *storage.TransGlobalStore { + trans := storage.TransGlobalStore{} bs := t.Bucket(bucketGlobal).Get([]byte(gid)) if bs == nil { return nil @@ -177,11 +181,11 @@ func tGetGlobal(t *bolt.Tx, gid string) *TransGlobalStore { return &trans } -func tGetBranches(t *bolt.Tx, gid string) []TransBranchStore { - branches := []TransBranchStore{} +func tGetBranches(t *bolt.Tx, gid string) []storage.TransBranchStore { + branches := []storage.TransBranchStore{} cursor := t.Bucket(bucketBranches).Cursor() for k, v := cursor.Seek([]byte(gid)); k != nil; k, v = cursor.Next() { - b := TransBranchStore{} + b := storage.TransBranchStore{} dtmimp.MustUnmarshal(v, &b) if b.Gid != gid { break @@ -190,13 +194,13 @@ func tGetBranches(t *bolt.Tx, gid string) []TransBranchStore { } return branches } -func tPutGlobal(t *bolt.Tx, global *TransGlobalStore) { +func tPutGlobal(t *bolt.Tx, global *storage.TransGlobalStore) { bs := dtmimp.MustMarshal(global) err := t.Bucket(bucketGlobal).Put([]byte(global.Gid), bs) dtmimp.E2P(err) } -func tPutBranches(t *bolt.Tx, branches []TransBranchStore, start int64) { +func tPutBranches(t *bolt.Tx, branches []storage.TransBranchStore, start int64) { if start == -1 { bs := tGetBranches(t, branches[0].Gid) start = int64(len(bs)) @@ -240,7 +244,7 @@ func (s *BoltdbStore) PopulateData(skipDrop bool) { } } -func (s *BoltdbStore) FindTransGlobalStore(gid string) (trans *TransGlobalStore) { +func (s *BoltdbStore) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) { err := boltGet().View(func(t *bolt.Tx) error { trans = tGetGlobal(t, gid) return nil @@ -249,15 +253,15 @@ func (s *BoltdbStore) FindTransGlobalStore(gid string) (trans *TransGlobalStore) return } -func (s *BoltdbStore) ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore { - globals := []TransGlobalStore{} +func (s *BoltdbStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { + globals := []storage.TransGlobalStore{} err := boltGet().View(func(t *bolt.Tx) error { cursor := t.Bucket(bucketGlobal).Cursor() for k, v := cursor.First(); k != nil; k, v = cursor.Next() { if string(k) == *position { continue } - g := TransGlobalStore{} + g := storage.TransGlobalStore{} dtmimp.MustUnmarshal(v, &g) globals = append(globals, g) if len(globals) == int(limit) { @@ -275,8 +279,8 @@ func (s *BoltdbStore) ScanTransGlobalStores(position *string, limit int64) []Tra return globals } -func (s *BoltdbStore) FindBranches(gid string) []TransBranchStore { - var branches []TransBranchStore = nil +func (s *BoltdbStore) FindBranches(gid string) []storage.TransBranchStore { + var branches []storage.TransBranchStore = nil err := boltGet().View(func(t *bolt.Tx) error { branches = tGetBranches(t, gid) return nil @@ -285,18 +289,18 @@ func (s *BoltdbStore) FindBranches(gid string) []TransBranchStore { return branches } -func (s *BoltdbStore) UpdateBranchesSql(branches []TransBranchStore, updates []string) *gorm.DB { +func (s *BoltdbStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB { return nil // not implemented } -func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) { +func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { err := boltGet().Update(func(t *bolt.Tx) error { g := tGetGlobal(t, gid) if g == nil { - return ErrNotFound + return storage.ErrNotFound } if g.Status != status { - return ErrNotFound + return storage.ErrNotFound } tPutBranches(t, branches, int64(branchStart)) return nil @@ -304,11 +308,11 @@ func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches dtmimp.E2P(err) } -func (s *BoltdbStore) MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error { +func (s *BoltdbStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error { return boltGet().Update(func(t *bolt.Tx) error { g := tGetGlobal(t, global.Gid) if g != nil { - return ErrUniqueConflict + return storage.ErrUniqueConflict } tPutGlobal(t, global) tPutIndex(t, global.NextCronTime.Unix(), global.Gid) @@ -317,13 +321,13 @@ func (s *BoltdbStore) MaySaveNewTrans(global *TransGlobalStore, branches []Trans }) } -func (s *BoltdbStore) ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) { +func (s *BoltdbStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) { old := global.Status global.Status = newStatus err := boltGet().Update(func(t *bolt.Tx) error { g := tGetGlobal(t, global.Gid) if g == nil || g.Status != old { - return ErrNotFound + return storage.ErrNotFound } if finished { tDelIndex(t, g.NextCronTime.Unix(), g.Gid) @@ -334,7 +338,7 @@ func (s *BoltdbStore) ChangeGlobalStatus(global *TransGlobalStore, newStatus str dtmimp.E2P(err) } -func (s *BoltdbStore) TouchCronTime(global *TransGlobalStore, nextCronInterval int64) { +func (s *BoltdbStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) { oldUnix := global.NextCronTime.Unix() global.NextCronTime = common.GetNextTime(nextCronInterval) global.UpdateTime = common.GetNextTime(0) @@ -342,7 +346,7 @@ func (s *BoltdbStore) TouchCronTime(global *TransGlobalStore, nextCronInterval i err := boltGet().Update(func(t *bolt.Tx) error { g := tGetGlobal(t, global.Gid) if g == nil || g.Gid != global.Gid { - return ErrNotFound + return storage.ErrNotFound } tDelIndex(t, oldUnix, global.Gid) tPutGlobal(t, global) @@ -352,8 +356,8 @@ func (s *BoltdbStore) TouchCronTime(global *TransGlobalStore, nextCronInterval i dtmimp.E2P(err) } -func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore { - var trans *TransGlobalStore = nil +func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { + var trans *storage.TransGlobalStore = nil min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix()) next := time.Now().Add(time.Duration(config.RetryInterval) * time.Second) err := boltGet().Update(func(t *bolt.Tx) error { @@ -361,7 +365,7 @@ func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalSto for trans == nil { k, v := cursor.First() if k == nil || string(k) > min { - return ErrNotFound + return storage.ErrNotFound } trans = tGetGlobal(t, string(v)) err := t.Bucket(bucketIndex).Delete(k) @@ -372,7 +376,7 @@ func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalSto tPutIndex(t, next.Unix(), trans.Gid) return nil }) - if err == ErrNotFound { + if err == storage.ErrNotFound { return nil } dtmimp.E2P(err) diff --git a/dtmsvr/storage/boltdb_test.go b/dtmsvr/storage/boltdb/boltdb_test.go similarity index 97% rename from dtmsvr/storage/boltdb_test.go rename to dtmsvr/storage/boltdb/boltdb_test.go index c1656f4..7388222 100644 --- a/dtmsvr/storage/boltdb_test.go +++ b/dtmsvr/storage/boltdb/boltdb_test.go @@ -1,4 +1,4 @@ -package storage +package boltdb import ( "path" @@ -9,6 +9,7 @@ import ( bolt "go.etcd.io/bbolt" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmsvr/storage" ) func TestInitializeBuckets(t *testing.T) { @@ -69,7 +70,7 @@ func TestCleanupExpiredData(t *testing.T) { doneTime := time.Now().Add(-10 * time.Minute) gids := []string{"gid0", "gid1", "gid2"} - gidDatas := []TransGlobalStore{ + gidDatas := []storage.TransGlobalStore{ {}, // not finished { FinishTime: &doneTime, @@ -196,7 +197,7 @@ func TestCleanupBranchWithGids(t *testing.T) { } keys := []string{"a", "gid001", "gid002", "gid101", "gid201", "z"} - datas := []TransBranchStore{ + datas := []storage.TransBranchStore{ { Gid: "a", }, @@ -277,7 +278,7 @@ func TestCleanupIndexWithGids(t *testing.T) { } keys := []string{"a", "0-gid0", "1-gid0", "2-gid1", "3-gid2", "z"} - datas := []TransBranchStore{ + datas := []storage.TransBranchStore{ { Gid: "a", }, diff --git a/dtmsvr/storage/registry/registry.go b/dtmsvr/storage/registry/registry.go new file mode 100644 index 0000000..250ffe2 --- /dev/null +++ b/dtmsvr/storage/registry/registry.go @@ -0,0 +1,29 @@ +package registry + +import ( + "time" + + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmsvr/storage" + "github.com/yedf/dtm/dtmsvr/storage/boltdb" +) + +var config = &common.Config + +var stores map[string]storage.Store = map[string]storage.Store{ + "redis": &storage.RedisStore{}, + "mysql": &storage.SqlStore{}, + "postgres": &storage.SqlStore{}, + "boltdb": &boltdb.BoltdbStore{}, +} + +func GetStore() storage.Store { + return stores[config.Store.Driver] +} + +// WaitStoreUp wait for db to go up +func WaitStoreUp() { + for err := GetStore().Ping(); err != nil; err = GetStore().Ping() { + time.Sleep(3 * time.Second) + } +} diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go index 43c79c3..d68b1a4 100644 --- a/dtmsvr/storage/store.go +++ b/dtmsvr/storage/store.go @@ -26,24 +26,6 @@ type Store interface { LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore } -var stores map[string]Store = map[string]Store{ - "redis": &RedisStore{}, - "mysql": &SqlStore{}, - "postgres": &SqlStore{}, - "boltdb": &BoltdbStore{}, -} - -func GetStore() Store { - return stores[config.Store.Driver] -} - -// WaitStoreUp wait for db to go up -func WaitStoreUp() { - for err := GetStore().Ping(); err != nil; err = GetStore().Ping() { - time.Sleep(3 * time.Second) - } -} - func wrapError(err error) error { if err == gorm.ErrRecordNotFound || err == redis.Nil { return ErrNotFound diff --git a/dtmsvr/utils.go b/dtmsvr/utils.go index fe5c7cb..07e017a 100644 --- a/dtmsvr/utils.go +++ b/dtmsvr/utils.go @@ -11,9 +11,11 @@ import ( "time" "github.com/google/uuid" + "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmsvr/storage" + "github.com/yedf/dtm/dtmsvr/storage/registry" ) type branchStatus struct { @@ -28,7 +30,7 @@ var e2p = dtmimp.E2P var config = &common.Config func GetStore() storage.Store { - return storage.GetStore() + return registry.GetStore() } // TransProcessedTestChan only for test usage. when transaction processed once, write gid to this chan diff --git a/test/store_test.go b/test/store_test.go index 24a2f22..49549c0 100644 --- a/test/store_test.go +++ b/test/store_test.go @@ -5,8 +5,10 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmsvr/storage" + "github.com/yedf/dtm/dtmsvr/storage/registry" ) func initTransGlobal(gid string) (*storage.TransGlobalStore, storage.Store) { @@ -15,7 +17,7 @@ func initTransGlobal(gid string) (*storage.TransGlobalStore, storage.Store) { bs := []storage.TransBranchStore{ {Gid: gid, BranchID: "01"}, } - s := storage.GetStore() + s := registry.GetStore() err := s.MaySaveNewTrans(g, bs) dtmimp.E2P(err) return g, s @@ -87,12 +89,12 @@ func TestStoreLockTrans(t *testing.T) { } func TestStoreWait(t *testing.T) { - storage.WaitStoreUp() + registry.WaitStoreUp() } func TestUpdateBranchSql(t *testing.T) { if !config.Store.IsDB() { - r := storage.GetStore().UpdateBranchesSql(nil, nil) + r := registry.GetStore().UpdateBranchesSql(nil, nil) assert.Nil(t, r) } }