From 2221bbf74e9f95bec909f1ee148aa2de44b36793 Mon Sep 17 00:00:00 2001 From: lsytj0413 <511121939@qq.com> Date: Sun, 9 Jan 2022 16:02:35 +0800 Subject: [PATCH] refactor(*): migrate boltdb to factory pattern --- dtmsvr/storage/boltdb/boltdb.go | 70 +++++++++++++++-------------- dtmsvr/storage/registry/factory.go | 25 +++++++++++ dtmsvr/storage/registry/registry.go | 34 +++++++++++--- 3 files changed, 89 insertions(+), 40 deletions(-) create mode 100644 dtmsvr/storage/registry/factory.go diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index 19f0607..95d78fe 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -9,48 +9,50 @@ package boltdb import ( "fmt" "strings" - "sync" "time" bolt "go.etcd.io/bbolt" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" - "github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/dtmsvr/storage" "github.com/dtm-labs/dtm/dtmutil" ) -var conf = &config.Config - // Store implements storage.Store, and storage with boltdb type Store struct { + boltDb *bolt.DB + + dataExpire int64 + retryInterval int64 } -var boltDb *bolt.DB -var boltOnce sync.Once +// NewStore will return the boltdb implement +// TODO: change to options +func NewStore(dataExpire int64, retryInterval int64) *Store { + s := &Store{ + dataExpire: dataExpire, + retryInterval: retryInterval, + } -func boltGet() *bolt.DB { - boltOnce.Do(func() { - db, err := bolt.Open("./dtm.bolt", 0666, &bolt.Options{Timeout: 1 * time.Second}) - dtmimp.E2P(err) + db, err := bolt.Open("./dtm.bolt", 0666, &bolt.Options{Timeout: 1 * time.Second}) + dtmimp.E2P(err) - // NOTE: we must ensure all buckets is exists before we use it - err = initializeBuckets(db) - dtmimp.E2P(err) + // NOTE: we must ensure all buckets is exists before we use it + err = initializeBuckets(db) + dtmimp.E2P(err) - // TODO: - // 1. refactor this code - // 2. make cleanup run period, to avoid the file growup when server long-running - err = cleanupExpiredData( - time.Duration(conf.Store.DataExpire)*time.Second, - db, - ) - dtmimp.E2P(err) + // TODO: + // 1. refactor this code + // 2. make cleanup run period, to avoid the file growup when server long-running + err = cleanupExpiredData( + time.Duration(dataExpire)*time.Second, + db, + ) + dtmimp.E2P(err) - boltDb = db - }) - return boltDb + s.boltDb = db + return s } func initializeBuckets(db *bolt.DB) error { @@ -241,7 +243,7 @@ func (s *Store) Ping() error { // PopulateData populates data to boltdb func (s *Store) PopulateData(skipDrop bool) { if !skipDrop { - err := boltGet().Update(func(t *bolt.Tx) error { + err := s.boltDb.Update(func(t *bolt.Tx) error { dtmimp.E2P(t.DeleteBucket(bucketIndex)) dtmimp.E2P(t.DeleteBucket(bucketBranches)) dtmimp.E2P(t.DeleteBucket(bucketGlobal)) @@ -261,7 +263,7 @@ func (s *Store) PopulateData(skipDrop bool) { // FindTransGlobalStore finds GlobalTrans data by gid func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) { - err := boltGet().View(func(t *bolt.Tx) error { + err := s.boltDb.View(func(t *bolt.Tx) error { trans = tGetGlobal(t, gid) return nil }) @@ -272,7 +274,7 @@ func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStor // ScanTransGlobalStores lists GlobalTrans data func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { globals := []storage.TransGlobalStore{} - err := boltGet().View(func(t *bolt.Tx) error { + err := s.boltDb.View(func(t *bolt.Tx) error { cursor := t.Bucket(bucketGlobal).Cursor() for k, v := cursor.First(); k != nil; k, v = cursor.Next() { if string(k) == *position { @@ -299,7 +301,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.T // FindBranches finds Branch data by gid func (s *Store) FindBranches(gid string) []storage.TransBranchStore { var branches []storage.TransBranchStore - err := boltGet().View(func(t *bolt.Tx) error { + err := s.boltDb.View(func(t *bolt.Tx) error { branches = tGetBranches(t, gid) return nil }) @@ -314,7 +316,7 @@ func (s *Store) UpdateBranches(branches []storage.TransBranchStore, updates []st // LockGlobalSaveBranches creates branches func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { - err := boltGet().Update(func(t *bolt.Tx) error { + err := s.boltDb.Update(func(t *bolt.Tx) error { g := tGetGlobal(t, gid) if g == nil { return storage.ErrNotFound @@ -330,7 +332,7 @@ func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []sto // MaySaveNewTrans creates a new trans func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error { - return boltGet().Update(func(t *bolt.Tx) error { + return s.boltDb.Update(func(t *bolt.Tx) error { g := tGetGlobal(t, global.Gid) if g != nil { return storage.ErrUniqueConflict @@ -346,7 +348,7 @@ func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []sto func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) { old := global.Status global.Status = newStatus - err := boltGet().Update(func(t *bolt.Tx) error { + err := s.boltDb.Update(func(t *bolt.Tx) error { g := tGetGlobal(t, global.Gid) if g == nil || g.Status != old { return storage.ErrNotFound @@ -366,7 +368,7 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval global.NextCronTime = dtmutil.GetNextTime(nextCronInterval) global.UpdateTime = dtmutil.GetNextTime(0) global.NextCronInterval = nextCronInterval - err := boltGet().Update(func(t *bolt.Tx) error { + err := s.boltDb.Update(func(t *bolt.Tx) error { g := tGetGlobal(t, global.Gid) if g == nil || g.Gid != global.Gid { return storage.ErrNotFound @@ -383,8 +385,8 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { var trans *storage.TransGlobalStore min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix()) - next := time.Now().Add(time.Duration(conf.RetryInterval) * time.Second) - err := boltGet().Update(func(t *bolt.Tx) error { + next := time.Now().Add(time.Duration(s.retryInterval) * time.Second) + err := s.boltDb.Update(func(t *bolt.Tx) error { cursor := t.Bucket(bucketIndex).Cursor() for trans == nil { k, v := cursor.First() diff --git a/dtmsvr/storage/registry/factory.go b/dtmsvr/storage/registry/factory.go new file mode 100644 index 0000000..811bbf0 --- /dev/null +++ b/dtmsvr/storage/registry/factory.go @@ -0,0 +1,25 @@ +package registry + +import ( + "sync" + + "github.com/dtm-labs/dtm/dtmsvr/storage" +) + +// SingletonFactory is the factory to build store in SINGLETON pattern. +type SingletonFactory struct { + once sync.Once + + store storage.Store + + creatorFunction func() storage.Store +} + +// GetStorage implement the StorageFactory.GetStorage +func (f *SingletonFactory) GetStorage() storage.Store { + f.once.Do(func() { + f.store = f.creatorFunction() + }) + + return f.store +} diff --git a/dtmsvr/storage/registry/registry.go b/dtmsvr/storage/registry/registry.go index 1788456..003fd69 100644 --- a/dtmsvr/storage/registry/registry.go +++ b/dtmsvr/storage/registry/registry.go @@ -12,16 +12,38 @@ import ( var conf = &config.Config -var stores map[string]storage.Store = map[string]storage.Store{ - "redis": &redis.Store{}, - "mysql": &sql.Store{}, - "postgres": &sql.Store{}, - "boltdb": &boltdb.Store{}, +// StorageFactory is factory to get storage instance. +type StorageFactory interface { + // GetStorage will return the Storage instance. + GetStorage() storage.Store +} + +var storeFactorys = map[string]StorageFactory{ + "boltdb": &SingletonFactory{ + creatorFunction: func() storage.Store { + return boltdb.NewStore(conf.Store.DataExpire, conf.RetryInterval) + }, + }, + "redis": &SingletonFactory{ + creatorFunction: func() storage.Store { + return &redis.Store{} + }, + }, + "mysql": &SingletonFactory{ + creatorFunction: func() storage.Store { + return &sql.Store{} + }, + }, + "postgres": &SingletonFactory{ + creatorFunction: func() storage.Store { + return &sql.Store{} + }, + }, } // GetStore returns storage.Store func GetStore() storage.Store { - return stores[conf.Store.Driver] + return storeFactorys[conf.Store.Driver].GetStorage() } // WaitStoreUp wait for db to go up