Browse Source

Add TransGlobalScanCondition struct for admin api scan

pull/426/head
徐云金YunjinXu 3 years ago
parent
commit
115c9b18b0
  1. 16
      dtmsvr/api_http.go
  2. 3
      dtmsvr/storage/boltdb/boltdb.go
  3. 3
      dtmsvr/storage/redis/redis.go
  4. 3
      dtmsvr/storage/sql/sql.go
  5. 10
      dtmsvr/storage/store.go

16
dtmsvr/api_http.go

@ -13,6 +13,7 @@ import (
"github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/dtmutil"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
@ -111,11 +112,24 @@ func all(c *gin.Context) interface{} {
globals = []interface{}{*find} globals = []interface{}{*find}
} }
} else { } else {
globals = GetStore().ScanTransGlobalStores(&position, int64(dtmimp.MustAtoi(sLimit))) condition := storage.TransGlobalScanCondition{
Status: c.Query("status"),
TransType: c.Query("transType"),
CreateTimeStart: stringTotime(c.Query("createTimeStart")),
CreateTimeEnd: stringTotime(c.Query("createTimeEnd")),
}
globals = GetStore().ScanTransGlobalStores(&position, int64(dtmimp.MustAtoi(sLimit)), condition)
} }
return map[string]interface{}{"transactions": globals, "next_position": position} return map[string]interface{}{"transactions": globals, "next_position": position}
} }
func stringTotime(timeStr string) time.Time {
if timeStr == "" {
return time.Time{}
}
return time.Unix(int64(dtmimp.MustAtoi(timeStr))/1000, 0)
}
// unfinished transactions need to be retried as soon as possible after business downtime is recovered // unfinished transactions need to be retried as soon as possible after business downtime is recovered
func resetCronTime(c *gin.Context) interface{} { func resetCronTime(c *gin.Context) interface{} {
sTimeoutSecond := dtmimp.OrString(c.Query("timeout"), strconv.FormatInt(3*conf.TimeoutToFail, 10)) sTimeoutSecond := dtmimp.OrString(c.Query("timeout"), strconv.FormatInt(3*conf.TimeoutToFail, 10))

3
dtmsvr/storage/boltdb/boltdb.go

@ -311,7 +311,7 @@ func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStor
} }
// ScanTransGlobalStores lists GlobalTrans data // ScanTransGlobalStores lists GlobalTrans data
func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition storage.TransGlobalScanCondition) []storage.TransGlobalStore {
globals := []storage.TransGlobalStore{} globals := []storage.TransGlobalStore{}
err := s.boltDb.View(func(t *bolt.Tx) error { err := s.boltDb.View(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketGlobal).Cursor() cursor := t.Bucket(bucketGlobal).Cursor()
@ -322,6 +322,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.T
g := storage.TransGlobalStore{} g := storage.TransGlobalStore{}
dtmimp.MustUnmarshal(v, &g) dtmimp.MustUnmarshal(v, &g)
globals = append(globals, g) globals = append(globals, g)
// todo condition
if len(globals) == int(limit) { if len(globals) == int(limit) {
break break
} }

3
dtmsvr/storage/redis/redis.go

@ -60,7 +60,7 @@ func (s *Store) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
} }
// ScanTransGlobalStores lists GlobalTrans data // ScanTransGlobalStores lists GlobalTrans data
func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition storage.TransGlobalScanCondition) []storage.TransGlobalStore {
logger.Debugf("calling ScanTransGlobalStores: %s %d", *position, limit) logger.Debugf("calling ScanTransGlobalStores: %s %d", *position, limit)
lid := uint64(0) lid := uint64(0)
if *position != "" { if *position != "" {
@ -80,6 +80,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.T
for _, v := range values { for _, v := range values {
global := storage.TransGlobalStore{} global := storage.TransGlobalStore{}
dtmimp.MustUnmarshalString(v.(string), &global) dtmimp.MustUnmarshalString(v.(string), &global)
// todo condition
globals = append(globals, global) globals = append(globals, global)
if len(globals) == int(limit) { if len(globals) == int(limit) {
break break

3
dtmsvr/storage/sql/sql.go

@ -52,12 +52,13 @@ func (s *Store) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
} }
// ScanTransGlobalStores lists GlobalTrans data // ScanTransGlobalStores lists GlobalTrans data
func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition storage.TransGlobalScanCondition) []storage.TransGlobalStore {
globals := []storage.TransGlobalStore{} globals := []storage.TransGlobalStore{}
lid := math.MaxInt64 lid := math.MaxInt64
if *position != "" { if *position != "" {
lid = dtmimp.MustAtoi(*position) lid = dtmimp.MustAtoi(*position)
} }
// todo condition
dbr := dbGet().Must().Where("id < ?", lid).Order("id desc").Limit(int(limit)).Find(&globals) dbr := dbGet().Must().Where("id < ?", lid).Order("id desc").Limit(int(limit)).Find(&globals)
if dbr.RowsAffected < limit { if dbr.RowsAffected < limit {
*position = "" *position = ""

10
dtmsvr/storage/store.go

@ -22,7 +22,7 @@ type Store interface {
Ping() error Ping() error
PopulateData(skipDrop bool) PopulateData(skipDrop bool)
FindTransGlobalStore(gid string) *TransGlobalStore FindTransGlobalStore(gid string) *TransGlobalStore
ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore ScanTransGlobalStores(position *string, limit int64, condition TransGlobalScanCondition) []TransGlobalStore
FindBranches(gid string) []TransBranchStore FindBranches(gid string) []TransBranchStore
UpdateBranches(branches []TransBranchStore, updates []string) (int, error) UpdateBranches(branches []TransBranchStore, updates []string) (int, error)
LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int)
@ -37,3 +37,11 @@ type Store interface {
DeleteKV(cat, key string) error DeleteKV(cat, key string) error
CreateKV(cat, key, value string) error CreateKV(cat, key, value string) error
} }
// TransGlobalScanCondition contains filter options to scan global trans.
type TransGlobalScanCondition struct {
Status string
TransType string
CreateTimeStart time.Time
CreateTimeEnd time.Time
}

Loading…
Cancel
Save