Browse Source

refactor(*): split storage to different package

pull/119/head
lsytj0413 4 years ago
parent
commit
e27eb90fe8
  1. 8
      app/main.go
  2. 4
      bench/main.go
  3. 68
      dtmsvr/storage/boltdb/boltdb.go
  4. 9
      dtmsvr/storage/boltdb/boltdb_test.go
  5. 29
      dtmsvr/storage/registry/registry.go
  6. 18
      dtmsvr/storage/store.go
  7. 4
      dtmsvr/utils.go
  8. 8
      test/store_test.go

8
app/main.go

@ -11,14 +11,14 @@ import (
"os" "os"
"strings" "strings"
_ "go.uber.org/automaxprocs"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr" "github.com/yedf/dtm/dtmsvr"
"github.com/yedf/dtm/dtmsvr/storage" "github.com/yedf/dtm/dtmsvr/storage/registry"
"github.com/yedf/dtm/examples" "github.com/yedf/dtm/examples"
_ "go.uber.org/automaxprocs"
) )
var Version, Commit, Date string var Version, Commit, Date string
@ -56,7 +56,7 @@ func main() {
dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver) dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver)
} }
if os.Args[1] != "dtmsvr" { // 实际线上运行,只启动dtmsvr,不准备table相关的数据 if os.Args[1] != "dtmsvr" { // 实际线上运行,只启动dtmsvr,不准备table相关的数据
storage.WaitStoreUp() registry.WaitStoreUp()
dtmsvr.PopulateDB(true) dtmsvr.PopulateDB(true)
examples.PopulateDB(true) examples.PopulateDB(true)
} }

4
bench/main.go

@ -8,7 +8,7 @@ import (
"github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr" "github.com/yedf/dtm/dtmsvr"
"github.com/yedf/dtm/dtmsvr/storage" "github.com/yedf/dtm/dtmsvr/storage/registry"
"github.com/yedf/dtm/examples" "github.com/yedf/dtm/examples"
) )
@ -28,7 +28,7 @@ func main() {
fmt.Println("start bench server") fmt.Println("start bench server")
common.MustLoadConfig() common.MustLoadConfig()
dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver) dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver)
storage.WaitStoreUp() registry.WaitStoreUp()
dtmsvr.PopulateDB(true) dtmsvr.PopulateDB(true)
examples.PopulateDB(true) examples.PopulateDB(true)
dtmsvr.StartSvr() // 启动dtmsvr的api服务 dtmsvr.StartSvr() // 启动dtmsvr的api服务

68
dtmsvr/storage/boltdb.go → dtmsvr/storage/boltdb/boltdb.go

