diff --git a/dtmsvr/api_http.go b/dtmsvr/api_http.go index ff0379b..64a291d 100644 --- a/dtmsvr/api_http.go +++ b/dtmsvr/api_http.go @@ -13,6 +13,7 @@ import ( "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmsvr/storage" "github.com/dtm-labs/dtm/dtmutil" "github.com/gin-gonic/gin" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -111,11 +112,24 @@ func all(c *gin.Context) interface{} { globals = []interface{}{*find} } } else { - globals = GetStore().ScanTransGlobalStores(&position, int64(dtmimp.MustAtoi(sLimit))) + condition := storage.TransGlobalScanCondition{ + Status: c.Query("status"), + TransType: c.Query("transType"), + CreateTimeStart: stringTotime(c.Query("createTimeStart")), + CreateTimeEnd: stringTotime(c.Query("createTimeEnd")), + } + globals = GetStore().ScanTransGlobalStores(&position, int64(dtmimp.MustAtoi(sLimit)), condition) } return map[string]interface{}{"transactions": globals, "next_position": position} } +func stringTotime(timeStr string) time.Time { + if timeStr == "" { + return time.Time{} + } + return time.Unix(int64(dtmimp.MustAtoi(timeStr))/1000, 0) +} + // unfinished transactions need to be retried as soon as possible after business downtime is recovered func resetCronTime(c *gin.Context) interface{} { sTimeoutSecond := dtmimp.OrString(c.Query("timeout"), strconv.FormatInt(3*conf.TimeoutToFail, 10)) diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index f3e285d..ccd6f1d 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -311,7 +311,7 @@ func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStor } // ScanTransGlobalStores lists GlobalTrans data -func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { +func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition storage.TransGlobalScanCondition) []storage.TransGlobalStore { globals := []storage.TransGlobalStore{} err := s.boltDb.View(func(t *bolt.Tx) error { cursor := t.Bucket(bucketGlobal).Cursor() @@ -322,6 +322,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.T g := storage.TransGlobalStore{} dtmimp.MustUnmarshal(v, &g) globals = append(globals, g) + // todo condition if len(globals) == int(limit) { break } diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index f9233ed..1a35620 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -60,7 +60,7 @@ func (s *Store) FindTransGlobalStore(gid string) *storage.TransGlobalStore { } // ScanTransGlobalStores lists GlobalTrans data -func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { +func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition storage.TransGlobalScanCondition) []storage.TransGlobalStore { logger.Debugf("calling ScanTransGlobalStores: %s %d", *position, limit) lid := uint64(0) if *position != "" { @@ -80,6 +80,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.T for _, v := range values { global := storage.TransGlobalStore{} dtmimp.MustUnmarshalString(v.(string), &global) + // todo condition globals = append(globals, global) if len(globals) == int(limit) { break diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 360755b..bcbbbb1 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -52,12 +52,13 @@ func (s *Store) FindTransGlobalStore(gid string) *storage.TransGlobalStore { } // ScanTransGlobalStores lists GlobalTrans data -func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore { +func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition storage.TransGlobalScanCondition) []storage.TransGlobalStore { globals := []storage.TransGlobalStore{} lid := math.MaxInt64 if *position != "" { lid = dtmimp.MustAtoi(*position) } + // todo condition dbr := dbGet().Must().Where("id < ?", lid).Order("id desc").Limit(int(limit)).Find(&globals) if dbr.RowsAffected < limit { *position = "" diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go index d2617f9..3b9773f 100644 --- a/dtmsvr/storage/store.go +++ b/dtmsvr/storage/store.go @@ -22,7 +22,7 @@ type Store interface { Ping() error PopulateData(skipDrop bool) FindTransGlobalStore(gid string) *TransGlobalStore - ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore + ScanTransGlobalStores(position *string, limit int64, condition TransGlobalScanCondition) []TransGlobalStore FindBranches(gid string) []TransBranchStore UpdateBranches(branches []TransBranchStore, updates []string) (int, error) LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int) @@ -37,3 +37,11 @@ type Store interface { DeleteKV(cat, key string) error CreateKV(cat, key, value string) error } + +// TransGlobalScanCondition contains filter options to scan global trans. +type TransGlobalScanCondition struct { + Status string + TransType string + CreateTimeStart time.Time + CreateTimeEnd time.Time +}