🔥 A cross-language distributed transaction manager. Supports XA, TCC, Saga, and transactional messages. 跨语言分布式事务管理器
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

189 lines
6.2 KiB

/*
* Copyright (c) 2021 yedf. All rights reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package dtmsvr
import (
"errors"
"strconv"
"time"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// addRoute registers every dtmsvr HTTP endpoint, plus the Prometheus metrics
// exporter, on the given gin engine.
func addRoute(engine *gin.Engine) {
	// Local helpers so each registration below reads as "verb, path, handler".
	// Every API handler is adapted through dtmutil.WrapHandler2.
	get := func(path string, handler func(c *gin.Context) interface{}) {
		engine.GET(path, dtmutil.WrapHandler2(handler))
	}
	post := func(path string, handler func(c *gin.Context) interface{}) {
		engine.POST(path, dtmutil.WrapHandler2(handler))
	}

	get("/api/dtmsvr/version", func(_ *gin.Context) interface{} {
		return gin.H{"version": Version}
	})
	get("/api/dtmsvr/newGid", newGid)

	post("/api/dtmsvr/prepare", prepare)
	post("/api/dtmsvr/submit", submit)
	post("/api/dtmsvr/abort", abort)
	// Changing the global status to failed can stop the trigger.
	// Use with caution in a production environment.
	post("/api/dtmsvr/forceStop", forceStop)
	post("/api/dtmsvr/registerBranch", registerBranch)
	// The next two routes are kept for compatibility with old SDKs.
	post("/api/dtmsvr/registerXaBranch", registerBranch)
	post("/api/dtmsvr/registerTccBranch", registerBranch)
	post("/api/dtmsvr/prepareWorkflow", prepareWorkflow)

	get("/api/dtmsvr/query", query)
	get("/api/dtmsvr/all", all)
	get("/api/dtmsvr/resetCronTime", resetCronTime)
	get("/api/dtmsvr/subscribe", subscribe)
	get("/api/dtmsvr/unsubscribe", unsubscribe)
	engine.DELETE("/api/dtmsvr/topic/:topicName", dtmutil.WrapHandler2(deleteTopic))
	get("/api/dtmsvr/scanKV", scanKV)
	get("/api/dtmsvr/queryKV", queryKV)
	// Operates on one global transaction only.
	post("/api/dtmsvr/resetNextCronTime", resetNextCronTime)

	// Expose the Prometheus exporter by delegating to its net/http handler.
	exporter := promhttp.Handler()
	engine.GET("/api/metrics", func(ctx *gin.Context) {
		exporter.ServeHTTP(ctx.Writer, ctx.Request)
	})
}
// newGid returns a freshly generated global transaction id together with a
// success marker.
//
// NOTE: the gid must be unique in storage. Callers may use their own
// generation rules instead of this server-side generation, which helps with
// tracking.
func newGid(c *gin.Context) interface{} {
	gid := GenGid()
	return map[string]interface{}{
		"gid":        gid,
		"dtm_result": dtmcli.ResultSuccess,
	}
}
// prepare builds the global transaction from the request context and hands it
// to svcPrepare, returning that call's result unchanged.
func prepare(c *gin.Context) interface{} {
	trans := TransFromContext(c)
	return svcPrepare(trans)
}
// submit builds the global transaction from the request context and hands it
// to svcSubmit, returning that call's result unchanged.
func submit(c *gin.Context) interface{} {
	trans := TransFromContext(c)
	return svcSubmit(trans)
}
// abort builds the global transaction from the request context and hands it
// to svcAbort, returning that call's result unchanged.
func abort(c *gin.Context) interface{} {
	trans := TransFromContext(c)
	return svcAbort(trans)
}
// forceStop builds the global transaction from the request context and hands
// it to svcForceStop, returning that call's result unchanged.
func forceStop(c *gin.Context) interface{} {
	trans := TransFromContext(c)
	return svcForceStop(trans)
}
// resetNextCronTime builds the global transaction from the request context
// and hands it to svcResetNextCronTime, returning that call's result
// unchanged.
func resetNextCronTime(c *gin.Context) interface{} {
	trans := TransFromContext(c)
	return svcResetNextCronTime(trans)
}
// registerBranch decodes the JSON request body into a flat string map, builds
// a prepared-status TransBranch from its "gid", "branch_id" and "data"
// entries, and forwards it to svcRegisterBranch together with the
// "trans_type" entry and the full map.
//
// A body that cannot be bound is reported through e2p.
func registerBranch(c *gin.Context) interface{} {
	// Keep the map non-nil even if the body decodes to nothing.
	payload := make(map[string]string)
	e2p(c.BindJSON(&payload))

	branch := &TransBranch{
		Gid:      dtmimp.Escape(payload["gid"]),
		BranchID: payload["branch_id"],
		Status:   dtmcli.StatusPrepared,
		BinData:  []byte(payload["data"]),
	}
	transType := payload["trans_type"]
	return svcRegisterBranch(transType, branch, payload)
}
// query looks up one global transaction and its branches by the "gid" query
// parameter. It returns an error value when no gid is supplied; otherwise it
// returns a map with the keys "transaction" and "branches".
func query(c *gin.Context) interface{} {
	gid := c.Query("gid")
	if len(gid) == 0 {
		return errors.New("no gid specified")
	}

	global := GetStore().FindTransGlobalStore(gid)
	branchList := GetStore().FindBranches(gid)
	return map[string]interface{}{
		"transaction": global,
		"branches":    branchList,
	}
}
// prepareWorkflow builds the global transaction from the request context and
// passes it to svcPrepareWorkflow. On failure the error is returned as the
// result; on success the result is a map with the keys "transaction" and
// "progresses".
func prepareWorkflow(c *gin.Context) interface{} {
	request := TransFromContext(c)
	global, progresses, err := svcPrepareWorkflow(request)
	if err != nil {
		return err
	}
	return map[string]interface{}{
		"transaction": global,
		"progresses":  progresses,
	}
}
// all lists global transactions.
//
// With a non-empty "gid" query parameter it looks up that single transaction
// and returns it as a one-element list (or a nil list when nothing is found).
// Otherwise it scans the store starting at "position", filtered by "status",
// "transType", "createTimeStart" and "createTimeEnd", returning at most
// "limit" entries (default "100").
//
// The result map has the keys "transactions" and "next_position".
func all(c *gin.Context) interface{} {
	gid := c.Query("gid")
	position := c.Query("position")
	sLimit := dtmimp.OrString(c.Query("limit"), "100")

	// Single-transaction lookup: the limit and filters are not consulted.
	if gid != "" {
		var single interface{}
		if found := GetStore().FindTransGlobalStore(gid); found != nil {
			single = []interface{}{*found}
		}
		return map[string]interface{}{"transactions": single, "next_position": position}
	}

	var condition storage.TransGlobalScanCondition
	condition.Status = c.Query("status")
	condition.TransType = c.Query("transType")
	condition.CreateTimeStart = stringTotime(c.Query("createTimeStart"))
	condition.CreateTimeEnd = stringTotime(c.Query("createTimeEnd"))

	// position is passed by pointer, so finish the scan before reading it
	// back for the response.
	scanned := GetStore().ScanTransGlobalStores(&position, int64(dtmimp.MustAtoi(sLimit)), condition)
	return map[string]interface{}{"transactions": scanned, "next_position": position}
}
// stringTotime converts a decimal string holding a Unix timestamp in
// milliseconds into a time.Time. The value is divided by 1000 with integer
// division, so the sub-second part is discarded. An empty string yields the
// zero time.Time.
func stringTotime(timeStr string) time.Time {
	var parsed time.Time
	if timeStr != "" {
		millis := int64(dtmimp.MustAtoi(timeStr))
		parsed = time.Unix(millis/1000, 0)
	}
	return parsed
}
// resetCronTime asks the store to reset the cron time of transactions so that
// unfinished transactions can be retried as soon as possible after a business
// outage has been recovered.
//
// Query parameters:
//   - "timeout": seconds, defaults to 3*conf.TimeoutToFail.
//   - "limit": defaults to "100".
//
// On success the result map has the keys "has_remaining" and "succeed_count";
// a store error is returned as the result instead.
func resetCronTime(c *gin.Context) interface{} {
	defaultTimeout := strconv.FormatInt(3*conf.TimeoutToFail, 10)
	sTimeoutSecond := dtmimp.OrString(c.Query("timeout"), defaultTimeout)
	sLimit := dtmimp.OrString(c.Query("limit"), "100")

	seconds := dtmimp.MustAtoi(sTimeoutSecond)
	timeout := time.Duration(seconds) * time.Second

	succeeded, remaining, err := GetStore().ResetCronTime(timeout, int64(dtmimp.MustAtoi(sLimit)))
	if err != nil {
		return err
	}
	return map[string]interface{}{
		"has_remaining": remaining,
		"succeed_count": succeeded,
	}
}
// scanKV scans key/value entries of category "cat" starting at "position",
// with "limit" defaulting to "100". The result map has the keys "kv" and
// "next_position".
func scanKV(c *gin.Context) interface{} {
	category := c.Query("cat")
	position := c.Query("position")
	sLimit := dtmimp.OrString(c.Query("limit"), "100")

	// position is passed by pointer, so finish the scan before reading it
	// back for the response.
	entries := GetStore().ScanKV(category, &position, int64(dtmimp.MustAtoi(sLimit)))
	return map[string]interface{}{
		"kv":            entries,
		"next_position": position,
	}
}
// queryKV looks up the key/value entries identified by the "cat" and "key"
// query parameters (both empty when absent) and returns them under the "kv"
// key.
func queryKV(c *gin.Context) interface{} {
	category := c.Query("cat")
	name := c.Query("key")
	found := GetStore().FindKV(category, name)
	return map[string]interface{}{"kv": found}
}
// subscribe passes the "topic", "url" and "remark" query parameters to
// Subscribe and returns its result.
func subscribe(c *gin.Context) interface{} {
	return Subscribe(
		c.Query("topic"),
		c.Query("url"),
		c.Query("remark"),
	)
}
// unsubscribe passes the "topic" and "url" query parameters to Unsubscribe
// and returns its result.
func unsubscribe(c *gin.Context) interface{} {
	return Unsubscribe(
		c.Query("topic"),
		c.Query("url"),
	)
}
// deleteTopic removes the key/value entry named by the ":topicName" path
// parameter from the topicsCat category. An empty name is rejected with an
// error; otherwise the store's DeleteKV result is returned.
func deleteTopic(c *gin.Context) interface{} {
	name := c.Param("topicName")
	if len(name) == 0 {
		return errors.New("empty topic")
	}
	return GetStore().DeleteKV(topicsCat, name)
}