@ -1,4 +1,4 @@
package storage package boltdb
import ( import (
"fmt" "fmt"
@ -6,12 +6,16 @@ import (
"sync" "sync"
"time" "time"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
"gorm.io/gorm" "gorm.io/gorm"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr/storage"
) )
var config = &common.Config
type BoltdbStore struct { type BoltdbStore struct {
} }
@ -71,7 +75,7 @@ func cleanupExpiredData(expiredSeconds time.Duration, db *bolt.DB) error {
expiredGids := map[string]struct{}{} expiredGids := map[string]struct{}{}
cursor := globalBucket.Cursor() cursor := globalBucket.Cursor()
for k, v := cursor.First(); k != nil; k, v = cursor.Next() { for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
trans := TransGlobalStore{} trans := storage.TransGlobalStore{}
dtmimp.MustUnmarshal(v, &trans) dtmimp.MustUnmarshal(v, &trans)
transDoneTime := trans.FinishTime transDoneTime := trans.FinishTime
@ -115,7 +119,7 @@ func cleanupBranchWithGids(t *bolt.Tx, gids map[string]struct{}) {
for gid := range gids { for gid := range gids {
cursor := bucket.Cursor() cursor := bucket.Cursor()
for k, v := cursor.Seek([]byte(gid)); k != nil; k, v = cursor.Next() { for k, v := cursor.Seek([]byte(gid)); k != nil; k, v = cursor.Next() {
b := TransBranchStore{} b := storage.TransBranchStore{}
dtmimp.MustUnmarshal(v, &b) dtmimp.MustUnmarshal(v, &b)
if b.Gid != gid { if b.Gid != gid {
break break
@ -167,8 +171,8 @@ var allBuckets = [][]byte{
bucketIndex, bucketIndex,
} }
func tGetGlobal(t *bolt.Tx, gid string) *TransGlobalStore { func tGetGlobal(t *bolt.Tx, gid string) *storage.TransGlobalStore {
trans := TransGlobalStore{} trans := storage.TransGlobalStore{}
bs := t.Bucket(bucketGlobal).Get([]byte(gid)) bs := t.Bucket(bucketGlobal).Get([]byte(gid))
if bs == nil { if bs == nil {
return nil return nil
@ -177,11 +181,11 @@ func tGetGlobal(t *bolt.Tx, gid string) *TransGlobalStore {
return &trans return &trans
} }
func tGetBranches(t *bolt.Tx, gid string) []TransBranchStore { func tGetBranches(t *bolt.Tx, gid string) []storage.TransBranchStore {
branches := []TransBranchStore{} branches := []storage.TransBranchStore{}
cursor := t.Bucket(bucketBranches).Cursor() cursor := t.Bucket(bucketBranches).Cursor()
for k, v := cursor.Seek([]byte(gid)); k != nil; k, v = cursor.Next() { for k, v := cursor.Seek([]byte(gid)); k != nil; k, v = cursor.Next() {
b := TransBranchStore{} b := storage.TransBranchStore{}
dtmimp.MustUnmarshal(v, &b) dtmimp.MustUnmarshal(v, &b)
if b.Gid != gid { if b.Gid != gid {
break break
@ -190,13 +194,13 @@ func tGetBranches(t *bolt.Tx, gid string) []TransBranchStore {
} }
return branches return branches
} }
func tPutGlobal(t *bolt.Tx, global *TransGlobalStore) { func tPutGlobal(t *bolt.Tx, global *storage.TransGlobalStore) {
bs := dtmimp.MustMarshal(global) bs := dtmimp.MustMarshal(global)
err := t.Bucket(bucketGlobal).Put([]byte(global.Gid), bs) err := t.Bucket(bucketGlobal).Put([]byte(global.Gid), bs)
dtmimp.E2P(err) dtmimp.E2P(err)
} }
func tPutBranches(t *bolt.Tx, branches []TransBranchStore, start int64) { func tPutBranches(t *bolt.Tx, branches []storage.TransBranchStore, start int64) {
if start == -1 { if start == -1 {
bs := tGetBranches(t, branches[0].Gid) bs := tGetBranches(t, branches[0].Gid)
start = int64(len(bs)) start = int64(len(bs))
@ -240,7 +244,7 @@ func (s *BoltdbStore) PopulateData(skipDrop bool) {
} }
} }
func (s *BoltdbStore) FindTransGlobalStore(gid string) (trans *TransGlobalStore) { func (s *BoltdbStore) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) {
err := boltGet().View(func(t *bolt.Tx) error { err := boltGet().View(func(t *bolt.Tx) error {
trans = tGetGlobal(t, gid) trans = tGetGlobal(t, gid)
return nil return nil
@ -249,15 +253,15 @@ func (s *BoltdbStore) FindTransGlobalStore(gid string) (trans *TransGlobalStore)
return return
} }
func (s *BoltdbStore) ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore { func (s *BoltdbStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
globals := []TransGlobalStore{} globals := []storage.TransGlobalStore{}
err := boltGet().View(func(t *bolt.Tx) error { err := boltGet().View(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketGlobal).Cursor() cursor := t.Bucket(bucketGlobal).Cursor()
for k, v := cursor.First(); k != nil; k, v = cursor.Next() { for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
if string(k) == *position { if string(k) == *position {
continue continue
} }
g := TransGlobalStore{} g := storage.TransGlobalStore{}
dtmimp.MustUnmarshal(v, &g) dtmimp.MustUnmarshal(v, &g)
globals = append(globals, g) globals = append(globals, g)
if len(globals) == int(limit) { if len(globals) == int(limit) {
@ -275,8 +279,8 @@ func (s *BoltdbStore) ScanTransGlobalStores(position *string, limit int64) []Tra
return globals return globals
} }
func (s *BoltdbStore) FindBranches(gid string) []TransBranchStore { func (s *BoltdbStore) FindBranches(gid string) []storage.TransBranchStore {
var branches []TransBranchStore = nil var branches []storage.TransBranchStore = nil
err := boltGet().View(func(t *bolt.Tx) error { err := boltGet().View(func(t *bolt.Tx) error {
branches = tGetBranches(t, gid) branches = tGetBranches(t, gid)
return nil return nil
@ -285,18 +289,18 @@ func (s *BoltdbStore) FindBranches(gid string) []TransBranchStore {
return branches return branches
} }
func (s *BoltdbStore) UpdateBranchesSql(branches []TransBranchStore, updates []string) *gorm.DB { func (s *BoltdbStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB {
return nil // not implemented return nil // not implemented
} }
func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) { func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
err := boltGet().Update(func(t *bolt.Tx) error { err := boltGet().Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, gid) g := tGetGlobal(t, gid)
if g == nil { if g == nil {
return ErrNotFound return storage.ErrNotFound
} }
if g.Status != status { if g.Status != status {
return ErrNotFound return storage.ErrNotFound
} }
tPutBranches(t, branches, int64(branchStart)) tPutBranches(t, branches, int64(branchStart))
return nil return nil
@ -304,11 +308,11 @@ func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches
dtmimp.E2P(err) dtmimp.E2P(err)
} }
func (s *BoltdbStore) MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error { func (s *BoltdbStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
return boltGet().Update(func(t *bolt.Tx) error { return boltGet().Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid) g := tGetGlobal(t, global.Gid)
if g != nil { if g != nil {
return ErrUniqueConflict return storage.ErrUniqueConflict
} }
tPutGlobal(t, global) tPutGlobal(t, global)
tPutIndex(t, global.NextCronTime.Unix(), global.Gid) tPutIndex(t, global.NextCronTime.Unix(), global.Gid)
@ -317,13 +321,13 @@ func (s *BoltdbStore) MaySaveNewTrans(global *TransGlobalStore, branches []Trans
}) })
} }
func (s *BoltdbStore) ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) { func (s *BoltdbStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status old := global.Status
global.Status = newStatus global.Status = newStatus
err := boltGet().Update(func(t *bolt.Tx) error { err := boltGet().Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid) g := tGetGlobal(t, global.Gid)
if g == nil || g.Status != old { if g == nil || g.Status != old {
return ErrNotFound return storage.ErrNotFound
} }
if finished { if finished {
tDelIndex(t, g.NextCronTime.Unix(), g.Gid) tDelIndex(t, g.NextCronTime.Unix(), g.Gid)
@ -334,7 +338,7 @@ func (s *BoltdbStore) ChangeGlobalStatus(global *TransGlobalStore, newStatus str
dtmimp.E2P(err) dtmimp.E2P(err)
} }
func (s *BoltdbStore) TouchCronTime(global *TransGlobalStore, nextCronInterval int64) { func (s *BoltdbStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
oldUnix := global.NextCronTime.Unix() oldUnix := global.NextCronTime.Unix()
global.NextCronTime = common.GetNextTime(nextCronInterval) global.NextCronTime = common.GetNextTime(nextCronInterval)
global.UpdateTime = common.GetNextTime(0) global.UpdateTime = common.GetNextTime(0)
@ -342,7 +346,7 @@ func (s *BoltdbStore) TouchCronTime(global *TransGlobalStore, nextCronInterval i
err := boltGet().Update(func(t *bolt.Tx) error { err := boltGet().Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid) g := tGetGlobal(t, global.Gid)
if g == nil || g.Gid != global.Gid { if g == nil || g.Gid != global.Gid {
return ErrNotFound return storage.ErrNotFound
} }
tDelIndex(t, oldUnix, global.Gid) tDelIndex(t, oldUnix, global.Gid)
tPutGlobal(t, global) tPutGlobal(t, global)
@ -352,8 +356,8 @@ func (s *BoltdbStore) TouchCronTime(global *TransGlobalStore, nextCronInterval i
dtmimp.E2P(err) dtmimp.E2P(err)
} }
func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore { func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
var trans *TransGlobalStore = nil var trans *storage.TransGlobalStore = nil
min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix()) min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix())
next := time.Now().Add(time.Duration(config.RetryInterval) * time.Second) next := time.Now().Add(time.Duration(config.RetryInterval) * time.Second)
err := boltGet().Update(func(t *bolt.Tx) error { err := boltGet().Update(func(t *bolt.Tx) error {
@ -361,7 +365,7 @@ func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalSto
for trans == nil { for trans == nil {
k, v := cursor.First() k, v := cursor.First()
if k == nil || string(k) > min { if k == nil || string(k) > min {
return ErrNotFound return storage.ErrNotFound
} }
trans = tGetGlobal(t, string(v)) trans = tGetGlobal(t, string(v))
err := t.Bucket(bucketIndex).Delete(k) err := t.Bucket(bucketIndex).Delete(k)
@ -372,7 +376,7 @@ func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalSto
tPutIndex(t, next.Unix(), trans.Gid) tPutIndex(t, next.Unix(), trans.Gid)
return nil return nil
}) })
if err == ErrNotFound { if err == storage.ErrNotFound {
return nil return nil
} }
dtmimp.E2P(err) dtmimp.E2P(err)

9
dtmsvr/storage/boltdb_test.go → dtmsvr/storage/boltdb/boltdb_test.go

@ -1,4 +1,4 @@
package storage package boltdb
import ( import (
"path" "path"
@ -9,6 +9,7 @@ import (
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
"github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr/storage"
) )
func TestInitializeBuckets(t *testing.T) { func TestInitializeBuckets(t *testing.T) {
@ -69,7 +70,7 @@ func TestCleanupExpiredData(t *testing.T) {
doneTime := time.Now().Add(-10 * time.Minute) doneTime := time.Now().Add(-10 * time.Minute)
gids := []string{"gid0", "gid1", "gid2"} gids := []string{"gid0", "gid1", "gid2"}
gidDatas := []TransGlobalStore{ gidDatas := []storage.TransGlobalStore{
{}, // not finished {}, // not finished
{ {
FinishTime: &doneTime, FinishTime: &doneTime,
@ -196,7 +197,7 @@ func TestCleanupBranchWithGids(t *testing.T) {
} }
keys := []string{"a", "gid001", "gid002", "gid101", "gid201", "z"} keys := []string{"a", "gid001", "gid002", "gid101", "gid201", "z"}
datas := []TransBranchStore{ datas := []storage.TransBranchStore{
{ {
Gid: "a", Gid: "a",
}, },
@ -277,7 +278,7 @@ func TestCleanupIndexWithGids(t *testing.T) {
} }
keys := []string{"a", "0-gid0", "1-gid0", "2-gid1", "3-gid2", "z"} keys := []string{"a", "0-gid0", "1-gid0", "2-gid1", "3-gid2", "z"}
datas := []TransBranchStore{ datas := []storage.TransBranchStore{
{ {
Gid: "a", Gid: "a",
}, },

29
dtmsvr/storage/registry/registry.go

@ -0,0 +1,29 @@
package registry
import (
"time"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmsvr/storage"
"github.com/yedf/dtm/dtmsvr/storage/boltdb"
)
var config = &common.Config
var stores map[string]storage.Store = map[string]storage.Store{
"redis": &storage.RedisStore{},
"mysql": &storage.SqlStore{},
"postgres": &storage.SqlStore{},
"boltdb": &boltdb.BoltdbStore{},
}
func GetStore() storage.Store {
return stores[config.Store.Driver]
}
// WaitStoreUp wait for db to go up
func WaitStoreUp() {
for err := GetStore().Ping(); err != nil; err = GetStore().Ping() {
time.Sleep(3 * time.Second)
}
}

18
dtmsvr/storage/store.go

@ -26,24 +26,6 @@ type Store interface {
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
} }
var stores map[string]Store = map[string]Store{
"redis": &RedisStore{},
"mysql": &SqlStore{},
"postgres": &SqlStore{},
"boltdb": &BoltdbStore{},
}
func GetStore() Store {
return stores[config.Store.Driver]
}
// WaitStoreUp wait for db to go up
func WaitStoreUp() {
for err := GetStore().Ping(); err != nil; err = GetStore().Ping() {
time.Sleep(3 * time.Second)
}
}
func wrapError(err error) error { func wrapError(err error) error {
if err == gorm.ErrRecordNotFound || err == redis.Nil { if err == gorm.ErrRecordNotFound || err == redis.Nil {
return ErrNotFound return ErrNotFound

4
dtmsvr/utils.go

@ -11,9 +11,11 @@ import (
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr/storage" "github.com/yedf/dtm/dtmsvr/storage"
"github.com/yedf/dtm/dtmsvr/storage/registry"
) )
type branchStatus struct { type branchStatus struct {
@ -28,7 +30,7 @@ var e2p = dtmimp.E2P
var config = &common.Config var config = &common.Config
func GetStore() storage.Store { func GetStore() storage.Store {
return storage.GetStore() return registry.GetStore()
} }
// TransProcessedTestChan only for test usage. when transaction processed once, write gid to this chan // TransProcessedTestChan only for test usage. when transaction processed once, write gid to this chan

8
test/store_test.go

@ -5,8 +5,10 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr/storage" "github.com/yedf/dtm/dtmsvr/storage"
"github.com/yedf/dtm/dtmsvr/storage/registry"
) )
func initTransGlobal(gid string) (*storage.TransGlobalStore, storage.Store) { func initTransGlobal(gid string) (*storage.TransGlobalStore, storage.Store) {
@ -15,7 +17,7 @@ func initTransGlobal(gid string) (*storage.TransGlobalStore, storage.Store) {
bs := []storage.TransBranchStore{ bs := []storage.TransBranchStore{
{Gid: gid, BranchID: "01"}, {Gid: gid, BranchID: "01"},
} }
s := storage.GetStore() s := registry.GetStore()
err := s.MaySaveNewTrans(g, bs) err := s.MaySaveNewTrans(g, bs)
dtmimp.E2P(err) dtmimp.E2P(err)
return g, s return g, s
@ -87,12 +89,12 @@ func TestStoreLockTrans(t *testing.T) {
} }
func TestStoreWait(t *testing.T) { func TestStoreWait(t *testing.T) {
storage.WaitStoreUp() registry.WaitStoreUp()
} }
func TestUpdateBranchSql(t *testing.T) { func TestUpdateBranchSql(t *testing.T) {
if !config.Store.IsDB() { if !config.Store.IsDB() {
r := storage.GetStore().UpdateBranchesSql(nil, nil) r := registry.GetStore().UpdateBranchesSql(nil, nil)
assert.Nil(t, r) assert.Nil(t, r)
} }
} }

Loading…
Cancel
Save