Browse Source

reset trans corntime once - store

pull/439/head
徐云金YunjinXu 3 years ago
parent
commit
d98f7f56f7
  1. 18
      dtmsvr/storage/boltdb/boltdb.go
  2. 10
      dtmsvr/storage/redis/redis.go
  3. 15
      dtmsvr/storage/sql/sql.go
  4. 1
      dtmsvr/storage/store.go
  5. 9
      dtmsvr/trans_status.go

18
dtmsvr/storage/boltdb/boltdb.go

@ -482,6 +482,24 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in
return
}
// ResetTransGlobalCronTime reset nextCronTime of one global trans.
func (s *Store) ResetTransGlobalCronTime(g *storage.TransGlobalStore) error {
old := g.UpdateTime
err := s.boltDb.Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, g.Gid)
if g == nil || g.UpdateTime == old {
return storage.ErrNotFound
}
now := dtmutil.GetNextTime(0)
g.NextCronTime = now
g.UpdateTime = now
tPutGlobal(t, g)
return nil
})
dtmimp.E2P(err)
return err
}
// ScanKV lists KV pairs
func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore {
kvs := []storage.KVStore{}

10
dtmsvr/storage/redis/redis.go

@ -330,6 +330,16 @@ return tostring(i)
return
}
// ResetTransGlobalCronTime reset nextCronTime of one global trans.
func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error {
now := dtmutil.GetNextTime(0)
global.NextCronTime = now
global.UpdateTime = now
key := conf.Store.RedisPrefix + "_g_" + global.Gid
_, err := redisGet().Set(ctx, key, dtmimp.MustMarshalString(global), time.Duration(conf.Store.DataExpire)*time.Second).Result()
return err
}
// TouchCronTime updates cronTime
func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) {
global.UpdateTime = dtmutil.GetNextTime(0)

15
dtmsvr/storage/sql/sql.go

@ -204,6 +204,21 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in
return affected, affected == limit, err
}
// ResetTransGlobalCronTime reset nextCronTime of one global trans.
func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error {
now := getTimeStr(0)
where := map[string]string{
dtmimp.DBTypeMysql: fmt.Sprintf(`gid = '%s'`, global.Gid),
}[conf.Store.Driver]
sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE %s`,
now,
now,
where)
_, err := dtmimp.DBExec(conf.Store.Driver, dbGet().ToSQLDB(), sql)
return err
}
// ScanKV lists KV pairs
func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore {
kvs := []storage.KVStore{}

1
dtmsvr/storage/store.go

@ -31,6 +31,7 @@ type Store interface {
TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time)
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error)
ResetTransGlobalCronTime(global *TransGlobalStore) error
ScanKV(cat string, position *string, limit int64) []KVStore
FindKV(cat, key string) []KVStore
UpdateKV(kv *KVStore) error

9
dtmsvr/trans_status.go

@ -85,6 +85,15 @@ func (t *TransGlobal) changeStatus(status string, opts ...changeStatusOption) {
t.Status = status
}
func (t *TransGlobal) resetNextCronTime() error {
err := GetStore().ResetTransGlobalCronTime(&t.TransGlobalStore)
if err != nil {
return err
}
logger.Infof("ResetTransGlobalCronTime to now ok for %s", t.TransGlobalStore.String())
return nil
}
func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPos int) {
now := time.Now()
b.Status = status

Loading…
Cancel
Save