Browse Source

fix: add cleanup when bolt storage startup

pull/111/head
lsytj0413 4 years ago
parent
commit
c2d0a9c332
  1. 2
      .travis.yml
  2. 138
      dtmsvr/storage/boltdb.go
  3. 331
      dtmsvr/storage/boltdb_test.go
  4. 1
      go.mod

2
.travis.yml

@ -15,4 +15,4 @@ before_install:
- go get -t -v ./...
- go get github.com/yedf2/goveralls
script:
- $GOPATH/bin/goveralls -envs=TEST_STORE=redis,TEST_STORE=mysql,TEST_STORE=boltdb -service=travis-ci -ignore="examples/*,dtmgrpc/dtmgimp/*.pb.go,bench/*,test/*"
- $GOPATH/bin/goveralls -envs=TEST_STORE=redis,TEST_STORE=mysql,TEST_STORE=boltdb -flags '-gcflags=-l' -service=travis-ci -ignore="examples/*,dtmgrpc/dtmgimp/*.pb.go,bench/*,test/*"

138
dtmsvr/storage/boltdb.go

@ -2,6 +2,7 @@ package storage
import (
"fmt"
"strings"
"sync"
"time"
@ -23,16 +24,16 @@ func boltGet() *bolt.DB {
dtmimp.E2P(err)
// NOTE: we must ensure all buckets exist before we use them
err = db.Update(func(t *bolt.Tx) error {
for _, bucket := range allBuckets {
_, err := t.CreateBucketIfNotExists(bucket)
if err != nil {
return err
}
}
err = initializeBuckets(db)
dtmimp.E2P(err)
return nil
})
// TODO:
// 1. refactor this code
// 2. run the cleanup periodically, to keep the file from growing while the server is long-running
err = cleanupExpiredData(
time.Duration(common.Config.Store.DataExpire)*time.Second,
db,
)
dtmimp.E2P(err)
boltDb = db
@ -40,12 +41,129 @@ func boltGet() *bolt.DB {
return boltDb
}
// initializeBuckets makes sure every bucket listed in allBuckets exists in db.
// All buckets are created inside a single read-write transaction; the first
// creation error aborts the transaction and is returned to the caller.
func initializeBuckets(db *bolt.DB) error {
	ensureAll := func(tx *bolt.Tx) error {
		for i := range allBuckets {
			if _, err := tx.CreateBucketIfNotExists(allBuckets[i]); err != nil {
				return err
			}
		}
		return nil
	}
	return db.Update(ensureAll)
}
// cleanupExpiredData purges transactions whose completion time is older than
// the retention window from boltdb.
//
// expiredSeconds is the retention window (a time.Duration despite its name);
// a zero or negative value disables the cleanup. A transaction counts as done
// at its FinishTime, or at its RollbackTime when FinishTime is unset;
// transactions with neither timestamp are kept. Expired gids are handed to the
// global, branch and index cleanup helpers within one read-write transaction.
// The returned error is whatever db.Update reports.
func cleanupExpiredData(expiredSeconds time.Duration, db *bolt.DB) error {
	if expiredSeconds <= 0 {
		return nil
	}

	cutoff := time.Now().Add(-expiredSeconds)
	purge := func(tx *bolt.Tx) error {
		global := tx.Bucket(bucketGlobal)
		if global == nil {
			return nil
		}

		// First pass: only collect the expired gids, nothing is deleted yet.
		stale := make(map[string]struct{})
		c := global.Cursor()
		for key, raw := c.First(); key != nil; key, raw = c.Next() {
			var g TransGlobalStore
			dtmimp.MustUnmarshal(raw, &g)

			doneAt := g.FinishTime
			if doneAt == nil {
				doneAt = g.RollbackTime
			}
			if doneAt == nil || !cutoff.After(*doneAt) {
				continue
			}
			stale[string(key)] = struct{}{}
		}

		// Second pass: drop everything that references the expired gids.
		cleanupGlobalWithGids(tx, stale)
		cleanupBranchWithGids(tx, stale)
		cleanupIndexWithGids(tx, stale)
		return nil
	}
	return db.Update(purge)
}
// cleanupGlobalWithGids deletes the record of every gid in gids from the
// "global" bucket. It does nothing when the bucket does not exist.
//
// t is expected to be a read-write transaction. A failed delete is no longer
// dropped silently: it is handed to dtmimp.E2P, the error helper this file
// already uses (presumably it panics on a non-nil error — confirm against
// dtmimp).
func cleanupGlobalWithGids(t *bolt.Tx, gids map[string]struct{}) {
	bucket := t.Bucket(bucketGlobal)
	if bucket == nil {
		return
	}

	dtmimp.Logf("Start to cleanup %d gids", len(gids))
	for gid := range gids {
		dtmimp.Logf("Start to delete gid: %s", gid)
		dtmimp.E2P(bucket.Delete([]byte(gid)))
	}
}
// cleanupBranchWithGids deletes every branch record that belongs to one of the
// given gids from the "branches" bucket. It does nothing when the bucket does
// not exist.
//
// For each gid the scan starts at the first key >= gid and collects the
// records whose stored Gid matches. t is expected to be a read-write
// transaction. A failed delete is handed to dtmimp.E2P (the error helper this
// file already uses) instead of being silently ignored.
func cleanupBranchWithGids(t *bolt.Tx, gids map[string]struct{}) {
	bucket := t.Bucket(bucketBranches)
	if bucket == nil {
		return
	}

	// It's not safe to delete items while iterating with a cursor, so the keys
	// are collected first and deleted afterwards. For more detail see
	// https://github.com/etcd-io/bbolt/issues/146
	branchKeys := []string{}
	for gid := range gids {
		cursor := bucket.Cursor()
		for k, v := cursor.Seek([]byte(gid)); k != nil; k, v = cursor.Next() {
			b := TransBranchStore{}
			dtmimp.MustUnmarshal(v, &b)
			if b.Gid == gid {
				branchKeys = append(branchKeys, string(k))
				continue
			}
			// When one gid is a prefix of another, records of the longer gid
			// can sort in between this gid's records. Only stop once the key
			// has left this gid's prefix range; otherwise skip the foreign
			// record and keep scanning.
			if !strings.HasPrefix(string(k), gid) {
				break
			}
		}
	}

	dtmimp.Logf("Start to cleanup %d branches", len(branchKeys))
	for _, key := range branchKeys {
		dtmimp.Logf("Start to delete branch: %s", key)
		dtmimp.E2P(bucket.Delete([]byte(key)))
	}
}
// cleanupIndexWithGids deletes every entry of the "index" bucket whose key
// refers to one of the given gids. It does nothing when the bucket does not
// exist.
//
// The part of the key after the first "-" is looked up in gids. Only the first
// "-" is treated as the separator, so gids that themselves contain "-" (for
// example UUID-style gids) are matched as well. Keys without any "-" are left
// untouched. t is expected to be a read-write transaction. A failed delete is
// handed to dtmimp.E2P (the error helper this file already uses) instead of
// being silently ignored.
func cleanupIndexWithGids(t *bolt.Tx, gids map[string]struct{}) {
	bucket := t.Bucket(bucketIndex)
	if bucket == nil {
		return
	}

	// Collect first, delete afterwards: deleting while a cursor is iterating
	// is not safe (see https://github.com/etcd-io/bbolt/issues/146).
	indexKeys := []string{}
	cursor := bucket.Cursor()
	for k, _ := cursor.First(); k != nil; k, _ = cursor.Next() {
		ks := strings.SplitN(string(k), "-", 2)
		if len(ks) != 2 {
			continue
		}

		if _, ok := gids[ks[1]]; ok {
			indexKeys = append(indexKeys, string(k))
		}
	}

	dtmimp.Logf("Start to cleanup %d indexes", len(indexKeys))
	for _, key := range indexKeys {
		dtmimp.Logf("Start to delete index: %s", key)
		dtmimp.E2P(bucket.Delete([]byte(key)))
	}
}
// Names of the top-level buckets used by the bolt storage.
var bucketGlobal = []byte("global")
var bucketBranches = []byte("branches")
var bucketIndex = []byte("index")

// allBuckets lists every bucket that must exist before the storage is used.
// Each bucket appears exactly once, in ascending byte order of its name:
// TestInitializeBuckets compares this slice against the output of Tx.ForEach.
var allBuckets = [][]byte{
	bucketBranches,
	bucketGlobal,
	bucketIndex,
}

331
dtmsvr/storage/boltdb_test.go

@ -0,0 +1,331 @@
package storage
import (
"path"
"testing"
"time"
. "github.com/onsi/gomega"
bolt "go.etcd.io/bbolt"
"github.com/yedf/dtm/dtmcli/dtmimp"
)
// TestInitializeBuckets checks that initializeBuckets, run against a fresh
// database, leaves exactly the buckets listed in allBuckets behind.
func TestInitializeBuckets(t *testing.T) {
	t.Run("normal test", func(t *testing.T) {
		g := NewWithT(t)

		dbFile := path.Join(t.TempDir(), "./test.bolt")
		db, err := bolt.Open(dbFile, 0666, &bolt.Options{Timeout: 1 * time.Second})
		g.Expect(err).ToNot(HaveOccurred())
		defer db.Close()

		g.Expect(initializeBuckets(db)).ToNot(HaveOccurred())

		// Read the top-level bucket names back in iteration order.
		found := [][]byte{}
		collect := func(name []byte, _ *bolt.Bucket) error {
			found = append(found, name)
			return nil
		}
		err = db.View(func(tx *bolt.Tx) error {
			return tx.ForEach(collect)
		})
		g.Expect(err).ToNot(HaveOccurred())

		g.Expect(found).To(Equal(allBuckets))
	})
}
func TestCleanupExpiredData(t *testing.T) {
t.Run("negative expired seconds", func(t *testing.T) {
g := NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
defer db.Close()
err = cleanupExpiredData(-1*time.Second, db)
g.Expect(err).ToNot(HaveOccurred())
})
t.Run("nil global bucket", func(t *testing.T) {
g := NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
defer db.Close()
err = cleanupExpiredData(time.Second, db)
g.Expect(err).ToNot(HaveOccurred())
})
t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
defer db.Close()
// Initialize data
err = initializeBuckets(db)
g.Expect(err).ToNot(HaveOccurred())
err = db.Update(func(t *bolt.Tx) error {
doneTime := time.Now().Add(-10 * time.Minute)
gids := []string{"gid0", "gid1", "gid2"}
gidDatas := []TransGlobalStore{
{}, // not finished
{
FinishTime: &doneTime,
},
{
RollbackTime: &doneTime,
},
}
bucket := t.Bucket(bucketGlobal)
for i := 0; i < len(gids); i++ {
err = bucket.Put([]byte(gids[i]), dtmimp.MustMarshal(gidDatas[i]))
if err != nil {
return err
}
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
err = cleanupExpiredData(time.Minute, db)
g.Expect(err).ToNot(HaveOccurred())
actualGids := []string{}
err = db.View(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketGlobal).Cursor()
for k, _ := cursor.First(); k != nil; k, _ = cursor.Next() {
actualGids = append(actualGids, string(k))
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(actualGids).To(Equal([]string{"gid0"}))
})
}
func TestCleanupGlobalWithGids(t *testing.T) {
t.Run("nil bucket", func(t *testing.T) {
g := NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
defer db.Close()
err = db.Update(func(t *bolt.Tx) error {
cleanupGlobalWithGids(t, nil)
return nil
})
g.Expect(err).ToNot(HaveOccurred())
})
t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
defer db.Close()
// Initialize data
err = db.Update(func(t *bolt.Tx) error {
bucket, err := t.CreateBucketIfNotExists(bucketGlobal)
if err != nil {
return err
}
keys := []string{"k1", "k2", "k3"}
datas := []string{"data1", "data2", "data3"}
for i := 0; i < len(keys); i++ {
err = bucket.Put([]byte(keys[i]), []byte(datas[i]))
if err != nil {
return err
}
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
err = db.Update(func(t *bolt.Tx) error {
cleanupGlobalWithGids(t, map[string]struct{}{
"k1": {},
"k2": {},
})
return nil
})
g.Expect(err).ToNot(HaveOccurred())
actualGids := []string{}
err = db.View(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketGlobal).Cursor()
for k, _ := cursor.First(); k != nil; k, _ = cursor.Next() {
actualGids = append(actualGids, string(k))
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(actualGids).To(Equal([]string{"k3"}))
})
}
func TestCleanupBranchWithGids(t *testing.T) {
t.Run("nil bucket", func(t *testing.T) {
g := NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
defer db.Close()
err = db.Update(func(t *bolt.Tx) error {
cleanupBranchWithGids(t, nil)
return nil
})
g.Expect(err).ToNot(HaveOccurred())
})
t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
defer db.Close()
// Initialize data
err = db.Update(func(t *bolt.Tx) error {
bucket, err := t.CreateBucketIfNotExists(bucketBranches)
if err != nil {
return err
}
keys := []string{"a", "gid001", "gid002", "gid101", "gid201", "z"}
datas := []TransBranchStore{
{
Gid: "a",
},
{
Gid: "gid0",
},
{
Gid: "gid0",
},
{
Gid: "gid1",
},
{
Gid: "gid2",
},
{
Gid: "z",
},
}
for i := 0; i < len(keys); i++ {
err = bucket.Put([]byte(keys[i]), dtmimp.MustMarshal(datas[i]))
if err != nil {
return err
}
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
err = db.Update(func(t *bolt.Tx) error {
cleanupBranchWithGids(t, map[string]struct{}{
"gid0": {},
"gid1": {},
})
return nil
})
g.Expect(err).ToNot(HaveOccurred())
actualKeys := []string{}
err = db.View(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketBranches).Cursor()
for k, _ := cursor.First(); k != nil; k, _ = cursor.Next() {
actualKeys = append(actualKeys, string(k))
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(actualKeys).To(Equal([]string{"a", "gid201", "z"}))
})
}
func TestCleanupIndexWithGids(t *testing.T) {
t.Run("nil bucket", func(t *testing.T) {
g := NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
defer db.Close()
err = db.Update(func(t *bolt.Tx) error {
cleanupIndexWithGids(t, nil)
return nil
})
g.Expect(err).ToNot(HaveOccurred())
})
t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
defer db.Close()
// Initialize data
err = db.Update(func(t *bolt.Tx) error {
bucket, err := t.CreateBucketIfNotExists(bucketIndex)
if err != nil {
return err
}
keys := []string{"a", "0-gid0", "1-gid0", "2-gid1", "3-gid2", "z"}
datas := []TransBranchStore{
{
Gid: "a",
},
{
Gid: "gid0",
},
{
Gid: "gid0",
},
{
Gid: "gid1",
},
{
Gid: "gid2",
},
{
Gid: "z",
},
}
for i := 0; i < len(keys); i++ {
err = bucket.Put([]byte(keys[i]), dtmimp.MustMarshal(datas[i]))
if err != nil {
return err
}
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
err = db.Update(func(t *bolt.Tx) error {
cleanupIndexWithGids(t, map[string]struct{}{
"gid0": {},
"gid1": {},
})
return nil
})
g.Expect(err).ToNot(HaveOccurred())
actualKeys := []string{}
err = db.View(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketIndex).Cursor()
for k, _ := cursor.First(); k != nil; k, _ = cursor.Next() {
actualKeys = append(actualKeys, string(k))
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(actualKeys).To(Equal([]string{"3-gid2", "a", "z"}))
})
}

1
go.mod

@ -11,6 +11,7 @@ require (
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/lib/pq v1.10.3
github.com/onsi/gomega v1.16.0
github.com/polarismesh/grpc-go-polaris v0.0.0-20211128162137-1a59cd7b5733 // indirect
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.7.0

Loading…
Cancel
Save