Browse Source

Merge branch 'lsytj0413-refactor-storage' into main

pull/146/head
yedf2 4 years ago
parent
commit
388366da8b
  1. 8
      app/main.go
  2. 4
      bench/main.go
  3. 68
      dtmsvr/storage/boltdb/boltdb.go
  4. 9
      dtmsvr/storage/boltdb/boltdb_test.go
  5. 29
      dtmsvr/storage/registry/registry.go
  6. 18
      dtmsvr/storage/store.go
  7. 4
      dtmsvr/utils.go
  8. 8
      test/store_test.go

8
app/main.go

@ -11,14 +11,14 @@ import (
"os"
"strings"
_ "go.uber.org/automaxprocs"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr"
"github.com/yedf/dtm/dtmsvr/storage"
"github.com/yedf/dtm/dtmsvr/storage/registry"
"github.com/yedf/dtm/examples"
_ "go.uber.org/automaxprocs"
)
var Version, Commit, Date string
@ -56,7 +56,7 @@ func main() {
dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver)
}
if os.Args[1] != "dtmsvr" { // 实际线上运行,只启动dtmsvr,不准备table相关的数据
storage.WaitStoreUp()
registry.WaitStoreUp()
dtmsvr.PopulateDB(true)
examples.PopulateDB(true)
}

4
bench/main.go

@ -8,7 +8,7 @@ import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr"
"github.com/yedf/dtm/dtmsvr/storage"
"github.com/yedf/dtm/dtmsvr/storage/registry"
"github.com/yedf/dtm/examples"
)
@ -28,7 +28,7 @@ func main() {
fmt.Println("start bench server")
common.MustLoadConfig()
dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver)
storage.WaitStoreUp()
registry.WaitStoreUp()
dtmsvr.PopulateDB(true)
examples.PopulateDB(true)
dtmsvr.StartSvr() // 启动dtmsvr的api服务

68
dtmsvr/storage/boltdb.go → dtmsvr/storage/boltdb/boltdb.go

