diff --git a/README.md b/README.md
index 4c14c6f..58f9f28 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@ English | [简体中文](https://github.com/dtm-labs/dtm/blob/main/helper/README
## What is DTM
-DTM is a distributed transaction solution which provides cross-service eventually data consistency. It provides saga, tcc, xa, 2-phase message strategies for a variety of application scenarios. It also supports multiple languages and multiple store engine to form up a transaction as following:
+DTM is a distributed transaction framework which provides cross-service eventually data consistency. It provides saga, tcc, xa, 2-phase message strategies for a variety of application scenarios. It also supports multiple languages and multiple store engine to form up a transaction as following:
@@ -26,49 +26,26 @@ DTM is a distributed transaction solution which provides cross-service eventuall
## Features
-* Extremely easy to adapt
- - Support HTTP and gRPC, provide easy-to-use programming interfaces, lower substantially the barrier of getting started with distributed transactions. Newcomers can adapt quickly.
-
-* Easy to use
- - Relieving developers from worrying about suspension, null compensation, idempotent transaction, and other tricky problems, the framework layer handles them all.
-
* Language-agnostic
- Suit for companies with multiple-language stacks.
Easy to write bindings for Go, Python, PHP, Node.js, Ruby, and other languages.
-* Easy to deploy, easy to extend
- - DTM depends only on MySQL, easy to deploy, cluster, and scale horizontally.
-
-* Support for multiple distributed transaction protocol
- - TCC, SAGA, XA, Transactional messages.
+* Support for multiple distributed transaction solutions
+ - TCC, SAGA, XA, 2-phases message.
-## DTM vs. others
-
-There is no mature open-source distributed transaction framework for non-Java languages.
-Mature open-source distributed transaction frameworks for Java language include Ali's Seata, Huawei's ServiceComb-Pack, Jingdong's shardingsphere, himly, tcc-transaction, ByteTCC, and so on, of which Seata is most widely used.
-
-The following is a comparison of the main features of dtm and Seata.
+* Extremely easy to adapt
+ - Support HTTP and gRPC, provide easy-to-use programming interfaces, lower substantially the barrier of getting started with distributed transactions. Newcomers can adapt quickly.
+* Easy to use
+ - Relieving developers from worrying about suspension, null compensation, idempotent transaction, and other tricky problems, the framework layer handles them all.
-| Features | DTM | Seata | Remarks |
-| :-----: | :----: | :----: | :----: |
-| Supported languages | Golang, Python, PHP, and others | Java | dtm allows easy access from a new language |
-| Exception handling | [Sub-transaction barrier](https://zhuanlan.zhihu.com/p/388444465) | manual | dtm solves idempotent transaction, hanging, null compensation |
-| TCC | ✓ | ✓ | |
-| XA | ✓ | ✓ | |
-| AT | suggest XA | ✓ | AT is similar to XA with better performance but with dirty rollback |
-| SAGA | support concurrency | complicated state-machine mode | dtm's state-machine mode is being planned |
-| Transactional Messaging | ✓ | ✗ | dtm provides Transactional Messaging similar to RocketMQ |
-| Multiple DBs in a service |✓|✗||
-| Communication protocols | HTTP, gRPC | Dubbo, no HTTP | |
-| Star count |
|
| dtm 0.1 is released from 20210604 and under fast development |
+* Easy to deploy, easy to extend
+ - DTM depends only on MySQL/Redis, easy to deploy, cluster, and scale horizontally.
-From the features' comparison above, if your language stack includes languages other than Java, then dtm is the one for you.
-If your language stack is Java, you can also choose to access dtm and use sub-transaction barrier technology to simplify your business development.
## [Cook Book](https://en.dtm.pub)
-# Quick start
+## Quick start
### run dtm
@@ -77,14 +54,17 @@ git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go
```
-### Start the example
+### Start an example
+Suppose we want to perform an inter-bank transfer. The operations of transfer out (TransOut) and transfer in (TransIn) are coded in separate micro-services.
+
+Here is an example to illustrate a solution of dtm to this problem:
``` bash
git clone https://github.com/dtm-labs/dtmcli-go-sample && cd dtmcli-go-sample
go run main.go
```
-# Code
+## Code
### Use
``` go
@@ -102,29 +82,39 @@ go run main.go
// submit the created saga transaction,dtm ensures all subtractions either complete or get revoked
err := saga.Submit()
```
-### Complete example
-Refer to [dtm-examples](https://github.com/dtm-labs/dtm-examples).
+When the above code runs, we can see in the console that services TransOut, TransIn has been called.
-### Slack
+#### Timing diagram
+A timing diagram for a successfully completed SAGA transaction would be as follows:
-You can join the [DTM slack channel here](https://join.slack.com/t/dtm-w6k9662/shared_invite/zt-vkrph4k1-eFqEFnMkbmlXqfUo5GWHWw).
+
-### Wechat
+#### Rollback upon failure
+If any forward operation fails, DTM invokes the corresponding compensating operation of each sub-transaction to roll back, after which the transaction is successfully rolled back.
-Add wechat friend with id yedf2008, or scan the OR code. Fill in dtm as verification.
+Let's purposely fail the forward operation of the second sub-transaction and watch what happens
-
+``` go
+app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
+ log.Printf("TransIn")
+ // c.JSON(200, "")
+ c.JSON(409, "") // Status 409 for Failure. Won't be retried
+})
+```
-### Give a star! ⭐
+The timing diagram for the intended failure is as follows:
-If you think this project is good, or helpful to you, please give a star!
+
+
+## More examples
-### Who is using
-
+Refer to [dtm-examples](https://github.com/dtm-labs/dtm-examples).
+
+## Slack
+
+You can join the [DTM slack channel here](https://join.slack.com/t/dtm-w6k9662/shared_invite/zt-vkrph4k1-eFqEFnMkbmlXqfUo5GWHWw).
+
+## Give a star! ⭐
+
+If you think this project is good, or helpful to you, please give a star!
diff --git a/charts/templates/deployment.yaml b/charts/templates/deployment.yaml
index de2c04c..805b62c 100644
--- a/charts/templates/deployment.yaml
+++ b/charts/templates/deployment.yaml
@@ -67,4 +67,4 @@ spec:
volumes:
- name: config
configMap:
- name: dtm-conf
+ name: {{ include "dtm.fullname" . }}-conf
diff --git a/conf.sample.yml b/conf.sample.yml
index a4e869c..1fd557e 100644
--- a/conf.sample.yml
+++ b/conf.sample.yml
@@ -56,6 +56,7 @@
# HttpPort: 36789
# GrpcPort: 36790
+# JSONRPC: 36791
### advanced options
# UpdateBranchAsyncGoroutineNum: 1 # num of async goroutine to update branch status
diff --git a/dtmcli/msg.go b/dtmcli/msg.go
index 920f26f..0bc6dc6 100644
--- a/dtmcli/msg.go
+++ b/dtmcli/msg.go
@@ -16,6 +16,7 @@ import (
// Msg reliable msg type
type Msg struct {
dtmimp.TransBase
+ delay uint64 // delay call branch, unit second
}
// NewMsg create new msg
@@ -30,6 +31,12 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
return s
}
+// SetDelay delay call branch, unit second
+func (s *Msg) SetDelay(delay uint64) *Msg {
+ s.delay = delay
+ return s
+}
+
// Prepare prepare the msg, msg will later be submitted
func (s *Msg) Prepare(queryPrepared string) error {
s.QueryPrepared = dtmimp.OrString(queryPrepared, s.QueryPrepared)
@@ -38,6 +45,7 @@ func (s *Msg) Prepare(queryPrepared string) error {
// Submit submit the msg
func (s *Msg) Submit() error {
+ s.BuildCustomOptions()
return dtmimp.TransCallDtm(&s.TransBase, s, "submit")
}
@@ -74,3 +82,10 @@ func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier)
}
return err
}
+
+// BuildCustomOptions add custom options to the request context
+func (s *Msg) BuildCustomOptions() {
+ if s.delay > 0 {
+ s.CustomData = dtmimp.MustMarshalString(map[string]interface{}{"delay": s.delay})
+ }
+}
diff --git a/dtmcli/saga.go b/dtmcli/saga.go
index 87cf08f..5fcd331 100644
--- a/dtmcli/saga.go
+++ b/dtmcli/saga.go
@@ -43,12 +43,12 @@ func (s *Saga) EnableConcurrent() *Saga {
// Submit submit the saga trans
func (s *Saga) Submit() error {
- s.AddConcurrentContext()
+ s.BuildCustomOptions()
return dtmimp.TransCallDtm(&s.TransBase, s, "submit")
}
-// AddConcurrentContext adds concurrent options to the request context
-func (s *Saga) AddConcurrentContext() {
+// BuildCustomOptions add custom options to the request context
+func (s *Saga) BuildCustomOptions() {
if s.concurrent {
s.CustomData = dtmimp.MustMarshalString(map[string]interface{}{"orders": s.orders, "concurrent": s.concurrent})
}
diff --git a/dtmgrpc/msg.go b/dtmgrpc/msg.go
index 5805c9a..be5b304 100644
--- a/dtmgrpc/msg.go
+++ b/dtmgrpc/msg.go
@@ -33,6 +33,12 @@ func (s *MsgGrpc) Add(action string, msg proto.Message) *MsgGrpc {
return s
}
+// SetDelay delay call branch, unit second
+func (s *MsgGrpc) SetDelay(delay uint64) *MsgGrpc {
+ s.Msg.SetDelay(delay)
+ return s
+}
+
// Prepare prepare the msg, msg will later be submitted
func (s *MsgGrpc) Prepare(queryPrepared string) error {
s.QueryPrepared = dtmimp.OrString(queryPrepared, s.QueryPrepared)
@@ -41,6 +47,7 @@ func (s *MsgGrpc) Prepare(queryPrepared string) error {
// Submit submit the msg
func (s *MsgGrpc) Submit() error {
+ s.Msg.BuildCustomOptions()
return dtmgimp.DtmGrpcCall(&s.TransBase, "Submit")
}
diff --git a/dtmgrpc/saga.go b/dtmgrpc/saga.go
index 9fca5d9..4d206fe 100644
--- a/dtmgrpc/saga.go
+++ b/dtmgrpc/saga.go
@@ -43,6 +43,6 @@ func (s *SagaGrpc) EnableConcurrent() *SagaGrpc {
// Submit submit the saga trans
func (s *SagaGrpc) Submit() error {
- s.Saga.AddConcurrentContext()
+ s.Saga.BuildCustomOptions()
return dtmgimp.DtmGrpcCall(&s.Saga.TransBase, "Submit")
}
diff --git a/dtmsvr/api_json_rpc_http.go b/dtmsvr/api_json_rpc_http.go
new file mode 100644
index 0000000..edfc426
--- /dev/null
+++ b/dtmsvr/api_json_rpc_http.go
@@ -0,0 +1,110 @@
+package dtmsvr
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+
+ "github.com/dtm-labs/dtm/dtmcli"
+ "github.com/dtm-labs/dtm/dtmcli/logger"
+ "github.com/gin-gonic/gin"
+)
+
+type jsonRPCReq struct {
+ Method string `json:"method"`
+ Jsonrpc string `json:"jsonrpc"`
+ Params interface{} `json:"params"`
+ ID string `json:"id"`
+}
+
+func addJSONRPCRouter(engine *gin.Engine) {
+ engine.POST("/", dispatcher)
+}
+
+func dispatcher(c *gin.Context) {
+ req := new(jsonRPCReq)
+ err := c.BindJSON(req)
+ logger.Infof("request:%s\n", req)
+ if err != nil {
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": nil, "error": map[string]interface{}{"code": -32700, "message": "Parse error"}})
+ return
+ }
+ if req.Method == "dtmserver.NewGid" {
+ res := jsonRPCNewGid()
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": err})
+ return
+ }
+
+ if req.Method == "dtmserver.Prepare" {
+ res := jsonRPCPrepare(req.Params)
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
+ return
+ }
+
+ if req.Method == "dtmserver.Submit" {
+ res := jsonRPCSubmit(req.Params)
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
+ return
+ }
+
+ if req.Method == "dtmserver.Abort" {
+ res := jsonRPCAbort(req.Params)
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
+ return
+ }
+
+ if req.Method == "dtmserver.RegisterBranch" {
+ res := jsonRPCRegisterBranch(req.Params)
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
+ return
+ }
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": nil, "error": map[string]interface{}{"code": -32601, "message": "Method not found"}})
+}
+
+func jsonRPCNewGid() interface{} {
+ return map[string]interface{}{"gid": GenGid(), "dtm_result": dtmcli.ResultSuccess}
+}
+
+func jsonRPCPrepare(params interface{}) interface{} {
+ res := svcPrepare(TransFromJSONRPCContext(params))
+ if res == nil {
+ return map[string]string{"dtm_result": "SUCCESS"}
+ }
+ return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
+}
+
+func jsonRPCSubmit(params interface{}) interface{} {
+ res := svcSubmit(TransFromJSONRPCContext(params))
+ if res == nil {
+ return map[string]string{"dtm_result": "SUCCESS"}
+ }
+ return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
+}
+
+func jsonRPCAbort(params interface{}) interface{} {
+ res := svcAbort(TransFromJSONRPCContext(params))
+ if res == nil {
+ return map[string]string{"dtm_result": "SUCCESS"}
+ }
+ return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
+}
+
+func jsonRPCRegisterBranch(params interface{}) interface{} {
+ data := map[string]string{}
+ paramsJSON, _ := json.Marshal(params)
+ err := json.Unmarshal(paramsJSON, &data)
+ if err != nil {
+ return map[string]string{"dtm_result": "FAILURE", "message": err.Error()}
+ }
+ branch := TransBranch{
+ Gid: data["gid"],
+ BranchID: data["branch_id"],
+ Status: dtmcli.StatusPrepared,
+ BinData: []byte(data["data"]),
+ }
+ res := svcRegisterBranch(data["trans_type"], &branch, data)
+ if res == nil {
+ return map[string]string{"dtm_result": "SUCCESS"}
+ }
+ return map[string]string{"dtm_result": "FAILURE", "message": res.Error()}
+}
diff --git a/dtmsvr/config/config.go b/dtmsvr/config/config.go
index 06a8fda..7f69588 100644
--- a/dtmsvr/config/config.go
+++ b/dtmsvr/config/config.go
@@ -76,6 +76,7 @@ type configType struct {
RequestTimeout int64 `yaml:"RequestTimeout" default:"3"`
HTTPPort int64 `yaml:"HttpPort" default:"36789"`
GrpcPort int64 `yaml:"GrpcPort" default:"36790"`
+ JSONRPCPort int64 `yaml:"JSONRPCPort" default:"36791"`
MicroService MicroService `yaml:"MicroService"`
UpdateBranchSync int64 `yaml:"UpdateBranchSync"`
UpdateBranchAsyncGoroutineNum int64 `yaml:"UpdateBranchAsyncGoroutineNum" default:"1"`
diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go
index 9202d23..278e898 100644
--- a/dtmsvr/storage/boltdb/boltdb.go
+++ b/dtmsvr/storage/boltdb/boltdb.go
@@ -364,10 +364,10 @@ func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus s
}
// TouchCronTime updates cronTime
-func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
+func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) {
oldUnix := global.NextCronTime.Unix()
- global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
global.UpdateTime = dtmutil.GetNextTime(0)
+ global.NextCronTime = nextCronTime
global.NextCronInterval = nextCronInterval
err := s.boltDb.Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid)
diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go
index e0903a5..0f7e965 100644
--- a/dtmsvr/storage/redis/redis.go
+++ b/dtmsvr/storage/redis/redis.go
@@ -261,9 +261,9 @@ return gid
}
// TouchCronTime updates cronTime
-func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
- global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
+func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) {
global.UpdateTime = dtmutil.GetNextTime(0)
+ global.NextCronTime = nextCronTime
global.NextCronInterval = nextCronInterval
args := newArgList().
AppendGid(global.Gid).
diff --git a/dtmsvr/storage/registry/registry.go b/dtmsvr/storage/registry/registry.go
index 003fd69..e699bd3 100644
--- a/dtmsvr/storage/registry/registry.go
+++ b/dtmsvr/storage/registry/registry.go
@@ -18,6 +18,12 @@ type StorageFactory interface {
GetStorage() storage.Store
}
+var sqlFac = &SingletonFactory{
+ creatorFunction: func() storage.Store {
+ return &sql.Store{}
+ },
+}
+
var storeFactorys = map[string]StorageFactory{
"boltdb": &SingletonFactory{
creatorFunction: func() storage.Store {
@@ -29,16 +35,8 @@ var storeFactorys = map[string]StorageFactory{
return &redis.Store{}
},
},
- "mysql": &SingletonFactory{
- creatorFunction: func() storage.Store {
- return &sql.Store{}
- },
- },
- "postgres": &SingletonFactory{
- creatorFunction: func() storage.Store {
- return &sql.Store{}
- },
- },
+ "mysql": sqlFac,
+ "postgres": sqlFac,
}
// GetStore returns storage.Store
diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go
index 3c807af..6c90822 100644
--- a/dtmsvr/storage/sql/sql.go
+++ b/dtmsvr/storage/sql/sql.go
@@ -121,9 +121,9 @@ func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus s
}
// TouchCronTime updates cronTime
-func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
- global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
+func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) {
global.UpdateTime = dtmutil.GetNextTime(0)
+ global.NextCronTime = nextCronTime
global.NextCronInterval = nextCronInterval
dbGet().Must().Model(global).Where("status=? and gid=?", global.Status, global.Gid).
Select([]string{"next_cron_time", "update_time", "next_cron_interval"}).Updates(global)
diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go
index eca4d38..a03a912 100644
--- a/dtmsvr/storage/store.go
+++ b/dtmsvr/storage/store.go
@@ -28,6 +28,6 @@ type Store interface {
LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int)
MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error
ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool)
- TouchCronTime(global *TransGlobalStore, nextCronInterval int64)
+ TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time)
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
}
diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go
index 1816087..62b06d6 100644
--- a/dtmsvr/svr.go
+++ b/dtmsvr/svr.go
@@ -66,6 +66,18 @@ func StartSvr() {
logger.FatalIfError(err)
err = dtmdriver.GetDriver().RegisterGrpcService(conf.MicroService.Target, conf.MicroService.EndPoint)
logger.FatalIfError(err)
+
+ // start json-rpc server
+ jsonRPCApp := dtmutil.GetGinApp()
+ jsonRPCApp = httpMetrics(jsonRPCApp)
+ addJSONRPCRouter(jsonRPCApp)
+ logger.Infof("dtmsvr listen at: %d", conf.JSONRPCPort)
+ go func() {
+ err := jsonRPCApp.Run(fmt.Sprintf(":%d", conf.JSONRPCPort))
+ if err != nil {
+ logger.Errorf("start server err: %v", err)
+ }
+ }()
}
// PopulateDB setup mysql data
diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go
index 8aa1b60..6849d2f 100644
--- a/dtmsvr/trans_class.go
+++ b/dtmsvr/trans_class.go
@@ -8,6 +8,7 @@ package dtmsvr
import (
"context"
+ "encoding/json"
"time"
"github.com/dtm-labs/dtm/dtmcli"
@@ -84,6 +85,17 @@ func TransFromContext(c *gin.Context) *TransGlobal {
return &m
}
+// TransFromJSONRPCContext 1
+func TransFromJSONRPCContext(params interface{}) *TransGlobal {
+ jsonStr, _ := json.Marshal(params)
+ m := TransGlobal{}
+ err := json.Unmarshal(jsonStr, &m)
+ if err != nil {
+ return nil
+ }
+ return &m
+}
+
// TransFromDtmRequest TransFromContext
func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal {
o := &dtmgpb.DtmTransOptions{}
diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go
index 430df7c..01c2ab0 100644
--- a/dtmsvr/trans_status.go
+++ b/dtmsvr/trans_status.go
@@ -20,13 +20,26 @@ import (
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
+ "github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtmdriver"
"google.golang.org/grpc/metadata"
)
-func (t *TransGlobal) touchCronTime(ctype cronType) {
+// touchCronTime Based on ctype or delay set nextCronTime
+// delay = 0 ,use ctype set nextCronTime and nextCronInterval
+// delay > 0 ,use delay set nextCronTime ,use ctype set nextCronInterval
+func (t *TransGlobal) touchCronTime(ctype cronType, delay uint64) {
t.lastTouched = time.Now()
- GetStore().TouchCronTime(&t.TransGlobalStore, t.getNextCronInterval(ctype))
+ nextCronInterval := t.getNextCronInterval(ctype)
+
+ var nextCronTime *time.Time
+ if delay > 0 {
+ nextCronTime = dtmutil.GetNextTime(int64(delay))
+ } else {
+ nextCronTime = dtmutil.GetNextTime(nextCronInterval)
+ }
+
+ GetStore().TouchCronTime(&t.TransGlobalStore, nextCronInterval, nextCronTime)
logger.Infof("TouchCronTime for: %s", t.TransGlobalStore.String())
}
@@ -71,6 +84,10 @@ func (t *TransGlobal) isTimeout() bool {
return time.Since(*t.CreateTime)+NowForwardDuration >= time.Duration(timeout)*time.Second
}
+func (t *TransGlobal) needDelay(delay uint64) bool {
+ return time.Since(*t.CreateTime)+CronForwardDuration < time.Duration(delay)*time.Second
+}
+
func (t *TransGlobal) needProcess() bool {
return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout()
}
@@ -146,11 +163,11 @@ func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {
// if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval
if err == nil && time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond ||
t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval {
- t.touchCronTime(cronReset)
+ t.touchCronTime(cronReset, 0)
} else if err == dtmimp.ErrOngoing {
- t.touchCronTime(cronKeep)
+ t.touchCronTime(cronKeep, 0)
} else if err != nil {
- t.touchCronTime(cronBackoff)
+ t.touchCronTime(cronBackoff, 0)
}
return err
}
diff --git a/dtmsvr/trans_type_msg.go b/dtmsvr/trans_type_msg.go
index 5fe3eb2..6cb4679 100644
--- a/dtmsvr/trans_type_msg.go
+++ b/dtmsvr/trans_type_msg.go
@@ -11,6 +11,7 @@ import (
"fmt"
"github.com/dtm-labs/dtm/dtmcli"
+ "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
)
@@ -38,6 +39,10 @@ func (t *transMsgProcessor) GenBranches() []TransBranch {
return branches
}
+type cMsgCustom struct {
+ Delay uint64 //delay call branch, unit second
+}
+
func (t *TransGlobal) mayQueryPrepared() {
if !t.needProcess() || t.Status == dtmcli.StatusSubmitted {
return
@@ -48,10 +53,10 @@ func (t *TransGlobal) mayQueryPrepared() {
} else if errors.Is(err, dtmcli.ErrFailure) {
t.changeStatus(dtmcli.StatusFailed)
} else if errors.Is(err, dtmcli.ErrOngoing) {
- t.touchCronTime(cronReset)
+ t.touchCronTime(cronReset, 0)
} else {
logger.Errorf("getting result failed for %s. error: %v", t.QueryPrepared, err)
- t.touchCronTime(cronBackoff)
+ t.touchCronTime(cronBackoff, 0)
}
}
@@ -60,6 +65,17 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
if !t.needProcess() || t.Status == dtmcli.StatusPrepared {
return nil
}
+
+ cmc := cMsgCustom{Delay: 0}
+ if t.CustomData != "" {
+ dtmimp.MustUnmarshalString(t.CustomData, &cmc)
+ }
+
+ if cmc.Delay > 0 && t.needDelay(cmc.Delay) {
+ t.touchCronTime(cronKeep, cmc.Delay)
+ return nil
+ }
+
current := 0 // 当前正在处理的步骤
for ; current < len(branches); current++ {
branch := &branches[current]
diff --git a/helper/README-en.md b/helper/README-en.md
index 4c14c6f..58f9f28 100644
--- a/helper/README-en.md
+++ b/helper/README-en.md
@@ -11,7 +11,7 @@ English | [简体中文](https://github.com/dtm-labs/dtm/blob/main/helper/README
## What is DTM
-DTM is a distributed transaction solution which provides cross-service eventually data consistency. It provides saga, tcc, xa, 2-phase message strategies for a variety of application scenarios. It also supports multiple languages and multiple store engine to form up a transaction as following:
+DTM is a distributed transaction framework which provides cross-service eventually data consistency. It provides saga, tcc, xa, 2-phase message strategies for a variety of application scenarios. It also supports multiple languages and multiple store engine to form up a transaction as following:
@@ -26,49 +26,26 @@ DTM is a distributed transaction solution which provides cross-service eventuall
## Features
-* Extremely easy to adapt
- - Support HTTP and gRPC, provide easy-to-use programming interfaces, lower substantially the barrier of getting started with distributed transactions. Newcomers can adapt quickly.
-
-* Easy to use
- - Relieving developers from worrying about suspension, null compensation, idempotent transaction, and other tricky problems, the framework layer handles them all.
-
* Language-agnostic
- Suit for companies with multiple-language stacks.
Easy to write bindings for Go, Python, PHP, Node.js, Ruby, and other languages.
-* Easy to deploy, easy to extend
- - DTM depends only on MySQL, easy to deploy, cluster, and scale horizontally.
-
-* Support for multiple distributed transaction protocol
- - TCC, SAGA, XA, Transactional messages.
+* Support for multiple distributed transaction solutions
+ - TCC, SAGA, XA, 2-phases message.
-## DTM vs. others
-
-There is no mature open-source distributed transaction framework for non-Java languages.
-Mature open-source distributed transaction frameworks for Java language include Ali's Seata, Huawei's ServiceComb-Pack, Jingdong's shardingsphere, himly, tcc-transaction, ByteTCC, and so on, of which Seata is most widely used.
-
-The following is a comparison of the main features of dtm and Seata.
+* Extremely easy to adapt
+ - Support HTTP and gRPC, provide easy-to-use programming interfaces, lower substantially the barrier of getting started with distributed transactions. Newcomers can adapt quickly.
+* Easy to use
+ - Relieving developers from worrying about suspension, null compensation, idempotent transaction, and other tricky problems, the framework layer handles them all.
-| Features | DTM | Seata | Remarks |
-| :-----: | :----: | :----: | :----: |
-| Supported languages | Golang, Python, PHP, and others | Java | dtm allows easy access from a new language |
-| Exception handling | [Sub-transaction barrier](https://zhuanlan.zhihu.com/p/388444465) | manual | dtm solves idempotent transaction, hanging, null compensation |
-| TCC | ✓ | ✓ | |
-| XA | ✓ | ✓ | |
-| AT | suggest XA | ✓ | AT is similar to XA with better performance but with dirty rollback |
-| SAGA | support concurrency | complicated state-machine mode | dtm's state-machine mode is being planned |
-| Transactional Messaging | ✓ | ✗ | dtm provides Transactional Messaging similar to RocketMQ |
-| Multiple DBs in a service |✓|✗||
-| Communication protocols | HTTP, gRPC | Dubbo, no HTTP | |
-| Star count |
|
| dtm 0.1 is released from 20210604 and under fast development |
+* Easy to deploy, easy to extend
+ - DTM depends only on MySQL/Redis, easy to deploy, cluster, and scale horizontally.
-From the features' comparison above, if your language stack includes languages other than Java, then dtm is the one for you.
-If your language stack is Java, you can also choose to access dtm and use sub-transaction barrier technology to simplify your business development.
## [Cook Book](https://en.dtm.pub)
-# Quick start
+## Quick start
### run dtm
@@ -77,14 +54,17 @@ git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go
```
-### Start the example
+### Start an example
+Suppose we want to perform an inter-bank transfer. The operations of transfer out (TransOut) and transfer in (TransIn) are coded in separate micro-services.
+
+Here is an example to illustrate a solution of dtm to this problem:
``` bash
git clone https://github.com/dtm-labs/dtmcli-go-sample && cd dtmcli-go-sample
go run main.go
```
-# Code
+## Code
### Use
``` go
@@ -102,29 +82,39 @@ go run main.go
// submit the created saga transaction,dtm ensures all subtractions either complete or get revoked
err := saga.Submit()
```
-### Complete example
-Refer to [dtm-examples](https://github.com/dtm-labs/dtm-examples).
+When the above code runs, we can see in the console that services TransOut, TransIn has been called.
-### Slack
+#### Timing diagram
+A timing diagram for a successfully completed SAGA transaction would be as follows:
-You can join the [DTM slack channel here](https://join.slack.com/t/dtm-w6k9662/shared_invite/zt-vkrph4k1-eFqEFnMkbmlXqfUo5GWHWw).
+
-### Wechat
+#### Rollback upon failure
+If any forward operation fails, DTM invokes the corresponding compensating operation of each sub-transaction to roll back, after which the transaction is successfully rolled back.
-Add wechat friend with id yedf2008, or scan the OR code. Fill in dtm as verification.
+Let's purposely fail the forward operation of the second sub-transaction and watch what happens
-
+``` go
+app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
+ log.Printf("TransIn")
+ // c.JSON(200, "")
+ c.JSON(409, "") // Status 409 for Failure. Won't be retried
+})
+```
-### Give a star! ⭐
+The timing diagram for the intended failure is as follows:
-If you think this project is good, or helpful to you, please give a star!
+
+
+## More examples
-### Who is using
-
+Refer to [dtm-examples](https://github.com/dtm-labs/dtm-examples).
+
+## Slack
+
+You can join the [DTM slack channel here](https://join.slack.com/t/dtm-w6k9662/shared_invite/zt-vkrph4k1-eFqEFnMkbmlXqfUo5GWHWw).
+
+## Give a star! ⭐
+
+If you think this project is good, or helpful to you, please give a star!
diff --git a/test/base_test.go b/test/base_test.go
index b33bfd8..31ef17d 100644
--- a/test/base_test.go
+++ b/test/base_test.go
@@ -36,8 +36,9 @@ func TestBaseSqlDB(t *testing.T) {
Gid: "gid2",
BranchID: "branch_id2",
Op: dtmcli.BranchAction,
+ BarrierID: 1,
}
- db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, reason) values('saga', 'gid1', 'branch_id1', 'action', 'saga')")
+ db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values('saga', 'gid1', 'branch_id1', 'action', '01', 'saga')")
tx, err := db.ToSQLDB().Begin()
asserts.Nil(err)
err = barrier.Call(tx, func(tx *sql.Tx) error {
diff --git a/test/msg_delay_test.go b/test/msg_delay_test.go
new file mode 100644
index 0000000..9b9d030
--- /dev/null
+++ b/test/msg_delay_test.go
@@ -0,0 +1,39 @@
+package test
+
+import (
+ "testing"
+
+ "github.com/dtm-labs/dtm/dtmcli"
+ "github.com/dtm-labs/dtm/dtmcli/dtmimp"
+ "github.com/dtm-labs/dtm/dtmsvr"
+ "github.com/dtm-labs/dtm/dtmutil"
+ "github.com/dtm-labs/dtm/test/busi"
+ "github.com/stretchr/testify/assert"
+)
+
+func genMsgDelay(gid string) *dtmcli.Msg {
+ req := busi.GenTransReq(30, false, false)
+ msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid).
+ Add(busi.Busi+"/TransOut", &req).
+ Add(busi.Busi+"/TransIn", &req).SetDelay(10)
+ msg.QueryPrepared = busi.Busi + "/QueryPrepared"
+ return msg
+}
+
+func TestMsgDelayNormal(t *testing.T) {
+ gid := dtmimp.GetFuncName()
+ msg := genMsgDelay(gid)
+ submitForwardCron(0, func() {
+ msg.Submit()
+ waitTransProcessed(msg.Gid)
+ })
+
+ dtmsvr.NowForwardDuration = 0
+ assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid))
+ assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
+ cronTransOnceForwardCron(t, "", 0)
+ cronTransOnceForwardCron(t, "", 8)
+ cronTransOnceForwardCron(t, gid, 12)
+ assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
+ assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
+}
diff --git a/test/store_test.go b/test/store_test.go
index b711c76..46869e0 100644
--- a/test/store_test.go
+++ b/test/store_test.go
@@ -4,11 +4,11 @@ import (
"testing"
"time"
- "github.com/stretchr/testify/assert"
-
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmsvr/storage/registry"
+ "github.com/dtm-labs/dtm/dtmutil"
+ "github.com/stretchr/testify/assert"
)
func initTransGlobal(gid string) (*storage.TransGlobalStore, storage.Store) {
@@ -74,11 +74,11 @@ func TestStoreLockTrans(t *testing.T) {
assert.NotNil(t, g2)
assert.Equal(t, gid, g2.Gid)
- s.TouchCronTime(g, 3*conf.RetryInterval)
+ s.TouchCronTime(g, 3*conf.RetryInterval, dtmutil.GetNextTime(3*conf.RetryInterval))
g2 = s.LockOneGlobalTrans(2 * time.Duration(conf.RetryInterval) * time.Second)
assert.Nil(t, g2)
- s.TouchCronTime(g, 1*conf.RetryInterval)
+ s.TouchCronTime(g, 1*conf.RetryInterval, dtmutil.GetNextTime(1*conf.RetryInterval))
g2 = s.LockOneGlobalTrans(2 * time.Duration(conf.RetryInterval) * time.Second)
assert.NotNil(t, g2)
assert.Equal(t, gid, g2.Gid)
diff --git a/test/types.go b/test/types.go
index 2d1d545..0f3ed43 100644
--- a/test/types.go
+++ b/test/types.go
@@ -68,6 +68,13 @@ func cronTransOnceForwardCron(t *testing.T, gid string, seconds int) {
dtmsvr.CronForwardDuration = old
}
+func submitForwardCron(seconds int, fn func()) {
+ old := dtmsvr.CronForwardDuration
+ dtmsvr.CronForwardDuration = time.Duration(seconds) * time.Second
+ fn()
+ dtmsvr.CronForwardDuration = old
+}
+
const (
// StatusPrepared status for global/branch trans status.
StatusPrepared = dtmcli.StatusPrepared