Browse Source

Merge pull request #426 from wooln/feature-all-trans-scan-condition-struct

Add TransGlobalScanCondition struct for admin api all trans scan
pull/421/head
yedf2 3 years ago
committed by GitHub
parent
commit
df3894c950
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      dtmsvr/api_http.go
  2. 3
      dtmsvr/storage/boltdb/boltdb.go
  3. 3
      dtmsvr/storage/redis/redis.go
  4. 3
      dtmsvr/storage/sql/sql.go
  5. 10
      dtmsvr/storage/store.go

16
dtmsvr/api_http.go

@ -13,6 +13,7 @@ import (
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus/promhttp"
@ -111,11 +112,24 @@ func all(c *gin.Context) interface{} {
globals = []interface{}{*find}
}
} else {
globals = GetStore().ScanTransGlobalStores(&position, int64(dtmimp.MustAtoi(sLimit)))
condition := storage.TransGlobalScanCondition{
Status: c.Query("status"),
TransType: c.Query("transType"),
CreateTimeStart: stringTotime(c.Query("createTimeStart")),
CreateTimeEnd: stringTotime(c.Query("createTimeEnd")),
}
globals = GetStore().ScanTransGlobalStores(&position, int64(dtmimp.MustAtoi(sLimit)), condition)
}
return map[string]interface{}{"transactions": globals, "next_position": position}
}
func stringTotime(timeStr string) time.Time {
if timeStr == "" {
return time.Time{}
}
return time.Unix(int64(dtmimp.MustAtoi(timeStr))/1000, 0)
}
// unfinished transactions need to be retried as soon as possible after business downtime is recovered
func resetCronTime(c *gin.Context) interface{} {
sTimeoutSecond := dtmimp.OrString(c.Query("timeout"), strconv.FormatInt(3*conf.TimeoutToFail, 10))

3
dtmsvr/storage/boltdb/boltdb.go

@ -311,7 +311,7 @@ func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStor
}
// ScanTransGlobalStores lists GlobalTrans data
func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition storage.TransGlobalScanCondition) []storage.TransGlobalStore {
globals := []storage.TransGlobalStore{}
err := s.boltDb.View(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketGlobal).Cursor()
@ -322,6 +322,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.T
g := storage.TransGlobalStore{}
dtmimp.MustUnmarshal(v, &g)
globals = append(globals, g)
// todo condition
if len(globals) == int(limit) {
break
}

3
dtmsvr/storage/redis/redis.go

@ -60,7 +60,7 @@ func (s *Store) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
}
// ScanTransGlobalStores lists GlobalTrans data
func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition storage.TransGlobalScanCondition) []storage.TransGlobalStore {
logger.Debugf("calling ScanTransGlobalStores: %s %d", *position, limit)
lid := uint64(0)
if *position != "" {
@ -80,6 +80,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.T
for _, v := range values {
global := storage.TransGlobalStore{}
dtmimp.MustUnmarshalString(v.(string), &global)
// todo condition
globals = append(globals, global)
if len(globals) == int(limit) {
break

3
dtmsvr/storage/sql/sql.go

@ -52,12 +52,13 @@ func (s *Store) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
}
// ScanTransGlobalStores lists GlobalTrans data
func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition storage.TransGlobalScanCondition) []storage.TransGlobalStore {
globals := []storage.TransGlobalStore{}
lid := math.MaxInt64
if *position != "" {
lid = dtmimp.MustAtoi(*position)
}
// todo condition
dbr := dbGet().Must().Where("id < ?", lid).Order("id desc").Limit(int(limit)).Find(&globals)
if dbr.RowsAffected < limit {
*position = ""

10
dtmsvr/storage/store.go

@ -22,7 +22,7 @@ type Store interface {
Ping() error
PopulateData(skipDrop bool)
FindTransGlobalStore(gid string) *TransGlobalStore
ScanTransGlobalStores(position *string, limit int64) []TransGlobalStore
ScanTransGlobalStores(position *string, limit int64, condition TransGlobalScanCondition) []TransGlobalStore
FindBranches(gid string) []TransBranchStore
UpdateBranches(branches []TransBranchStore, updates []string) (int, error)
LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int)
@ -37,3 +37,11 @@ type Store interface {
DeleteKV(cat, key string) error
CreateKV(cat, key, value string) error
}
// TransGlobalScanCondition contains filter options to scan global trans.
type TransGlobalScanCondition struct {
Status string
TransType string
CreateTimeStart time.Time
CreateTimeEnd time.Time
}

Loading…
Cancel
Save