Browse Source

Merge c087c00d0c into 18146ee53b

pull/576/merge
Yunjin Xu 2 months ago
committed by GitHub
parent
commit
99c4b980be
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 8
      admin/src/api/api_dtm.ts
  2. 1
      admin/src/components.d.ts
  3. 23
      admin/src/views/Dashboard/GlobalTransactions/DialogTransactionDetail.vue
  4. 6
      dtmsvr/api.go
  5. 6
      dtmsvr/api_http.go
  6. 18
      dtmsvr/storage/boltdb/boltdb.go
  7. 9
      dtmsvr/storage/redis/redis.go
  8. 19
      dtmsvr/storage/sql/sql.go
  9. 1
      dtmsvr/storage/store.go
  10. 9
      dtmsvr/trans_status.go
  11. 28
      test/api_test.go

8
admin/src/api/api_dtm.ts

@ -100,6 +100,14 @@ export function resetNextCronTime(gid: string): Promise<AxiosResponse> {
})
}
export function setNextCronTime(gid: string, time: Date): Promise<AxiosResponse> {
return request({
url: '/api/dtmsvr/setNextCronTime',
method: 'post',
data: { gid, next_cron_time: time}
})
}
export function getDtmVersion(): Promise<AxiosResponse<any>> {
return request({
url: '/api/dtmsvr/version',

1
admin/src/components.d.ts

@ -11,6 +11,7 @@ declare module 'vue' {
ABreadcrumb: typeof import('ant-design-vue/es')['Breadcrumb']
ABreadcrumbItem: typeof import('ant-design-vue/es')['BreadcrumbItem']
AButton: typeof import('ant-design-vue/es')['Button']
ADatePicker: typeof import('ant-design-vue/es')['DatePicker']
ADescriptions: typeof import('ant-design-vue/es')['Descriptions']
ADescriptionsItem: typeof import('ant-design-vue/es')['DescriptionsItem']
ADivider: typeof import('ant-design-vue/es')['Divider']

23
admin/src/views/Dashboard/GlobalTransactions/DialogTransactionDetail.vue

@ -26,6 +26,21 @@
class="action-button"
@confirm="handleSetNextCronTimeToNow(<string>transaction?.gid)" >
<a-button type="default">Reset next cron time</a-button>
</a-popconfirm>
<a-date-picker
v-model:value="nextCronTimeInput"
format="YYYY-MM-DD HH:mm:ss"
placeholder="Set NextCronTime"
show-time
/>
<a-popconfirm
title="Set next cron time?"
ok-text="Yes, reset"
cancel-text="No"
class="action-button"
:disabled="!nextCronTimeInput"
@confirm="handleSetNextCronTime(<string>transaction?.gid)" >
<a-button type="default" :disabled="!nextCronTimeInput">Set next cron time</a-button>
</a-popconfirm>
<a-descriptions bordered size="small" :column="{ xxl: 4, xl: 3, lg: 3, md: 3, sm: 2, xs: 1 }">
<a-descriptions-item label="Status">
@ -61,7 +76,7 @@ import { getTransaction } from '/@/api/api_dtm'
import screenfull from '/@/components/Screenfull/index.vue'
import { useRoute } from 'vue-router';
import { string } from 'vue-types';
import { forceStopTransaction, resetNextCronTime } from '/@/api/api_dtm'
import { forceStopTransaction, resetNextCronTime, setNextCronTime } from '/@/api/api_dtm'
// import VueJsonPretty from 'vue-json-pretty';
// import 'vue-json-pretty/lib/styles.css'
const route = useRoute();
@ -72,6 +87,7 @@ const transaction = ref<Transaction>()
const visible = ref(false)
const textVal = ref('')
const closeable = ref(true)
const nextCronTimeInput = ref()
let _gid = <string>route.params.gid;
@ -137,6 +153,11 @@ const handleSetNextCronTimeToNow = async(gid: string) => {
refresh();
}
const handleSetNextCronTime = async(gid: string) => {
await setNextCronTime(gid, nextCronTimeInput.value);
refresh();
}
type Data = {
branches: {
gid: string

6
dtmsvr/api.go

@ -93,6 +93,12 @@ func svcResetNextCronTime(t *TransGlobal) error {
return dbt.resetNextCronTime()
}
func svcSetNextCronTime(t *TransGlobal, dt *time.Time) error {
dbt := GetTransGlobal(t.Gid)
dbt.NextCronTime = dt
return dbt.setNextCronTime()
}
func svcRegisterBranch(transType string, branch *TransBranch, data map[string]string) error {
branches := []TransBranch{*branch, *branch}
if transType == "tcc" {

6
dtmsvr/api_http.go

@ -41,6 +41,7 @@ func addRoute(engine *gin.Engine) {
engine.GET("/api/dtmsvr/scanKV", dtmutil.WrapHandler2(scanKV))
engine.GET("/api/dtmsvr/queryKV", dtmutil.WrapHandler2(queryKV))
engine.POST("/api/dtmsvr/resetNextCronTime", dtmutil.WrapHandler2(resetNextCronTime)) // one global trans only
engine.POST("/api/dtmsvr/setNextCronTime", dtmutil.WrapHandler2(setNextCronTime)) // one global trans only
// add prometheus exporter
h := promhttp.Handler()
@ -74,6 +75,11 @@ func resetNextCronTime(c *gin.Context) interface{} {
return svcResetNextCronTime(TransFromContext(c))
}
func setNextCronTime(c *gin.Context) interface{} {
trans := TransFromContext(c)
return svcSetNextCronTime(trans, trans.NextCronTime)
}
func registerBranch(c *gin.Context) interface{} {
data := map[string]string{}
err := c.BindJSON(&data)

18
dtmsvr/storage/boltdb/boltdb.go

@ -501,6 +501,24 @@ func (s *Store) ResetTransGlobalCronTime(g *storage.TransGlobalStore) error {
return err
}
// SetTransGlobalCronTime set nextCronTime of one global trans.
func (s *Store) SetTransGlobalCronTime(g *storage.TransGlobalStore) error {
err := s.boltDb.Update(func(t *bolt.Tx) error {
dt := g.NextCronTime
g := tGetGlobal(t, g.Gid)
if g == nil {
return storage.ErrNotFound
}
now := dtmutil.GetNextTime(0)
g.NextCronTime = dt
g.UpdateTime = now
tPutGlobal(t, g)
return nil
})
dtmimp.E2P(err)
return err
}
// ScanKV lists KV pairs
func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore {
kvs := []storage.KVStore{}

9
dtmsvr/storage/redis/redis.go

@ -344,6 +344,15 @@ func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error
return err
}
// SetTransGlobalCronTime set nextCronTime of one global trans.
func (s *Store) SetTransGlobalCronTime(global *storage.TransGlobalStore) error {
now := dtmutil.GetNextTime(0)
global.UpdateTime = now
key := conf.Store.RedisPrefix + "_g_" + global.Gid
_, err := redisGet().Set(ctx, key, dtmimp.MustMarshalString(global), time.Duration(conf.Store.DataExpire)*time.Second).Result()
return err
}
// TouchCronTime updates cronTime
func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) {
global.UpdateTime = dtmutil.GetNextTime(0)

19
dtmsvr/storage/sql/sql.go

@ -227,6 +227,18 @@ func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error
return err
}
// ResetTransGlobalCronTime set nextCronTime of one global trans.
func (s *Store) SetTransGlobalCronTime(global *storage.TransGlobalStore) error {
timeStr := getTimeSqlStr(global.NextCronTime)
now := getTimeStr(0)
sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE gid = '%s'`,
now,
timeStr,
global.Gid)
_, err := dtmimp.DBExec(conf.Store.Driver, dbGet().ToSQLDB(), sql)
return err
}
// ScanKV lists KV pairs
func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore {
kvs := []storage.KVStore{}
@ -328,3 +340,10 @@ func getTimeStr(afterSecond int64) string {
}
return dtmutil.GetNextTime(afterSecond).Format("2006-01-02 15:04:05")
}
func getTimeSqlStr(dt *time.Time) string {
if conf.Store.Driver == config.SQLServer {
return dt.Format(time.RFC3339)
}
return dt.Format("2006-01-02 15:04:05")
}

1
dtmsvr/storage/store.go

@ -32,6 +32,7 @@ type Store interface {
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error)
ResetTransGlobalCronTime(global *TransGlobalStore) error
SetTransGlobalCronTime(global *TransGlobalStore) error
ScanKV(cat string, position *string, limit int64) []KVStore
FindKV(cat, key string) []KVStore
UpdateKV(kv *KVStore) error

9
dtmsvr/trans_status.go

@ -95,6 +95,15 @@ func (t *TransGlobal) resetNextCronTime() error {
return nil
}
func (t *TransGlobal) setNextCronTime() error {
err := GetStore().SetTransGlobalCronTime(&t.TransGlobalStore)
if err != nil {
return err
}
logger.Infof("SetTransGlobalCronTime to %s for %s", t.NextCronTime, t.TransGlobalStore.String())
return nil
}
func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPos int) {
now := time.Now()
b.Status = status

28
test/api_test.go

@ -275,3 +275,31 @@ func TestAPIResetNextCronTime(t *testing.T) {
assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime)
assert.NotEqual(t, g.NextCronTime, g2.NextCronTime)
}
func TestAPISetNextCronTime(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, false)
saga.Submit()
waitTransProcessed(saga.Gid)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
gid := saga.Gid
s := registry.GetStore()
g := s.FindTransGlobalStore(saga.Gid)
nextCronTime := time.Now().Add(30 * time.Second).UTC()
// set
resp, err := dtmcli.GetRestyClient().R().SetBody(map[string]string{
"gid": saga.Gid,
"next_cron_time": nextCronTime.Format(time.RFC3339),
}).Post(dtmutil.DefaultHTTPServer + "/setNextCronTime")
assert.Nil(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode())
// after set assert
g2 := s.FindTransGlobalStore(gid)
assert.NotNil(t, g2)
assert.Equal(t, gid, g2.Gid)
assert.Equal(t, nextCronTime.Truncate(time.Second).UTC(), g2.NextCronTime.Truncate(time.Second).UTC())
assert.NotEqual(t, g.NextCronTime, g2.NextCronTime)
}

Loading…
Cancel
Save