Browse Source

support resetCronTime

pull/227/head
xyctruth 4 years ago
parent
commit
1d72b3842f
  1. 14
      dtmsvr/api_http.go
  2. 33
      dtmsvr/storage/boltdb/boltdb.go
  3. 25
      dtmsvr/storage/redis/redis.go
  4. 28
      dtmsvr/storage/sql/sql.go
  5. 1
      dtmsvr/storage/store.go
  6. 14
      test/api_test.go
  7. 72
      test/store_test.go

14
dtmsvr/api_http.go

@ -8,6 +8,8 @@ package dtmsvr
import ( import (
"errors" "errors"
"strconv"
"time"
"github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
@ -26,6 +28,7 @@ func addRoute(engine *gin.Engine) {
engine.POST("/api/dtmsvr/registerTccBranch", dtmutil.WrapHandler2(registerBranch)) // compatible for old sdk engine.POST("/api/dtmsvr/registerTccBranch", dtmutil.WrapHandler2(registerBranch)) // compatible for old sdk
engine.GET("/api/dtmsvr/query", dtmutil.WrapHandler2(query)) engine.GET("/api/dtmsvr/query", dtmutil.WrapHandler2(query))
engine.GET("/api/dtmsvr/all", dtmutil.WrapHandler2(all)) engine.GET("/api/dtmsvr/all", dtmutil.WrapHandler2(all))
engine.GET("/api/dtmsvr/resetCronTime", dtmutil.WrapHandler2(resetCronTime))
// add prometheus exporter // add prometheus exporter
h := promhttp.Handler() h := promhttp.Handler()
@ -75,7 +78,14 @@ func query(c *gin.Context) interface{} {
func all(c *gin.Context) interface{} { func all(c *gin.Context) interface{} {
position := c.Query("position") position := c.Query("position")
slimit := dtmimp.OrString(c.Query("limit"), "100") sLimit := dtmimp.OrString(c.Query("limit"), "100")
globals := GetStore().ScanTransGlobalStores(&position, int64(dtmimp.MustAtoi(slimit))) globals := GetStore().ScanTransGlobalStores(&position, int64(dtmimp.MustAtoi(sLimit)))
return map[string]interface{}{"transactions": globals, "next_position": position} return map[string]interface{}{"transactions": globals, "next_position": position}
} }
func resetCronTime(c *gin.Context) interface{} {
sTimeoutSecond := dtmimp.OrString(c.Query("timeout"), strconv.FormatInt(3*conf.TimeoutToFail, 10))
sLimit := dtmimp.OrString(c.Query("limit"), "100")
timeout := time.Duration(dtmimp.MustAtoi(sTimeoutSecond)) * time.Second
return GetStore().ResetCronTime(timeout, int64(dtmimp.MustAtoi(sLimit)))
}

33
dtmsvr/storage/boltdb/boltdb.go

@ -11,15 +11,17 @@ import (
"strings" "strings"
"time" "time"
bolt "go.etcd.io/bbolt"
"github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmsvr/config"
"github.com/dtm-labs/dtm/dtmsvr/storage" "github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/dtmutil"
bolt "go.etcd.io/bbolt"
) )
var conf = &config.Config
// Store implements storage.Store, and storage with boltdb // Store implements storage.Store, and storage with boltdb
type Store struct { type Store struct {
boltDb *bolt.DB boltDb *bolt.DB
@ -409,3 +411,30 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS
dtmimp.E2P(err) dtmimp.E2P(err)
return trans return trans
} }
func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error {
next := time.Now()
var trans *storage.TransGlobalStore
min := fmt.Sprintf("%d", time.Now().Add(timeout).Unix())
err := s.boltDb.Update(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketIndex).Cursor()
i := 0
for k, v := cursor.Seek([]byte(min)); k != nil && i < int(limit); k, v = cursor.Next() {
trans = tGetGlobal(t, string(v))
err := t.Bucket(bucketIndex).Delete(k)
dtmimp.E2P(err)
if trans.Status == dtmcli.StatusSucceed || trans.Status == dtmcli.StatusFailed {
continue
}
trans.NextCronTime = &next
tPutGlobal(t, trans)
tPutIndex(t, next.Unix(), trans.Gid)
i++
}
return nil
})
return err
}

25
dtmsvr/storage/redis/redis.go

@ -260,6 +260,31 @@ return gid
} }
} }
// ResetCronTime rest nextCronTime
func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error {
next := time.Now().Unix()
timeoutTimestamp := time.Now().Add(timeout).Unix()
args := newArgList().AppendGid("").AppendRaw(timeoutTimestamp).AppendRaw(next).AppendRaw(limit)
lua := `-- ResetCronTime
local r = redis.call('ZRANGEBYSCORE', KEYS[3], ARGV[3], '+inf', 'WITHSCORES', 'LIMIT', 0, ARGV[5])
local index = 1
while(true)
do
local gid = r[index]
if gid == nil then
break
end
redis.call('ZADD', KEYS[3], ARGV[4], gid)
index = index + 2
end
`
_, err := callLua(args, lua)
if errors.Is(err, storage.ErrNotFound) {
return nil
}
return err
}
// TouchCronTime updates cronTime // TouchCronTime updates cronTime
func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) { func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) {
global.UpdateTime = dtmutil.GetNextTime(0) global.UpdateTime = dtmutil.GetNextTime(0)

28
dtmsvr/storage/sql/sql.go

@ -5,14 +5,13 @@ import (
"math" "math"
"time" "time"
"github.com/lithammer/shortuuid/v3"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/dtmsvr/config"
"github.com/dtm-labs/dtm/dtmsvr/storage" "github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/dtmutil"
"github.com/lithammer/shortuuid/v3"
"gorm.io/gorm"
"gorm.io/gorm/clause"
) )
var conf = &config.Config var conf = &config.Config
@ -157,6 +156,27 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS
return global return global
} }
func (s *Store) ResetCronTime(timeout time.Duration, limit int64) error {
db := dbGet()
getTime := func(second int) string {
return map[string]string{
"mysql": fmt.Sprintf("date_add(now(), interval %d second)", second),
"postgres": fmt.Sprintf("current_timestamp + interval '%d second'", second),
}[conf.Store.Driver]
}
timeoutSecond := int(timeout / time.Second)
whereTime := fmt.Sprintf("next_cron_time > %s", getTime(timeoutSecond))
global := &storage.TransGlobalStore{}
dbr := db.Must().Model(global).
Where(whereTime + "and status in ('prepared', 'aborting', 'submitted')").
Limit(int(limit)).
Select([]string{"next_cron_time"}).
Updates(&storage.TransGlobalStore{
NextCronTime: dtmutil.GetNextTime(0),
})
return dbr.Error
}
// SetDBConn sets db conn pool // SetDBConn sets db conn pool
func SetDBConn(db *gorm.DB) { func SetDBConn(db *gorm.DB) {
sqldb, _ := db.DB() sqldb, _ := db.DB()

1
dtmsvr/storage/store.go

@ -30,4 +30,5 @@ type Store interface {
ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool) ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool)
TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time)
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
ResetCronTime(timeout time.Duration, limit int64) error
} }

