Browse Source

Merge pull request #471 from dtm-labs/alpha

use a new algorithm to do lock one trans
pull/473/head v1.17.8
yedf2 2 years ago
committed by GitHub
parent
commit
dedf72ea95
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 92
      dtmsvr/storage/boltdb/boltdb_test.go
  2. 23
      dtmsvr/storage/sql/sql.go
  3. 12
      dtmsvr/trans_status.go

92
dtmsvr/storage/boltdb/boltdb_test.go

@ -11,7 +11,7 @@ import (
"testing"
"time"
. "github.com/onsi/gomega"
ga "github.com/onsi/gomega"
bolt "go.etcd.io/bbolt"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@ -20,13 +20,13 @@ import (
func TestInitializeBuckets(t *testing.T) {
t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()
err = initializeBuckets(db)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
actualBuckets := [][]byte{}
err = db.View(func(t *bolt.Tx) error {
@ -35,42 +35,42 @@ func TestInitializeBuckets(t *testing.T) {
return nil
})
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
g.Expect(actualBuckets).To(Equal(allBuckets))
g.Expect(actualBuckets).To(ga.Equal(allBuckets))
})
}
func TestCleanupExpiredData(t *testing.T) {
t.Run("negative expired seconds", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()
err = cleanupExpiredData(-1*time.Second, db)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
})
t.Run("nil global bucket", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()
err = cleanupExpiredData(time.Second, db)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
})
t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()
// Initialize data
err = initializeBuckets(db)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
err = db.Update(func(t *bolt.Tx) error {
doneTime := time.Now().Add(-10 * time.Minute)
@ -95,10 +95,10 @@ func TestCleanupExpiredData(t *testing.T) {
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
err = cleanupExpiredData(time.Minute, db)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
actualGids := []string{}
err = db.View(func(t *bolt.Tx) error {
@ -108,29 +108,29 @@ func TestCleanupExpiredData(t *testing.T) {
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(actualGids).To(Equal([]string{"gid0"}))
g.Expect(err).ToNot(ga.HaveOccurred())
g.Expect(actualGids).To(ga.Equal([]string{"gid0"}))
})
}
func TestCleanupGlobalWithGids(t *testing.T) {
t.Run("nil bucket", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()
err = db.Update(func(t *bolt.Tx) error {
cleanupGlobalWithGids(t, nil)
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
})
t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()
// Initialize data
@ -151,7 +151,7 @@ func TestCleanupGlobalWithGids(t *testing.T) {
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
err = db.Update(func(t *bolt.Tx) error {
cleanupGlobalWithGids(t, map[string]struct{}{
@ -160,7 +160,7 @@ func TestCleanupGlobalWithGids(t *testing.T) {
})
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
actualGids := []string{}
err = db.View(func(t *bolt.Tx) error {
@ -170,29 +170,29 @@ func TestCleanupGlobalWithGids(t *testing.T) {
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(actualGids).To(Equal([]string{"k3"}))
g.Expect(err).ToNot(ga.HaveOccurred())
g.Expect(actualGids).To(ga.Equal([]string{"k3"}))
})
}
func TestCleanupBranchWithGids(t *testing.T) {
t.Run("nil bucket", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()
err = db.Update(func(t *bolt.Tx) error {
cleanupBranchWithGids(t, nil)
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
})
t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()
// Initialize data
@ -232,7 +232,7 @@ func TestCleanupBranchWithGids(t *testing.T) {
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
err = db.Update(func(t *bolt.Tx) error {
cleanupBranchWithGids(t, map[string]struct{}{
@ -241,7 +241,7 @@ func TestCleanupBranchWithGids(t *testing.T) {
})
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
actualKeys := []string{}
err = db.View(func(t *bolt.Tx) error {
@ -251,29 +251,29 @@ func TestCleanupBranchWithGids(t *testing.T) {
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(actualKeys).To(Equal([]string{"a", "gid201", "z"}))
g.Expect(err).ToNot(ga.HaveOccurred())
g.Expect(actualKeys).To(ga.Equal([]string{"a", "gid201", "z"}))
})
}
func TestCleanupIndexWithGids(t *testing.T) {
t.Run("nil bucket", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()
err = db.Update(func(t *bolt.Tx) error {
cleanupIndexWithGids(t, nil)
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
})
t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()
// Initialize data
@ -313,7 +313,7 @@ func TestCleanupIndexWithGids(t *testing.T) {
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
err = db.Update(func(t *bolt.Tx) error {
cleanupIndexWithGids(t, map[string]struct{}{
@ -322,7 +322,7 @@ func TestCleanupIndexWithGids(t *testing.T) {
})
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
actualKeys := []string{}
err = db.View(func(t *bolt.Tx) error {
@ -332,7 +332,7 @@ func TestCleanupIndexWithGids(t *testing.T) {
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(actualKeys).To(Equal([]string{"3-gid2", "a", "z"}))
g.Expect(err).ToNot(ga.HaveOccurred())
g.Expect(actualKeys).To(ga.Equal([]string{"3-gid2", "a", "z"}))
})
}

23
dtmsvr/storage/sql/sql.go

@ -8,6 +8,8 @@
package sql
import (
"database/sql"
"errors"
"fmt"
"math"
"time"
@ -159,23 +161,26 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS
db := dbGet()
owner := shortuuid.New()
nextCronTime := getTimeStr(int64(expireIn / time.Second))
where := map[string]string{
dtmimp.DBTypeMysql: fmt.Sprintf(`next_cron_time < '%s' and status in ('prepared', 'aborting', 'submitted') limit 1`, nextCronTime),
dtmimp.DBTypePostgres: fmt.Sprintf(`id in (select id from trans_global where next_cron_time < '%s' and status in ('prepared', 'aborting', 'submitted') limit 1 )`, nextCronTime),
where := fmt.Sprintf(`next_cron_time < '%s' and status in ('prepared', 'aborting', 'submitted')`, nextCronTime)
order := map[string]string{
dtmimp.DBTypeMysql: `order by rand()`,
dtmimp.DBTypePostgres: `order by random()`,
}[conf.Store.Driver]
ssql := fmt.Sprintf(`select count(1) from trans_global where %s`, where)
var cnt int64
err := db.ToSQLDB().QueryRow(ssql).Scan(&cnt)
dtmimp.PanicIf(err != nil, err)
if cnt == 0 {
ssql := fmt.Sprintf(`select id from trans_global where %s %s limit 1`, where, order)
var id int64
err := db.ToSQLDB().QueryRow(ssql).Scan(&id)
if errors.Is(err, sql.ErrNoRows) {
return nil
}
dtmimp.PanicIf(err != nil, err)
sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s', owner='%s' WHERE %s`,
sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s', owner='%s' WHERE id=%d and %s`,
getTimeStr(0),
getTimeStr(conf.RetryInterval),
owner,
id,
where)
affected, err := dtmimp.DBExec(conf.Store.Driver, db.ToSQLDB(), sql)

12
dtmsvr/trans_status.go

@ -262,13 +262,15 @@ func (t *TransGlobal) execBranch(ctx context.Context, branch *TransBranch, branc
func (t *TransGlobal) getNextCronInterval(ctype cronType) int64 {
if ctype == cronBackoff {
return t.NextCronInterval * 2
} else if ctype == cronKeep {
}
if ctype == cronKeep {
return t.NextCronInterval
} else if t.RetryInterval != 0 {
}
if t.RetryInterval != 0 {
return t.RetryInterval
} else if t.TimeoutToFail > 0 && t.TimeoutToFail < conf.RetryInterval {
}
if t.TimeoutToFail > 0 && t.TimeoutToFail < conf.RetryInterval {
return t.TimeoutToFail
} else {
return conf.RetryInterval
}
return conf.RetryInterval
}

Loading…
Cancel
Save