Browse Source

Merge pull request #165 from lsytj0413/refactor-bolt-to-factory

refactor(*): migrate boltdb to factory pattern
pull/175/head
yedf2 4 years ago
committed by GitHub
parent
commit
2d5f02ffdb
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 70
      dtmsvr/storage/boltdb/boltdb.go
  2. 25
      dtmsvr/storage/registry/factory.go
  3. 34
      dtmsvr/storage/registry/registry.go

70
dtmsvr/storage/boltdb/boltdb.go

@ -9,7 +9,6 @@ package boltdb
import (
"fmt"
"strings"
"sync"
"time"
bolt "go.etcd.io/bbolt"
@ -17,41 +16,44 @@ import (
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmsvr/config"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmutil"
)
var conf = &config.Config
// Store implements storage.Store, and storage with boltdb
type Store struct {
boltDb *bolt.DB
dataExpire int64
retryInterval int64
}
var boltDb *bolt.DB
var boltOnce sync.Once
// NewStore will return the boltdb implement
// TODO: change to options
func NewStore(dataExpire int64, retryInterval int64) *Store {
s := &Store{
dataExpire: dataExpire,
retryInterval: retryInterval,
}
func boltGet() *bolt.DB {
boltOnce.Do(func() {
db, err := bolt.Open("./dtm.bolt", 0666, &bolt.Options{Timeout: 1 * time.Second})
dtmimp.E2P(err)
db, err := bolt.Open("./dtm.bolt", 0666, &bolt.Options{Timeout: 1 * time.Second})
dtmimp.E2P(err)
// NOTE: we must ensure all buckets is exists before we use it
err = initializeBuckets(db)
dtmimp.E2P(err)
// NOTE: we must ensure all buckets is exists before we use it
err = initializeBuckets(db)
dtmimp.E2P(err)
// TODO:
// 1. refactor this code
// 2. make cleanup run period, to avoid the file growup when server long-running
err = cleanupExpiredData(
time.Duration(conf.Store.DataExpire)*time.Second,
db,
)
dtmimp.E2P(err)
// TODO:
// 1. refactor this code
// 2. make cleanup run period, to avoid the file growup when server long-running
err = cleanupExpiredData(
time.Duration(dataExpire)*time.Second,
db,
)
dtmimp.E2P(err)
boltDb = db
})
return boltDb
s.boltDb = db
return s
}
func initializeBuckets(db *bolt.DB) error {
@ -242,7 +244,7 @@ func (s *Store) Ping() error {
// PopulateData populates data to boltdb
func (s *Store) PopulateData(skipDrop bool) {
if !skipDrop {
err := boltGet().Update(func(t *bolt.Tx) error {
err := s.boltDb.Update(func(t *bolt.Tx) error {
dtmimp.E2P(t.DeleteBucket(bucketIndex))
dtmimp.E2P(t.DeleteBucket(bucketBranches))
dtmimp.E2P(t.DeleteBucket(bucketGlobal))
@ -262,7 +264,7 @@ func (s *Store) PopulateData(skipDrop bool) {
// FindTransGlobalStore finds GlobalTrans data by gid
func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) {
err := boltGet().View(func(t *bolt.Tx) error {
err := s.boltDb.View(func(t *bolt.Tx) error {
trans = tGetGlobal(t, gid)
return nil
})
@ -273,7 +275,7 @@ func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStor
// ScanTransGlobalStores lists GlobalTrans data
func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
globals := []storage.TransGlobalStore{}
err := boltGet().View(func(t *bolt.Tx) error {
err := s.boltDb.View(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketGlobal).Cursor()
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
if string(k) == *position {
@ -300,7 +302,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.T
// FindBranches finds Branch data by gid
func (s *Store) FindBranches(gid string) []storage.TransBranchStore {
var branches []storage.TransBranchStore
err := boltGet().View(func(t *bolt.Tx) error {
err := s.boltDb.View(func(t *bolt.Tx) error {
branches = tGetBranches(t, gid)
return nil
})
@ -315,7 +317,7 @@ func (s *Store) UpdateBranches(branches []storage.TransBranchStore, updates []st
// LockGlobalSaveBranches creates branches
func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
err := boltGet().Update(func(t *bolt.Tx) error {
err := s.boltDb.Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, gid)
if g == nil {
return storage.ErrNotFound
@ -331,7 +333,7 @@ func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []sto
// MaySaveNewTrans creates a new trans
func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
return boltGet().Update(func(t *bolt.Tx) error {
return s.boltDb.Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid)
if g != nil {
return storage.ErrUniqueConflict
@ -347,7 +349,7 @@ func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []sto
func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status
global.Status = newStatus
err := boltGet().Update(func(t *bolt.Tx) error {
err := s.boltDb.Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid)
if g == nil || g.Status != old {
return storage.ErrNotFound
@ -367,7 +369,7 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval
global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
global.UpdateTime = dtmutil.GetNextTime(0)
global.NextCronInterval = nextCronInterval
err := boltGet().Update(func(t *bolt.Tx) error {
err := s.boltDb.Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid)
if g == nil || g.Gid != global.Gid {
return storage.ErrNotFound
@ -384,8 +386,8 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval
func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
var trans *storage.TransGlobalStore
min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix())
next := time.Now().Add(time.Duration(conf.RetryInterval) * time.Second)
err := boltGet().Update(func(t *bolt.Tx) error {
next := time.Now().Add(time.Duration(s.retryInterval) * time.Second)
err := s.boltDb.Update(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketIndex).Cursor()
for trans == nil || trans.Status == dtmcli.StatusSucceed || trans.Status == dtmcli.StatusFailed {
k, v := cursor.First()

25
dtmsvr/storage/registry/factory.go

@ -0,0 +1,25 @@
package registry
import (
"sync"
"github.com/dtm-labs/dtm/dtmsvr/storage"
)
// SingletonFactory is the factory to build store in SINGLETON pattern.
type SingletonFactory struct {
once sync.Once
store storage.Store
creatorFunction func() storage.Store
}
// GetStorage implement the StorageFactory.GetStorage
func (f *SingletonFactory) GetStorage() storage.Store {
f.once.Do(func() {
f.store = f.creatorFunction()
})
return f.store
}

34
dtmsvr/storage/registry/registry.go

@ -12,16 +12,38 @@ import (
var conf = &config.Config
var stores map[string]storage.Store = map[string]storage.Store{
"redis": &redis.Store{},
"mysql": &sql.Store{},
"postgres": &sql.Store{},
"boltdb": &boltdb.Store{},
// StorageFactory is factory to get storage instance.
type StorageFactory interface {
// GetStorage will return the Storage instance.
GetStorage() storage.Store
}
var storeFactorys = map[string]StorageFactory{
"boltdb": &SingletonFactory{
creatorFunction: func() storage.Store {
return boltdb.NewStore(conf.Store.DataExpire, conf.RetryInterval)
},
},
"redis": &SingletonFactory{
creatorFunction: func() storage.Store {
return &redis.Store{}
},
},
"mysql": &SingletonFactory{
creatorFunction: func() storage.Store {
return &sql.Store{}
},
},
"postgres": &SingletonFactory{
creatorFunction: func() storage.Store {
return &sql.Store{}
},
},
}
// GetStore returns storage.Store
func GetStore() storage.Store {
return stores[conf.Store.Driver]
return storeFactorys[conf.Store.Driver].GetStorage()
}
// WaitStoreUp wait for db to go up

Loading…
Cancel
Save