14
test/api_test.go

@ -8,6 +8,7 @@ package test
import ( import (
"fmt" "fmt"
"strconv"
"testing" "testing"
"github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/dtmimp"
@ -79,3 +80,16 @@ func TestDtmMetrics(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, rest.StatusCode(), 200) assert.Equal(t, rest.StatusCode(), 200)
} }
func TestAPIResetCronTime(t *testing.T) {
testStoreResetCronTime(t, dtmimp.GetFuncName(), func(timeout int64, limit int64) error {
sTimeout := strconv.FormatInt(timeout, 10)
sLimit := strconv.FormatInt(limit, 10)
_, err := dtmimp.RestyClient.R().SetQueryParams(map[string]string{
"timeout": sTimeout,
"limit": sLimit,
}).Get(dtmutil.DefaultHTTPServer + "/resetCronTime")
return err
})
}

72
test/store_test.go

@ -1,6 +1,7 @@
package test package test
import ( import (
"fmt"
"testing" "testing"
"time" "time"
@ -13,6 +14,10 @@ import (
func initTransGlobal(gid string) (*storage.TransGlobalStore, storage.Store) { func initTransGlobal(gid string) (*storage.TransGlobalStore, storage.Store) {
next := time.Now().Add(10 * time.Second) next := time.Now().Add(10 * time.Second)
return initTransGlobalByNextCronTime(gid, next)
}
func initTransGlobalByNextCronTime(gid string, next time.Time) (*storage.TransGlobalStore, storage.Store) {
g := &storage.TransGlobalStore{Gid: gid, Status: "prepared", NextCronTime: &next} g := &storage.TransGlobalStore{Gid: gid, Status: "prepared", NextCronTime: &next}
bs := []storage.TransBranchStore{ bs := []storage.TransBranchStore{
{Gid: gid, BranchID: "01"}, {Gid: gid, BranchID: "01"},
@ -88,6 +93,73 @@ func TestStoreLockTrans(t *testing.T) {
assert.Nil(t, g2) assert.Nil(t, g2)
} }
func TestStoreResetCronTime(t *testing.T) {
s := registry.GetStore()
testStoreResetCronTime(t, dtmimp.GetFuncName(), func(timeout int64, limit int64) error {
return s.ResetCronTime(time.Duration(timeout)*time.Second, limit)
})
}
func testStoreResetCronTime(t *testing.T, funcName string, restCronHandler func(expire int64, limit int64) error) {
s := registry.GetStore()
var restTimeTimeout, lockExpireIn, limit, i int64
restTimeTimeout = 100 //The time that will be ResetCronTime
lockExpireIn = 2 //The time that will be LockOneGlobalTrans
limit = 10 // rest limit
// Will be reset
for i = 0; i < limit; i++ {
gid := funcName + fmt.Sprintf("%d", i)
_, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(restTimeTimeout+10)*time.Second))
}
// Will not be reset
gid := funcName + fmt.Sprintf("%d", 10)
_, _ = initTransGlobalByNextCronTime(gid, time.Now().Add(time.Duration(restTimeTimeout-10)*time.Second))
// Not Fount
g := s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.Nil(t, g)
// Rest limit-1 count
err := restCronHandler(restTimeTimeout, limit-1)
assert.Nil(t, err)
// Fount limit-1 count
for i = 0; i < limit-1; i++ {
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.NotNil(t, g)
s.ChangeGlobalStatus(g, "succeed", []string{}, true)
}
// Not Fount
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.Nil(t, g)
// Rest 1 count
err = restCronHandler(restTimeTimeout, limit)
// Fount 1 count
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.NotNil(t, g)
s.ChangeGlobalStatus(g, "succeed", []string{}, true)
// Not Fount
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.Nil(t, g)
// Increase the restTimeTimeout, Rest 1 count
err = restCronHandler(restTimeTimeout-12, limit)
assert.Nil(t, err)
// Fount 1 count
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.NotNil(t, g)
s.ChangeGlobalStatus(g, "succeed", []string{}, true)
// Not Fount
g = s.LockOneGlobalTrans(time.Duration(lockExpireIn) * time.Second)
assert.Nil(t, g)
}
func TestUpdateBranches(t *testing.T) { func TestUpdateBranches(t *testing.T) {
if !conf.Store.IsDB() { if !conf.Store.IsDB() {
_, err := registry.GetStore().UpdateBranches(nil, nil) _, err := registry.GetStore().UpdateBranches(nil, nil)

Loading…
Cancel
Save