|
|
|
@ -9,48 +9,50 @@ package boltdb |
|
|
|
import ( |
|
|
|
"fmt" |
|
|
|
"strings" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
|
|
|
|
|
bolt "go.etcd.io/bbolt" |
|
|
|
|
|
|
|
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|
|
|
"github.com/dtm-labs/dtm/dtmcli/logger" |
|
|
|
"github.com/dtm-labs/dtm/dtmsvr/config" |
|
|
|
"github.com/dtm-labs/dtm/dtmsvr/storage" |
|
|
|
"github.com/dtm-labs/dtm/dtmutil" |
|
|
|
) |
|
|
|
|
|
|
|
var conf = &config.Config |
|
|
|
|
|
|
|
// Store implements storage.Store, and storage with boltdb
|
|
|
|
type Store struct { |
|
|
|
boltDb *bolt.DB |
|
|
|
|
|
|
|
dataExpire int64 |
|
|
|
retryInterval int64 |
|
|
|
} |
|
|
|
|
|
|
|
var boltDb *bolt.DB |
|
|
|
var boltOnce sync.Once |
|
|
|
// NewStore will return the boltdb implement
|
|
|
|
// TODO: change to options
|
|
|
|
func NewStore(dataExpire int64, retryInterval int64) *Store { |
|
|
|
s := &Store{ |
|
|
|
dataExpire: dataExpire, |
|
|
|
retryInterval: retryInterval, |
|
|
|
} |
|
|
|
|
|
|
|
func boltGet() *bolt.DB { |
|
|
|
boltOnce.Do(func() { |
|
|
|
db, err := bolt.Open("./dtm.bolt", 0666, &bolt.Options{Timeout: 1 * time.Second}) |
|
|
|
dtmimp.E2P(err) |
|
|
|
db, err := bolt.Open("./dtm.bolt", 0666, &bolt.Options{Timeout: 1 * time.Second}) |
|
|
|
dtmimp.E2P(err) |
|
|
|
|
|
|
|
// NOTE: we must ensure all buckets is exists before we use it
|
|
|
|
err = initializeBuckets(db) |
|
|
|
dtmimp.E2P(err) |
|
|
|
// NOTE: we must ensure all buckets is exists before we use it
|
|
|
|
err = initializeBuckets(db) |
|
|
|
dtmimp.E2P(err) |
|
|
|
|
|
|
|
// TODO:
|
|
|
|
// 1. refactor this code
|
|
|
|
// 2. make cleanup run period, to avoid the file growup when server long-running
|
|
|
|
err = cleanupExpiredData( |
|
|
|
time.Duration(conf.Store.DataExpire)*time.Second, |
|
|
|
db, |
|
|
|
) |
|
|
|
dtmimp.E2P(err) |
|
|
|
// TODO:
|
|
|
|
// 1. refactor this code
|
|
|
|
// 2. make cleanup run period, to avoid the file growup when server long-running
|
|
|
|
err = cleanupExpiredData( |
|
|
|
time.Duration(dataExpire)*time.Second, |
|
|
|
db, |
|
|
|
) |
|
|
|
dtmimp.E2P(err) |
|
|
|
|
|
|
|
boltDb = db |
|
|
|
}) |
|
|
|
return boltDb |
|
|
|
s.boltDb = db |
|
|
|
return s |
|
|
|
} |
|
|
|
|
|
|
|
func initializeBuckets(db *bolt.DB) error { |
|
|
|
@ -241,7 +243,7 @@ func (s *Store) Ping() error { |
|
|
|
// PopulateData populates data to boltdb
|
|
|
|
func (s *Store) PopulateData(skipDrop bool) { |
|
|
|
if !skipDrop { |
|
|
|
err := boltGet().Update(func(t *bolt.Tx) error { |
|
|
|
err := s.boltDb.Update(func(t *bolt.Tx) error { |
|
|
|
dtmimp.E2P(t.DeleteBucket(bucketIndex)) |
|
|
|
dtmimp.E2P(t.DeleteBucket(bucketBranches)) |
|
|
|
dtmimp.E2P(t.DeleteBucket(bucketGlobal)) |
|
|
|
@ -261,7 +263,7 @@ func (s *Store) PopulateData(skipDrop bool) { |
|
|
|
|
|
|
|
// FindTransGlobalStore finds GlobalTrans data by gid
|
|
|
|
func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) { |
|
|
|
err := boltGet().View(func(t *bolt.Tx) error { |
|
|
|
err := s.boltDb.View(func(t *bolt.Tx) error { |
|
|
|
trans = tGetGlobal(t, gid) |
|
|
|
return nil |
|
|
|
}) |
|
|
|
@ -272,7 +274,7 @@ func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStor |
|
|
|
// ScanTransGlobalStores lists GlobalTrans data
|
|
|
|
func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { |
|
|
|
globals := []storage.TransGlobalStore{} |
|
|
|
err := boltGet().View(func(t *bolt.Tx) error { |
|
|
|
err := s.boltDb.View(func(t *bolt.Tx) error { |
|
|
|
cursor := t.Bucket(bucketGlobal).Cursor() |
|
|
|
for k, v := cursor.First(); k != nil; k, v = cursor.Next() { |
|
|
|
if string(k) == *position { |
|
|
|
@ -299,7 +301,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.T |
|
|
|
// FindBranches finds Branch data by gid
|
|
|
|
func (s *Store) FindBranches(gid string) []storage.TransBranchStore { |
|
|
|
var branches []storage.TransBranchStore |
|
|
|
err := boltGet().View(func(t *bolt.Tx) error { |
|
|
|
err := s.boltDb.View(func(t *bolt.Tx) error { |
|
|
|
branches = tGetBranches(t, gid) |
|
|
|
return nil |
|
|
|
}) |
|
|
|
@ -314,7 +316,7 @@ func (s *Store) UpdateBranches(branches []storage.TransBranchStore, updates []st |
|
|
|
|
|
|
|
// LockGlobalSaveBranches creates branches
|
|
|
|
func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) { |
|
|
|
err := boltGet().Update(func(t *bolt.Tx) error { |
|
|
|
err := s.boltDb.Update(func(t *bolt.Tx) error { |
|
|
|
g := tGetGlobal(t, gid) |
|
|
|
if g == nil { |
|
|
|
return storage.ErrNotFound |
|
|
|
@ -330,7 +332,7 @@ func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []sto |
|
|
|
|
|
|
|
// MaySaveNewTrans creates a new trans
|
|
|
|
func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error { |
|
|
|
return boltGet().Update(func(t *bolt.Tx) error { |
|
|
|
return s.boltDb.Update(func(t *bolt.Tx) error { |
|
|
|
g := tGetGlobal(t, global.Gid) |
|
|
|
if g != nil { |
|
|
|
return storage.ErrUniqueConflict |
|
|
|
@ -346,7 +348,7 @@ func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []sto |
|
|
|
func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) { |
|
|
|
old := global.Status |
|
|
|
global.Status = newStatus |
|
|
|
err := boltGet().Update(func(t *bolt.Tx) error { |
|
|
|
err := s.boltDb.Update(func(t *bolt.Tx) error { |
|
|
|
g := tGetGlobal(t, global.Gid) |
|
|
|
if g == nil || g.Status != old { |
|
|
|
return storage.ErrNotFound |
|
|
|
@ -366,7 +368,7 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval |
|
|
|
global.NextCronTime = dtmutil.GetNextTime(nextCronInterval) |
|
|
|
global.UpdateTime = dtmutil.GetNextTime(0) |
|
|
|
global.NextCronInterval = nextCronInterval |
|
|
|
err := boltGet().Update(func(t *bolt.Tx) error { |
|
|
|
err := s.boltDb.Update(func(t *bolt.Tx) error { |
|
|
|
g := tGetGlobal(t, global.Gid) |
|
|
|
if g == nil || g.Gid != global.Gid { |
|
|
|
return storage.ErrNotFound |
|
|
|
@ -383,8 +385,8 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval |
|
|
|
func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { |
|
|
|
var trans *storage.TransGlobalStore |
|
|
|
min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix()) |
|
|
|
next := time.Now().Add(time.Duration(conf.RetryInterval) * time.Second) |
|
|
|
err := boltGet().Update(func(t *bolt.Tx) error { |
|
|
|
next := time.Now().Add(time.Duration(s.retryInterval) * time.Second) |
|
|
|
err := s.boltDb.Update(func(t *bolt.Tx) error { |
|
|
|
cursor := t.Bucket(bucketIndex).Cursor() |
|
|
|
for trans == nil { |
|
|
|
k, v := cursor.First() |
|
|
|
|