@ -1,4 +1,4 @@
package storage
package boltdb
import (
"fmt"
@ -6,12 +6,16 @@ import (
"sync"
"time"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
bolt "go.etcd.io/bbolt"
"gorm.io/gorm"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr/storage"
)
var config = &common.Config
type BoltdbStore struct {
}
@ -71,7 +75,7 @@ func cleanupExpiredData(expiredSeconds time.Duration, db *bolt.DB) error {
expiredGids := map[string]struct{}{}
cursor := globalBucket.Cursor()
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
trans := TransGlobalStore{}
trans := storage.TransGlobalStore{}
dtmimp.MustUnmarshal(v, &trans)
transDoneTime := trans.FinishTime
@ -115,7 +119,7 @@ func cleanupBranchWithGids(t *bolt.Tx, gids map[string]struct{}) {
for gid := range gids {
cursor := bucket.Cursor()
for k, v := cursor.Seek([]byte(gid)); k != nil; k, v = cursor.Next() {
b := TransBranchStore{}
b := storage.TransBranchStore{}
dtmimp.MustUnmarshal(v, &b)
if b.Gid != gid {
break
@ -167,8 +171,8 @@ var allBuckets = [][]byte{
bucketIndex,
}
func tGetGlobal(t *bolt.Tx, gid string) *TransGlobalStore {
trans := TransGlobalStore{}
func tGetGlobal(t *bolt.Tx, gid string) *storage.TransGlobalStore {
trans := storage.TransGlobalStore{}
bs := t.Bucket(bucketGlobal).Get([]byte(gid))
if bs == nil {
return nil
@ -177,11 +181,11 @@ func tGetGlobal(t *bolt.Tx, gid string) *TransGlobalStore {
return &trans
}
func tGetBranches(t *bolt.Tx, gid string) []TransBranchStore {
branches := []TransBranchStore{}
func tGetBranches(t *bolt.Tx, gid string) []storage.TransBranchStore {
branches := []storage.TransBranchStore{}
cursor := t.Bucket(bucketBranches).Cursor()
for k, v := cursor.Seek([]byte(gid)); k != nil; k, v = cursor.Next() {
b := TransBranchStore{}
b := storage.TransBranchStore{}
dtmimp.MustUnmarshal(v, &b)
if b.Gid != gid {
break
@ -190,13 +194,13 @@ func tGetBranches(t *bolt.Tx, gid string) []TransBranchStore {
}
return branches
}
func tPutGlobal(t *bolt.Tx, global *TransGlobalStore) {
func tPutGlobal(t *bolt.Tx, global *storage.TransGlobalStore) {
bs := dtmimp.MustMarshal(global)
err := t.Bucket(bucketGlobal).Put([]byte(global.Gid), bs)
dtmimp.E2P(err)
}
func tPutBranches(t *bolt.Tx, branches []TransBranchStore, start int64) {
func tPutBranches(t *bolt.Tx, branches []storage.TransBranchStore, start int64) {
if start == -1 {
bs := tGetBranches(t, branches[0].Gid)
start = int64(len(bs))
@ -240,7 +244,7 @@ func (s *BoltdbStore) PopulateData(skipDrop bool) {
}
}
func (s *BoltdbStore) FindTransGlobalStore(gid string) (trans *TransGlobalStore) {
func (s *BoltdbStore) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) {
err := boltGet().View(func(t *bolt.Tx) error {
trans = tGetGlobal(t, gid)
return nil
@ -249,15 +253,15 @@ func (s *BoltdbStore) FindTransGlobalStore(gid string) (trans *TransGlobalStore)
return
}
func (s *BoltdbStore) ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore {
globals := []TransGlobalStore{}
func (s *BoltdbStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
globals := []storage.TransGlobalStore{}
err := boltGet().View(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketGlobal).Cursor()
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
if string(k) == *position {
continue
}
g := TransGlobalStore{}
g := storage.TransGlobalStore{}
dtmimp.MustUnmarshal(v, &g)
globals = append(globals, g)
if len(globals) == int(limit) {
@ -275,8 +279,8 @@ func (s *BoltdbStore) ScanTransGlobalStores(position *string, limit int64) []Tra
return globals
}
func (s *BoltdbStore) FindBranches(gid string) []TransBranchStore {
var branches []TransBranchStore = nil
func (s *BoltdbStore) FindBranches(gid string) []storage.TransBranchStore {
var branches []storage.TransBranchStore = nil
err := boltGet().View(func(t *bolt.Tx) error {
branches = tGetBranches(t, gid)
return nil
@ -285,18 +289,18 @@ func (s *BoltdbStore) FindBranches(gid string) []TransBranchStore {
return branches
}
func (s *BoltdbStore) UpdateBranchesSql(branches []TransBranchStore, updates []string) *gorm.DB {
func (s *BoltdbStore) UpdateBranchesSql(branches []storage.TransBranchStore, updates []string) *gorm.DB {
return nil // not implemented
}
func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) {
func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
err := boltGet().Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, gid)
if g == nil {
return ErrNotFound
return storage.ErrNotFound
}
if g.Status != status {
return ErrNotFound
return storage.ErrNotFound
}
tPutBranches(t, branches, int64(branchStart))
return nil
@ -304,11 +308,11 @@ func (s *BoltdbStore) LockGlobalSaveBranches(gid string, status string, branches
dtmimp.E2P(err)
}
func (s *BoltdbStore) MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error {
func (s *BoltdbStore) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
return boltGet().Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid)
if g != nil {
return ErrUniqueConflict
return storage.ErrUniqueConflict
}
tPutGlobal(t, global)
tPutIndex(t, global.NextCronTime.Unix(), global.Gid)
@ -317,13 +321,13 @@ func (s *BoltdbStore) MaySaveNewTrans(global *TransGlobalStore, branches []Trans
})
}
func (s *BoltdbStore) ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) {
func (s *BoltdbStore) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status
global.Status = newStatus
err := boltGet().Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid)
if g == nil || g.Status != old {
return ErrNotFound
return storage.ErrNotFound
}
if finished {
tDelIndex(t, g.NextCronTime.Unix(), g.Gid)
@ -334,7 +338,7 @@ func (s *BoltdbStore) ChangeGlobalStatus(global *TransGlobalStore, newStatus str
dtmimp.E2P(err)
}
func (s *BoltdbStore) TouchCronTime(global *TransGlobalStore, nextCronInterval int64) {
func (s *BoltdbStore) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
oldUnix := global.NextCronTime.Unix()
global.NextCronTime = common.GetNextTime(nextCronInterval)
global.UpdateTime = common.GetNextTime(0)
@ -342,7 +346,7 @@ func (s *BoltdbStore) TouchCronTime(global *TransGlobalStore, nextCronInterval i
err := boltGet().Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid)
if g == nil || g.Gid != global.Gid {
return ErrNotFound
return storage.ErrNotFound
}
tDelIndex(t, oldUnix, global.Gid)
tPutGlobal(t, global)
@ -352,8 +356,8 @@ func (s *BoltdbStore) TouchCronTime(global *TransGlobalStore, nextCronInterval i
dtmimp.E2P(err)
}
func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore {
var trans *TransGlobalStore = nil
func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
var trans *storage.TransGlobalStore = nil
min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix())
next := time.Now().Add(time.Duration(config.RetryInterval) * time.Second)
err := boltGet().Update(func(t *bolt.Tx) error {
@ -361,7 +365,7 @@ func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalSto
for trans == nil {
k, v := cursor.First()
if k == nil || string(k) > min {
return ErrNotFound
return storage.ErrNotFound
}
trans = tGetGlobal(t, string(v))
err := t.Bucket(bucketIndex).Delete(k)
@ -372,7 +376,7 @@ func (s *BoltdbStore) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalSto
tPutIndex(t, next.Unix(), trans.Gid)
return nil
})
if err == ErrNotFound {
if err == storage.ErrNotFound {
return nil
}
dtmimp.E2P(err)

9
dtmsvr/storage/boltdb_test.go → dtmsvr/storage/boltdb/boltdb_test.go

@ -1,4 +1,4 @@
package storage
package boltdb
import (
"path"
@ -9,6 +9,7 @@ import (
bolt "go.etcd.io/bbolt"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr/storage"
)
func TestInitializeBuckets(t *testing.T) {
@ -69,7 +70,7 @@ func TestCleanupExpiredData(t *testing.T) {
doneTime := time.Now().Add(-10 * time.Minute)
gids := []string{"gid0", "gid1", "gid2"}
gidDatas := []TransGlobalStore{
gidDatas := []storage.TransGlobalStore{
{}, // not finished
{
FinishTime: &doneTime,
@ -196,7 +197,7 @@ func TestCleanupBranchWithGids(t *testing.T) {
}
keys := []string{"a", "gid001", "gid002", "gid101", "gid201", "z"}
datas := []TransBranchStore{
datas := []storage.TransBranchStore{
{
Gid: "a",
},
@ -277,7 +278,7 @@ func TestCleanupIndexWithGids(t *testing.T) {
}
keys := []string{"a", "0-gid0", "1-gid0", "2-gid1", "3-gid2", "z"}
datas := []TransBranchStore{
datas := []storage.TransBranchStore{
{
Gid: "a",
},

29
dtmsvr/storage/registry/registry.go

@ -0,0 +1,29 @@
package registry
import (
"time"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmsvr/storage"
"github.com/yedf/dtm/dtmsvr/storage/boltdb"
)
var config = &common.Config
var stores map[string]storage.Store = map[string]storage.Store{
"redis": &storage.RedisStore{},
"mysql": &storage.SqlStore{},
"postgres": &storage.SqlStore{},
"boltdb": &boltdb.BoltdbStore{},
}
func GetStore() storage.Store {
return stores[config.Store.Driver]
}
// WaitStoreUp wait for db to go up
func WaitStoreUp() {
for err := GetStore().Ping(); err != nil; err = GetStore().Ping() {
time.Sleep(3 * time.Second)
}
}

18
dtmsvr/storage/store.go

@ -26,24 +26,6 @@ type Store interface {
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
}
var stores map[string]Store = map[string]Store{
"redis": &RedisStore{},
"mysql": &SqlStore{},
"postgres": &SqlStore{},
"boltdb": &BoltdbStore{},
}
func GetStore() Store {
return stores[config.Store.Driver]
}
// WaitStoreUp wait for db to go up
func WaitStoreUp() {
for err := GetStore().Ping(); err != nil; err = GetStore().Ping() {
time.Sleep(3 * time.Second)
}
}
func wrapError(err error) error {
if err == gorm.ErrRecordNotFound || err == redis.Nil {
return ErrNotFound

4
dtmsvr/utils.go

@ -11,9 +11,11 @@ import (
"time"
"github.com/google/uuid"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr/storage"
"github.com/yedf/dtm/dtmsvr/storage/registry"
)
type branchStatus struct {
@ -28,7 +30,7 @@ var e2p = dtmimp.E2P
var config = &common.Config
func GetStore() storage.Store {
return storage.GetStore()
return registry.GetStore()
}
// TransProcessedTestChan only for test usage. when transaction processed once, write gid to this chan

8
test/store_test.go

@ -5,8 +5,10 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmsvr/storage"
"github.com/yedf/dtm/dtmsvr/storage/registry"
)
func initTransGlobal(gid string) (*storage.TransGlobalStore, storage.Store) {
@ -15,7 +17,7 @@ func initTransGlobal(gid string) (*storage.TransGlobalStore, storage.Store) {
bs := []storage.TransBranchStore{
{Gid: gid, BranchID: "01"},
}
s := storage.GetStore()
s := registry.GetStore()
err := s.MaySaveNewTrans(g, bs)
dtmimp.E2P(err)
return g, s
@ -87,12 +89,12 @@ func TestStoreLockTrans(t *testing.T) {
}
func TestStoreWait(t *testing.T) {
storage.WaitStoreUp()
registry.WaitStoreUp()
}
func TestUpdateBranchSql(t *testing.T) {
if !config.Store.IsDB() {
r := storage.GetStore().UpdateBranchesSql(nil, nil)
r := registry.GetStore().UpdateBranchesSql(nil, nil)
assert.Nil(t, r)
}
}

Loading…
Cancel
Save