From d98f7f56f7cdf0028b1a455fa79042f9ffbf6896 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E4=BA=91=E9=87=91YunjinXu?= Date: Sun, 25 Jun 2023 09:57:26 +0800 Subject: [PATCH] reset trans corntime once - store --- dtmsvr/storage/boltdb/boltdb.go | 18 ++++++++++++++++++ dtmsvr/storage/redis/redis.go | 10 ++++++++++ dtmsvr/storage/sql/sql.go | 15 +++++++++++++++ dtmsvr/storage/store.go | 1 + dtmsvr/trans_status.go | 9 +++++++++ 5 files changed, 53 insertions(+) diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index e940b01..631d73d 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -482,6 +482,24 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in return } +// ResetTransGlobalCronTime reset nextCronTime of one global trans. +func (s *Store) ResetTransGlobalCronTime(g *storage.TransGlobalStore) error { + old := g.UpdateTime + err := s.boltDb.Update(func(t *bolt.Tx) error { + g := tGetGlobal(t, g.Gid) + if g == nil || g.UpdateTime == old { + return storage.ErrNotFound + } + now := dtmutil.GetNextTime(0) + g.NextCronTime = now + g.UpdateTime = now + tPutGlobal(t, g) + return nil + }) + dtmimp.E2P(err) + return err +} + // ScanKV lists KV pairs func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore { kvs := []storage.KVStore{} diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 9b80d2d..090ab59 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -330,6 +330,16 @@ return tostring(i) return } +// ResetTransGlobalCronTime reset nextCronTime of one global trans. +func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error { + now := dtmutil.GetNextTime(0) + global.NextCronTime = now + global.UpdateTime = now + key := conf.Store.RedisPrefix + "_g_" + global.Gid + _, err := redisGet().Set(ctx, key, dtmimp.MustMarshalString(global), time.Duration(conf.Store.DataExpire)*time.Second).Result() + return err +} + // TouchCronTime updates cronTime func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) { global.UpdateTime = dtmutil.GetNextTime(0) diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 3a390cb..28f9c9c 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -204,6 +204,21 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in return affected, affected == limit, err } +// ResetTransGlobalCronTime reset nextCronTime of one global trans. +func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error { + now := getTimeStr(0) + where := map[string]string{ + dtmimp.DBTypeMysql: fmt.Sprintf(`gid = '%s'`, global.Gid), + }[conf.Store.Driver] + + sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE %s`, + now, + now, + where) + _, err := dtmimp.DBExec(conf.Store.Driver, dbGet().ToSQLDB(), sql) + return err +} + // ScanKV lists KV pairs func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore { kvs := []storage.KVStore{} diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go index 3b9773f..32456c1 100644 --- a/dtmsvr/storage/store.go +++ b/dtmsvr/storage/store.go @@ -31,6 +31,7 @@ type Store interface { TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) + ResetTransGlobalCronTime(global *TransGlobalStore) error ScanKV(cat string, position *string, limit int64) []KVStore FindKV(cat, key string) []KVStore UpdateKV(kv *KVStore) error diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 9e8b4e5..baa51dd 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -85,6 +85,15 @@ func (t *TransGlobal) changeStatus(status string, opts ...changeStatusOption) { t.Status = status } +func (t *TransGlobal) resetNextCronTime() error { + err := GetStore().ResetTransGlobalCronTime(&t.TransGlobalStore) + if err != nil { + return err + } + logger.Infof("ResetTransGlobalCronTime to now ok for %s", t.TransGlobalStore.String()) + return nil +} + func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPos int) { now := time.Now() b.Status = status