diff --git a/.travis.yml b/.travis.yml index 9a08ffa..5fde38f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,4 +15,4 @@ before_install: - go get -t -v ./... - go get github.com/yedf2/goveralls script: - - $GOPATH/bin/goveralls -envs=TEST_STORE=redis,TEST_STORE=mysql,TEST_STORE=boltdb -service=travis-ci -ignore="examples/*,dtmgrpc/dtmgimp/*.pb.go,bench/*,test/*" + - $GOPATH/bin/goveralls -envs=TEST_STORE=redis,TEST_STORE=mysql,TEST_STORE=boltdb -flags '-gcflags=-l' -service=travis-ci -ignore="examples/*,dtmgrpc/dtmgimp/*.pb.go,bench/*,test/*" diff --git a/dtmsvr/storage/boltdb.go b/dtmsvr/storage/boltdb.go index ee3f3f0..217528e 100644 --- a/dtmsvr/storage/boltdb.go +++ b/dtmsvr/storage/boltdb.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "strings" "sync" "time" @@ -23,16 +24,16 @@ func boltGet() *bolt.DB { dtmimp.E2P(err) // NOTE: we must ensure all buckets is exists before we use it - err = db.Update(func(t *bolt.Tx) error { - for _, bucket := range allBuckets { - _, err := t.CreateBucketIfNotExists(bucket) - if err != nil { - return err - } - } + err = initializeBuckets(db) + dtmimp.E2P(err) - return nil - }) + // TODO: + // 1. refactor this code + // 2. make cleanup run period, to avoid the file growup when server long-running + err = cleanupExpiredData( + time.Duration(common.Config.Store.DataExpire)*time.Second, + db, + ) dtmimp.E2P(err) boltDb = db @@ -40,12 +41,129 @@ func boltGet() *bolt.DB { return boltDb } +func initializeBuckets(db *bolt.DB) error { + return db.Update(func(t *bolt.Tx) error { + for _, bucket := range allBuckets { + _, err := t.CreateBucketIfNotExists(bucket) + if err != nil { + return err + } + } + + return nil + }) +} + +// cleanupExpiredData will clean the expired data in boltdb, the +// expired time is configurable. +func cleanupExpiredData(expiredSeconds time.Duration, db *bolt.DB) error { + if expiredSeconds <= 0 { + return nil + } + + lastKeepTime := time.Now().Add(-expiredSeconds) + return db.Update(func(t *bolt.Tx) error { + globalBucket := t.Bucket(bucketGlobal) + if globalBucket == nil { + return nil + } + + expiredGids := map[string]struct{}{} + cursor := globalBucket.Cursor() + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + trans := TransGlobalStore{} + dtmimp.MustUnmarshal(v, &trans) + + transDoneTime := trans.FinishTime + if transDoneTime == nil { + transDoneTime = trans.RollbackTime + } + if transDoneTime != nil && lastKeepTime.After(*transDoneTime) { + expiredGids[string(k)] = struct{}{} + } + } + + cleanupGlobalWithGids(t, expiredGids) + cleanupBranchWithGids(t, expiredGids) + cleanupIndexWithGids(t, expiredGids) + return nil + }) +} + +func cleanupGlobalWithGids(t *bolt.Tx, gids map[string]struct{}) { + bucket := t.Bucket(bucketGlobal) + if bucket == nil { + return + } + + dtmimp.Logf("Start to cleanup %d gids", len(gids)) + for gid := range gids { + dtmimp.Logf("Start to delete gid: %s", gid) + bucket.Delete([]byte(gid)) + } +} + +func cleanupBranchWithGids(t *bolt.Tx, gids map[string]struct{}) { + bucket := t.Bucket(bucketBranches) + if bucket == nil { + return + } + + // It's not safe if we delete the item when use cursor, for more detail see + // https://github.com/etcd-io/bbolt/issues/146 + branchKeys := []string{} + for gid := range gids { + cursor := bucket.Cursor() + for k, v := cursor.Seek([]byte(gid)); k != nil; k, v = cursor.Next() { + b := TransBranchStore{} + dtmimp.MustUnmarshal(v, &b) + if b.Gid != gid { + break + } + + branchKeys = append(branchKeys, string(k)) + } + } + + dtmimp.Logf("Start to cleanup %d branches", len(branchKeys)) + for _, key := range branchKeys { + dtmimp.Logf("Start to delete branch: %s", key) + bucket.Delete([]byte(key)) + } +} + +func cleanupIndexWithGids(t *bolt.Tx, gids map[string]struct{}) { + bucket := t.Bucket(bucketIndex) + if bucket == nil { + return + } + + indexKeys := []string{} + cursor := bucket.Cursor() + for k, _ := cursor.First(); k != nil; k, _ = cursor.Next() { + ks := strings.Split(string(k), "-") + if len(ks) != 2 { + continue + } + + if _, ok := gids[ks[1]]; ok { + indexKeys = append(indexKeys, string(k)) + } + } + + dtmimp.Logf("Start to cleanup %d indexes", len(indexKeys)) + for _, key := range indexKeys { + dtmimp.Logf("Start to delete index: %s", key) + bucket.Delete([]byte(key)) + } +} + var bucketGlobal = []byte("global") var bucketBranches = []byte("branches") var bucketIndex = []byte("index") var allBuckets = [][]byte{ - bucketGlobal, bucketBranches, + bucketGlobal, bucketIndex, } diff --git a/dtmsvr/storage/boltdb_test.go b/dtmsvr/storage/boltdb_test.go new file mode 100644 index 0000000..c1656f4 --- /dev/null +++ b/dtmsvr/storage/boltdb_test.go @@ -0,0 +1,331 @@ +package storage + +import ( + "path" + "testing" + "time" + + . "github.com/onsi/gomega" + bolt "go.etcd.io/bbolt" + + "github.com/yedf/dtm/dtmcli/dtmimp" +) + +func TestInitializeBuckets(t *testing.T) { + t.Run("normal test", func(t *testing.T) { + g := NewWithT(t) + db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) + g.Expect(err).ToNot(HaveOccurred()) + defer db.Close() + + err = initializeBuckets(db) + g.Expect(err).ToNot(HaveOccurred()) + + actualBuckets := [][]byte{} + err = db.View(func(t *bolt.Tx) error { + return t.ForEach(func(name []byte, _ *bolt.Bucket) error { + actualBuckets = append(actualBuckets, name) + return nil + }) + }) + g.Expect(err).ToNot(HaveOccurred()) + + g.Expect(actualBuckets).To(Equal(allBuckets)) + }) +} + +func TestCleanupExpiredData(t *testing.T) { + t.Run("negative expired seconds", func(t *testing.T) { + g := NewWithT(t) + db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) + g.Expect(err).ToNot(HaveOccurred()) + defer db.Close() + + err = cleanupExpiredData(-1*time.Second, db) + g.Expect(err).ToNot(HaveOccurred()) + }) + + t.Run("nil global bucket", func(t *testing.T) { + g := NewWithT(t) + db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) + g.Expect(err).ToNot(HaveOccurred()) + defer db.Close() + + err = cleanupExpiredData(time.Second, db) + g.Expect(err).ToNot(HaveOccurred()) + }) + + t.Run("normal test", func(t *testing.T) { + g := NewWithT(t) + db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) + g.Expect(err).ToNot(HaveOccurred()) + defer db.Close() + + // Initialize data + err = initializeBuckets(db) + g.Expect(err).ToNot(HaveOccurred()) + + err = db.Update(func(t *bolt.Tx) error { + doneTime := time.Now().Add(-10 * time.Minute) + + gids := []string{"gid0", "gid1", "gid2"} + gidDatas := []TransGlobalStore{ + {}, // not finished + { + FinishTime: &doneTime, + }, + { + RollbackTime: &doneTime, + }, + } + bucket := t.Bucket(bucketGlobal) + for i := 0; i < len(gids); i++ { + err = bucket.Put([]byte(gids[i]), dtmimp.MustMarshal(gidDatas[i])) + if err != nil { + return err + } + } + + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + + err = cleanupExpiredData(time.Minute, db) + g.Expect(err).ToNot(HaveOccurred()) + + actualGids := []string{} + err = db.View(func(t *bolt.Tx) error { + cursor := t.Bucket(bucketGlobal).Cursor() + for k, _ := cursor.First(); k != nil; k, _ = cursor.Next() { + actualGids = append(actualGids, string(k)) + } + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(actualGids).To(Equal([]string{"gid0"})) + }) +} + +func TestCleanupGlobalWithGids(t *testing.T) { + t.Run("nil bucket", func(t *testing.T) { + g := NewWithT(t) + db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) + g.Expect(err).ToNot(HaveOccurred()) + defer db.Close() + + err = db.Update(func(t *bolt.Tx) error { + cleanupGlobalWithGids(t, nil) + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + }) + + t.Run("normal test", func(t *testing.T) { + g := NewWithT(t) + db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) + g.Expect(err).ToNot(HaveOccurred()) + defer db.Close() + + // Initialize data + err = db.Update(func(t *bolt.Tx) error { + bucket, err := t.CreateBucketIfNotExists(bucketGlobal) + if err != nil { + return err + } + + keys := []string{"k1", "k2", "k3"} + datas := []string{"data1", "data2", "data3"} + for i := 0; i < len(keys); i++ { + err = bucket.Put([]byte(keys[i]), []byte(datas[i])) + if err != nil { + return err + } + } + + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + + err = db.Update(func(t *bolt.Tx) error { + cleanupGlobalWithGids(t, map[string]struct{}{ + "k1": {}, + "k2": {}, + }) + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + + actualGids := []string{} + err = db.View(func(t *bolt.Tx) error { + cursor := t.Bucket(bucketGlobal).Cursor() + for k, _ := cursor.First(); k != nil; k, _ = cursor.Next() { + actualGids = append(actualGids, string(k)) + } + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(actualGids).To(Equal([]string{"k3"})) + }) +} + +func TestCleanupBranchWithGids(t *testing.T) { + t.Run("nil bucket", func(t *testing.T) { + g := NewWithT(t) + db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) + g.Expect(err).ToNot(HaveOccurred()) + defer db.Close() + + err = db.Update(func(t *bolt.Tx) error { + cleanupBranchWithGids(t, nil) + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + }) + + t.Run("normal test", func(t *testing.T) { + g := NewWithT(t) + db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) + g.Expect(err).ToNot(HaveOccurred()) + defer db.Close() + + // Initialize data + err = db.Update(func(t *bolt.Tx) error { + bucket, err := t.CreateBucketIfNotExists(bucketBranches) + if err != nil { + return err + } + + keys := []string{"a", "gid001", "gid002", "gid101", "gid201", "z"} + datas := []TransBranchStore{ + { + Gid: "a", + }, + { + Gid: "gid0", + }, + { + Gid: "gid0", + }, + { + Gid: "gid1", + }, + { + Gid: "gid2", + }, + { + Gid: "z", + }, + } + for i := 0; i < len(keys); i++ { + err = bucket.Put([]byte(keys[i]), dtmimp.MustMarshal(datas[i])) + if err != nil { + return err + } + } + + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + + err = db.Update(func(t *bolt.Tx) error { + cleanupBranchWithGids(t, map[string]struct{}{ + "gid0": {}, + "gid1": {}, + }) + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + + actualKeys := []string{} + err = db.View(func(t *bolt.Tx) error { + cursor := t.Bucket(bucketBranches).Cursor() + for k, _ := cursor.First(); k != nil; k, _ = cursor.Next() { + actualKeys = append(actualKeys, string(k)) + } + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(actualKeys).To(Equal([]string{"a", "gid201", "z"})) + }) +} + +func TestCleanupIndexWithGids(t *testing.T) { + t.Run("nil bucket", func(t *testing.T) { + g := NewWithT(t) + db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) + g.Expect(err).ToNot(HaveOccurred()) + defer db.Close() + + err = db.Update(func(t *bolt.Tx) error { + cleanupIndexWithGids(t, nil) + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + }) + + t.Run("normal test", func(t *testing.T) { + g := NewWithT(t) + db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) + g.Expect(err).ToNot(HaveOccurred()) + defer db.Close() + + // Initialize data + err = db.Update(func(t *bolt.Tx) error { + bucket, err := t.CreateBucketIfNotExists(bucketIndex) + if err != nil { + return err + } + + keys := []string{"a", "0-gid0", "1-gid0", "2-gid1", "3-gid2", "z"} + datas := []TransBranchStore{ + { + Gid: "a", + }, + { + Gid: "gid0", + }, + { + Gid: "gid0", + }, + { + Gid: "gid1", + }, + { + Gid: "gid2", + }, + { + Gid: "z", + }, + } + for i := 0; i < len(keys); i++ { + err = bucket.Put([]byte(keys[i]), dtmimp.MustMarshal(datas[i])) + if err != nil { + return err + } + } + + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + + err = db.Update(func(t *bolt.Tx) error { + cleanupIndexWithGids(t, map[string]struct{}{ + "gid0": {}, + "gid1": {}, + }) + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + + actualKeys := []string{} + err = db.View(func(t *bolt.Tx) error { + cursor := t.Bucket(bucketIndex).Cursor() + for k, _ := cursor.First(); k != nil; k, _ = cursor.Next() { + actualKeys = append(actualKeys, string(k)) + } + return nil + }) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(actualKeys).To(Equal([]string{"3-gid2", "a", "z"})) + }) +} diff --git a/go.mod b/go.mod index 0c07635..5982ea1 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/google/uuid v1.3.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/lib/pq v1.10.3 + github.com/onsi/gomega v1.16.0 github.com/polarismesh/grpc-go-polaris v0.0.0-20211128162137-1a59cd7b5733 // indirect github.com/prometheus/client_golang v1.11.0 github.com/stretchr/testify v1.7.0