diff --git a/admin/src/api/api_dtm.ts b/admin/src/api/api_dtm.ts index 0608fdd..349cc3b 100644 --- a/admin/src/api/api_dtm.ts +++ b/admin/src/api/api_dtm.ts @@ -92,6 +92,14 @@ export function getTransaction(payload: { }) } +export function resetNextCronTime(gid: string): Promise { + return request({ + url: '/api/dtmsvr/resetNextCronTime', + method: 'post', + data: { gid } + }) +} + export function getDtmVersion(): Promise> { return request({ url: '/api/dtmsvr/version', diff --git a/admin/src/components.d.ts b/admin/src/components.d.ts index df4993b..a034feb 100644 --- a/admin/src/components.d.ts +++ b/admin/src/components.d.ts @@ -1,6 +1,6 @@ // generated by unplugin-vue-components // We suggest you to commit this file into source control -// Read more: https://github.com/vuejs/core/pull/3399 +// Read more: https://github.com/vuejs/vue-next/pull/3399 import '@vue/runtime-core' declare module '@vue/runtime-core' { diff --git a/admin/src/views/Dashboard/GlobalTransactions/DialogTransactionDetail.vue b/admin/src/views/Dashboard/GlobalTransactions/DialogTransactionDetail.vue index 4816ced..e27f763 100644 --- a/admin/src/views/Dashboard/GlobalTransactions/DialogTransactionDetail.vue +++ b/admin/src/views/Dashboard/GlobalTransactions/DialogTransactionDetail.vue @@ -17,7 +17,16 @@ > ForceStop - + + + + Reset next cron time + {{ transaction?.status }} @@ -31,6 +40,7 @@ {{ transaction?.update_time }} {{ transaction?.next_cron_interval }} {{ transaction?.next_cron_time }} + {{ transaction?.rollback_reason }}

Branches

@@ -51,7 +61,7 @@ import { getTransaction } from '/@/api/api_dtm' import screenfull from '/@/components/Screenfull/index.vue' import { useRoute } from 'vue-router'; import { string } from 'vue-types'; -import { forceStopTransaction} from '/@/api/api_dtm' +import { forceStopTransaction, resetNextCronTime } from '/@/api/api_dtm' // import VueJsonPretty from 'vue-json-pretty'; // import 'vue-json-pretty/lib/styles.css' const route = useRoute(); @@ -121,6 +131,12 @@ const handleTransactionStop = async(gid: string) => { refresh(); } + +const handleSetNextCronTimeToNow = async(gid: string) => { + await resetNextCronTime(gid); + refresh(); +} + type Data = { branches: { gid: string @@ -144,6 +160,7 @@ type Data = { next_cron_interval: number next_cron_time: string concurrent: boolean + rollback_reason: string } } @@ -160,6 +177,7 @@ interface Transaction { next_cron_interval: number next_cron_time: string concurrent: boolean + rollback_reason: string } interface Branches { diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 25c24c9..61510f9 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -86,6 +86,11 @@ func svcForceStop(t *TransGlobal) interface{} { return nil } +func svcResetNextCronTime(t *TransGlobal) error { + dbt := GetTransGlobal(t.Gid) + return dbt.resetNextCronTime() +} + func svcRegisterBranch(transType string, branch *TransBranch, data map[string]string) error { branches := []TransBranch{*branch, *branch} if transType == "tcc" { diff --git a/dtmsvr/api_http.go b/dtmsvr/api_http.go index 64a291d..3d8d09d 100644 --- a/dtmsvr/api_http.go +++ b/dtmsvr/api_http.go @@ -40,6 +40,7 @@ func addRoute(engine *gin.Engine) { engine.DELETE("/api/dtmsvr/topic/:topicName", dtmutil.WrapHandler2(deleteTopic)) engine.GET("/api/dtmsvr/scanKV", dtmutil.WrapHandler2(scanKV)) engine.GET("/api/dtmsvr/queryKV", dtmutil.WrapHandler2(queryKV)) + engine.POST("/api/dtmsvr/resetNextCronTime", dtmutil.WrapHandler2(resetNextCronTime)) // one global trans only // add prometheus exporter h := promhttp.Handler() @@ -69,6 +70,10 @@ func forceStop(c *gin.Context) interface{} { return svcForceStop(TransFromContext(c)) } +func resetNextCronTime(c *gin.Context) interface{} { + return svcResetNextCronTime(TransFromContext(c)) +} + func registerBranch(c *gin.Context) interface{} { data := map[string]string{} err := c.BindJSON(&data) diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index e940b01..631d73d 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -482,6 +482,24 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in return } +// ResetTransGlobalCronTime reset nextCronTime of one global trans. +func (s *Store) ResetTransGlobalCronTime(g *storage.TransGlobalStore) error { + old := g.UpdateTime + err := s.boltDb.Update(func(t *bolt.Tx) error { + g := tGetGlobal(t, g.Gid) + if g == nil || g.UpdateTime == old { + return storage.ErrNotFound + } + now := dtmutil.GetNextTime(0) + g.NextCronTime = now + g.UpdateTime = now + tPutGlobal(t, g) + return nil + }) + dtmimp.E2P(err) + return err +} + // ScanKV lists KV pairs func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore { kvs := []storage.KVStore{} diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 9b80d2d..090ab59 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -330,6 +330,16 @@ return tostring(i) return } +// ResetTransGlobalCronTime reset nextCronTime of one global trans. +func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error { + now := dtmutil.GetNextTime(0) + global.NextCronTime = now + global.UpdateTime = now + key := conf.Store.RedisPrefix + "_g_" + global.Gid + _, err := redisGet().Set(ctx, key, dtmimp.MustMarshalString(global), time.Duration(conf.Store.DataExpire)*time.Second).Result() + return err +} + // TouchCronTime updates cronTime func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) { global.UpdateTime = dtmutil.GetNextTime(0) diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 3a390cb..28f9c9c 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -204,6 +204,21 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in return affected, affected == limit, err } +// ResetTransGlobalCronTime reset nextCronTime of one global trans. +func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error { + now := getTimeStr(0) + where := map[string]string{ + dtmimp.DBTypeMysql: fmt.Sprintf(`gid = '%s'`, global.Gid), + }[conf.Store.Driver] + + sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE %s`, + now, + now, + where) + _, err := dtmimp.DBExec(conf.Store.Driver, dbGet().ToSQLDB(), sql) + return err +} + // ScanKV lists KV pairs func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore { kvs := []storage.KVStore{} diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go index 3b9773f..32456c1 100644 --- a/dtmsvr/storage/store.go +++ b/dtmsvr/storage/store.go @@ -31,6 +31,7 @@ type Store interface { TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) + ResetTransGlobalCronTime(global *TransGlobalStore) error ScanKV(cat string, position *string, limit int64) []KVStore FindKV(cat, key string) []KVStore UpdateKV(kv *KVStore) error diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 9e8b4e5..baa51dd 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -85,6 +85,15 @@ func (t *TransGlobal) changeStatus(status string, opts ...changeStatusOption) { t.Status = status } +func (t *TransGlobal) resetNextCronTime() error { + err := GetStore().ResetTransGlobalCronTime(&t.TransGlobalStore) + if err != nil { + return err + } + logger.Infof("ResetTransGlobalCronTime to now ok for %s", t.TransGlobalStore.String()) + return nil +} + func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPos int) { now := time.Now() b.Status = status diff --git a/test/api_test.go b/test/api_test.go index 16702b7..9831754 100644 --- a/test/api_test.go +++ b/test/api_test.go @@ -248,3 +248,31 @@ func TestAPIForceStoppedAbnormal(t *testing.T) { assert.Nil(t, err) assert.Equal(t, resp.StatusCode(), http.StatusConflict) } + +// func TestAPIResetNextCronTime(t *testing.T) { +// saga := genSaga(dtmimp.GetFuncName(), false, false) +// saga.Submit() +// waitTransProcessed(saga.Gid) +// assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) +// assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) +// gid := saga.Gid + +// s := registry.GetStore() +// g := s.FindTransGlobalStore(saga.Gid) + +// // reset +// resp, err := dtmcli.GetRestyClient().R().SetBody(map[string]string{ +// "gid": saga.Gid, +// }).Post(dtmutil.DefaultHTTPServer + "/resetNextCronTime") +// assert.Nil(t, err) +// assert.Equal(t, resp.StatusCode(), http.StatusOK) + +// // after reset assert +// g2 := s.FindTransGlobalStore(gid) +// assert.NotNil(t, g2) +// assert.Equal(t, gid, g2.Gid) +// assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime) +// assert.Equal(t, g2.UpdateTime, g2.NextCronTime) +// assert.NotEqual(t, g.UpdateTime, g2.UpdateTime) +// assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) +// } diff --git a/test/store_test.go b/test/store_test.go index 68a6daa..8f0f93f 100644 --- a/test/store_test.go +++ b/test/store_test.go @@ -178,3 +178,23 @@ func TestUpdateBranches(t *testing.T) { assert.Nil(t, err) } } + +// func TestResetTransGlobalCronTime(t *testing.T) { +// gid := dtmimp.GetFuncName() +// g, _ := initTransGlobal(gid) + +// s := registry.GetStore() +// g2 := s.FindTransGlobalStore(gid) +// assert.NotNil(t, g2) +// assert.Equal(t, gid, g2.Gid) + +// s.ResetTransGlobalCronTime(g2) + +// g2 = s.FindTransGlobalStore(gid) +// assert.NotNil(t, g2) +// assert.Equal(t, gid, g2.Gid) +// assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime) +// assert.Equal(t, g2.UpdateTime, g2.NextCronTime) +// assert.NotEqual(t, g.UpdateTime, g2.UpdateTime) +// assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) +// }