Browse Source

Merge branch 'main' of github.com:Leizhengzi/dtm into main

pull/228/head
liulei 4 years ago
parent
commit
ec4f0d1c1d
  1. 94
      README.md
  2. 2
      charts/templates/deployment.yaml
  3. 1
      conf.sample.yml
  4. 15
      dtmcli/msg.go
  5. 6
      dtmcli/saga.go
  6. 7
      dtmgrpc/msg.go
  7. 2
      dtmgrpc/saga.go
  8. 110
      dtmsvr/api_json_rpc_http.go
  9. 1
      dtmsvr/config/config.go
  10. 4
      dtmsvr/storage/boltdb/boltdb.go
  11. 4
      dtmsvr/storage/redis/redis.go
  12. 18
      dtmsvr/storage/registry/registry.go
  13. 4
      dtmsvr/storage/sql/sql.go
  14. 2
      dtmsvr/storage/store.go
  15. 12
      dtmsvr/svr.go
  16. 12
      dtmsvr/trans_class.go
  17. 27
      dtmsvr/trans_status.go
  18. 20
      dtmsvr/trans_type_msg.go
  19. 94
      helper/README-en.md
  20. 3
      test/base_test.go
  21. 39
      test/msg_delay_test.go
  22. 8
      test/store_test.go
  23. 7
      test/types.go

94
README.md

@ -11,7 +11,7 @@ English | [简体中文](https://github.com/dtm-labs/dtm/blob/main/helper/README
## What is DTM
DTM is a distributed transaction solution which provides cross-service eventually data consistency. It provides saga, tcc, xa, 2-phase message strategies for a variety of application scenarios. It also supports multiple languages and multiple store engine to form up a transaction as following:
DTM is a distributed transaction framework which provides cross-service eventually data consistency. It provides saga, tcc, xa, 2-phase message strategies for a variety of application scenarios. It also supports multiple languages and multiple store engine to form up a transaction as following:
<img alt="function-picture" src="https://en.dtm.pub/assets/function.7d5618f8.png" height=250 />
@ -26,49 +26,26 @@ DTM is a distributed transaction solution which provides cross-service eventuall
## Features
* Extremely easy to adapt
- Support HTTP and gRPC, provide easy-to-use programming interfaces, lower substantially the barrier of getting started with distributed transactions. Newcomers can adapt quickly.
* Easy to use
- Relieving developers from worrying about suspension, null compensation, idempotent transaction, and other tricky problems, the framework layer handles them all.
* Language-agnostic
- Suit for companies with multiple-language stacks.
Easy to write bindings for Go, Python, PHP, Node.js, Ruby, and other languages.
* Easy to deploy, easy to extend
- DTM depends only on MySQL, easy to deploy, cluster, and scale horizontally.
* Support for multiple distributed transaction protocol
- TCC, SAGA, XA, Transactional messages.
* Support for multiple distributed transaction solutions
- TCC, SAGA, XA, 2-phases message.
## DTM vs. others
There is no mature open-source distributed transaction framework for non-Java languages.
Mature open-source distributed transaction frameworks for Java language include Ali's Seata, Huawei's ServiceComb-Pack, Jingdong's shardingsphere, himly, tcc-transaction, ByteTCC, and so on, of which Seata is most widely used.
The following is a comparison of the main features of dtm and Seata.
* Extremely easy to adapt
- Support HTTP and gRPC, provide easy-to-use programming interfaces, lower substantially the barrier of getting started with distributed transactions. Newcomers can adapt quickly.
* Easy to use
- Relieving developers from worrying about suspension, null compensation, idempotent transaction, and other tricky problems, the framework layer handles them all.
| Features | DTM | Seata | Remarks |
| :-----: | :----: | :----: | :----: |
| Supported languages | <span style="color:green">Golang, Python, PHP, and others</span> | <span style="color:orange">Java</span> | dtm allows easy access from a new language |
| Exception handling | [Sub-transaction barrier](https://zhuanlan.zhihu.com/p/388444465) | <span style="color:orange">manual</span> | dtm solves idempotent transaction, hanging, null compensation |
| TCC | <span style="color:green"></span> | <span style="color:green"></span> | |
| XA | <span style="color:green"></span> | <span style="color:green"></span> | |
| AT | <span style="color:orange">suggest XA</span> | <span style="color:green"></span> | AT is similar to XA with better performance but with dirty rollback |
| SAGA | <span style="color:green">support concurrency</span> | <span style="color:green">complicated state-machine mode</span> | dtm's state-machine mode is being planned |
| Transactional Messaging | <span style="color:green"></span> | <span style="color:red"></span> | dtm provides Transactional Messaging similar to RocketMQ |
| Multiple DBs in a service |<span style="color:green"></span>|<span style="color:red"></span>||
| Communication protocols | <span style="color:green">HTTP, gRPC</span> | <span style="color:green">Dubbo, no HTTP</span> | |
| Star count | <img src="https://img.shields.io/github/stars/dtm-labs/dtm.svg?style=social" alt="github stars"/> | <img src="https://img.shields.io/github/stars/seata/seata.svg?style=social" alt="github stars"/> | dtm 0.1 is released from 20210604 and under fast development |
* Easy to deploy, easy to extend
- DTM depends only on MySQL/Redis, easy to deploy, cluster, and scale horizontally.
From the features' comparison above, if your language stack includes languages other than Java, then dtm is the one for you.
If your language stack is Java, you can also choose to access dtm and use sub-transaction barrier technology to simplify your business development.
## [Cook Book](https://en.dtm.pub)
# Quick start
## Quick start
### run dtm
@ -77,14 +54,17 @@ git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go
```
### Start the example
### Start an example
Suppose we want to perform an inter-bank transfer. The operations of transfer out (TransOut) and transfer in (TransIn) are coded in separate micro-services.
Here is an example to illustrate a solution of dtm to this problem:
``` bash
git clone https://github.com/dtm-labs/dtmcli-go-sample && cd dtmcli-go-sample
go run main.go
```
# Code
## Code
### Use
``` go
@ -102,29 +82,39 @@ go run main.go
// submit the created saga transaction,dtm ensures all subtractions either complete or get revoked
err := saga.Submit()
```
### Complete example
Refer to [dtm-examples](https://github.com/dtm-labs/dtm-examples).
When the above code runs, we can see in the console that services TransOut, TransIn has been called.
### Slack
#### Timing diagram
A timing diagram for a successfully completed SAGA transaction would be as follows:
You can join the [DTM slack channel here](https://join.slack.com/t/dtm-w6k9662/shared_invite/zt-vkrph4k1-eFqEFnMkbmlXqfUo5GWHWw).
<img alt="saga-success" src="https://en.dtm.pub/assets/saga_normal.59a75c01.jpg" height=450/>
### Wechat
#### Rollback upon failure
If any forward operation fails, DTM invokes the corresponding compensating operation of each sub-transaction to roll back, after which the transaction is successfully rolled back.
Add wechat friend with id yedf2008, or scan the OR code. Fill in dtm as verification.
Let's purposely fail the forward operation of the second sub-transaction and watch what happens
![yedf2008](http://service.ivydad.com/cover/dubbingb6b5e2c0-2d2a-cd59-f7c5-c6b90aceb6f1.jpeg)
``` go
app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
log.Printf("TransIn")
// c.JSON(200, "")
c.JSON(409, "") // Status 409 for Failure. Won't be retried
})
```
### Give a star! ⭐
The timing diagram for the intended failure is as follows:
If you think this project is good, or helpful to you, please give a star!
<img alt="saga-failed" src="https://en.dtm.pub/assets/saga_rollback.7989c866.jpg" height=550>
## More examples
### Who is using
<div style='vertical-align: middle'>
<img alt='Tencent' height='80' src='https://dtm.pub/assets/tencent.4b87bfd8.jpeg' /img>
<img alt='Ivydad' height='80' src='https://www.ivydad.com/_nuxt/img/header-logo.5b3eb96.png'>
<img alt='Eglass' height='80' src='https://img.epeijing.cn/official-website/assets/logo.png'>
<img alt='Jiou' height='80' src='http://www.siqitech.com.cn/img/logo.3f6c2914.png'>
<img alt='GoldenData' height='80' src='https://pic1.zhimg.com/80/v2-dc1d0cef5f7b72be345fc34d768e69e3_1440w.png'>
</div>
Refer to [dtm-examples](https://github.com/dtm-labs/dtm-examples).
## Slack
You can join the [DTM slack channel here](https://join.slack.com/t/dtm-w6k9662/shared_invite/zt-vkrph4k1-eFqEFnMkbmlXqfUo5GWHWw).
## Give a star! ⭐
If you think this project is good, or helpful to you, please give a star!

2
charts/templates/deployment.yaml

@ -67,4 +67,4 @@ spec:
volumes:
- name: config
configMap:
name: dtm-conf
name: {{ include "dtm.fullname" . }}-conf

1
conf.sample.yml

@ -56,6 +56,7 @@
# HttpPort: 36789
# GrpcPort: 36790
# JSONRPC: 36791
### advanced options
# UpdateBranchAsyncGoroutineNum: 1 # num of async goroutine to update branch status

15
dtmcli/msg.go

@ -16,6 +16,7 @@ import (
// Msg reliable msg type
type Msg struct {
dtmimp.TransBase
delay uint64 // delay call branch, unit second
}
// NewMsg create new msg
@ -30,6 +31,12 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
return s
}
// SetDelay delay call branch, unit second
func (s *Msg) SetDelay(delay uint64) *Msg {
s.delay = delay
return s
}
// Prepare prepare the msg, msg will later be submitted
func (s *Msg) Prepare(queryPrepared string) error {
s.QueryPrepared = dtmimp.OrString(queryPrepared, s.QueryPrepared)
@ -38,6 +45,7 @@ func (s *Msg) Prepare(queryPrepared string) error {
// Submit submit the msg
func (s *Msg) Submit() error {
s.BuildCustomOptions()
return dtmimp.TransCallDtm(&s.TransBase, s, "submit")
}
@ -74,3 +82,10 @@ func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier)
}
return err
}
// BuildCustomOptions add custom options to the request context
func (s *Msg) BuildCustomOptions() {
if s.delay > 0 {
s.CustomData = dtmimp.MustMarshalString(map[string]interface{}{"delay": s.delay})
}
}

6
dtmcli/saga.go

@ -43,12 +43,12 @@ func (s *Saga) EnableConcurrent() *Saga {
// Submit submit the saga trans
func (s *Saga) Submit() error {
s.AddConcurrentContext()
s.BuildCustomOptions()
return dtmimp.TransCallDtm(&s.TransBase, s, "submit")
}
// AddConcurrentContext adds concurrent options to the request context
func (s *Saga) AddConcurrentContext() {
// BuildCustomOptions add custom options to the request context
func (s *Saga) BuildCustomOptions() {
if s.concurrent {
s.CustomData = dtmimp.MustMarshalString(map[string]interface{}{"orders": s.orders, "concurrent": s.concurrent})
}

7
dtmgrpc/msg.go

@ -33,6 +33,12 @@ func (s *MsgGrpc) Add(action string, msg proto.Message) *MsgGrpc {
return s
}
// SetDelay delay call branch, unit second
func (s *MsgGrpc) SetDelay(delay uint64) *MsgGrpc {
s.Msg.SetDelay(delay)
return s
}
// Prepare prepare the msg, msg will later be submitted
func (s *MsgGrpc) Prepare(queryPrepared string) error {
s.QueryPrepared = dtmimp.OrString(queryPrepared, s.QueryPrepared)
@ -41,6 +47,7 @@ func (s *MsgGrpc) Prepare(queryPrepared string) error {
// Submit submit the msg
func (s *MsgGrpc) Submit() error {
s.Msg.BuildCustomOptions()
return dtmgimp.DtmGrpcCall(&s.TransBase, "Submit")
}

2
dtmgrpc/saga.go

@ -43,6 +43,6 @@ func (s *SagaGrpc) EnableConcurrent() *SagaGrpc {
// Submit submit the saga trans
func (s *SagaGrpc) Submit() error {
s.Saga.AddConcurrentContext()
s.Saga.BuildCustomOptions()
return dtmgimp.DtmGrpcCall(&s.Saga.TransBase, "Submit")
}

110
dtmsvr/api_json_rpc_http.go

@ -0,0 +1,110 @@
package dtmsvr
import (
"encoding/json"
"fmt"
"net/http"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/gin-gonic/gin"
)
type jsonRPCReq struct {
Method string `json:"method"`
Jsonrpc string `json:"jsonrpc"`
Params interface{} `json:"params"`
ID string `json:"id"`
}
func addJSONRPCRouter(engine *gin.Engine) {
engine.POST("/", dispatcher)
}
func dispatcher(c *gin.Context) {
req := new(jsonRPCReq)
err := c.BindJSON(req)
logger.Infof("request:%s\n", req)
if err != nil {
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": nil, "error": map[string]interface{}{"code": -32700, "message": "Parse error"}})
return
}
if req.Method == "dtmserver.NewGid" {
res := jsonRPCNewGid()
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": err})
return
}
if req.Method == "dtmserver.Prepare" {
res := jsonRPCPrepare(req.Params)
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
return
}
if req.Method == "dtmserver.Submit" {
res := jsonRPCSubmit(req.Params)
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
return
}
if req.Method == "dtmserver.Abort" {
res := jsonRPCAbort(req.Params)
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
return
}
if req.Method == "dtmserver.RegisterBranch" {
res := jsonRPCRegisterBranch(req.Params)
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
return
}
c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": nil, "error": map[string]interface{}{"code": -32601, "message": "Method not found"}})
}
func jsonRPCNewGid() interface{} {
return map[string]interface{}{"gid": GenGid(), "dtm_result": dtmcli.ResultSuccess}
}
func jsonRPCPrepare(params interface{}) interface{} {
res := svcPrepare(TransFromJSONRPCContext(params))
if res == nil {
return map[string]string{"dtm_result": "SUCCESS"}
}
return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
}
func jsonRPCSubmit(params interface{}) interface{} {
res := svcSubmit(TransFromJSONRPCContext(params))
if res == nil {
return map[string]string{"dtm_result": "SUCCESS"}
}
return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
}
func jsonRPCAbort(params interface{}) interface{} {
res := svcAbort(TransFromJSONRPCContext(params))
if res == nil {
return map[string]string{"dtm_result": "SUCCESS"}
}
return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
}
func jsonRPCRegisterBranch(params interface{}) interface{} {
data := map[string]string{}
paramsJSON, _ := json.Marshal(params)
err := json.Unmarshal(paramsJSON, &data)
if err != nil {
return map[string]string{"dtm_result": "FAILURE", "message": err.Error()}
}
branch := TransBranch{
Gid: data["gid"],
BranchID: data["branch_id"],
Status: dtmcli.StatusPrepared,
BinData: []byte(data["data"]),
}
res := svcRegisterBranch(data["trans_type"], &branch, data)
if res == nil {
return map[string]string{"dtm_result": "SUCCESS"}
}
return map[string]string{"dtm_result": "FAILURE", "message": res.Error()}
}

1
dtmsvr/config/config.go

@ -76,6 +76,7 @@ type configType struct {
RequestTimeout int64 `yaml:"RequestTimeout" default:"3"`
HTTPPort int64 `yaml:"HttpPort" default:"36789"`
GrpcPort int64 `yaml:"GrpcPort" default:"36790"`
JSONRPCPort int64 `yaml:"JSONRPCPort" default:"36791"`
MicroService MicroService `yaml:"MicroService"`
UpdateBranchSync int64 `yaml:"UpdateBranchSync"`
UpdateBranchAsyncGoroutineNum int64 `yaml:"UpdateBranchAsyncGoroutineNum" default:"1"`

4
dtmsvr/storage/boltdb/boltdb.go

@ -364,10 +364,10 @@ func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus s
}
// TouchCronTime updates cronTime
func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) {
oldUnix := global.NextCronTime.Unix()
global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
global.UpdateTime = dtmutil.GetNextTime(0)
global.NextCronTime = nextCronTime
global.NextCronInterval = nextCronInterval
err := s.boltDb.Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid)

4
dtmsvr/storage/redis/redis.go

@ -261,9 +261,9 @@ return gid
}
// TouchCronTime updates cronTime
func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) {
global.UpdateTime = dtmutil.GetNextTime(0)
global.NextCronTime = nextCronTime
global.NextCronInterval = nextCronInterval
args := newArgList().
AppendGid(global.Gid).

18
dtmsvr/storage/registry/registry.go

@ -18,6 +18,12 @@ type StorageFactory interface {
GetStorage() storage.Store
}
var sqlFac = &SingletonFactory{
creatorFunction: func() storage.Store {
return &sql.Store{}
},
}
var storeFactorys = map[string]StorageFactory{
"boltdb": &SingletonFactory{
creatorFunction: func() storage.Store {
@ -29,16 +35,8 @@ var storeFactorys = map[string]StorageFactory{
return &redis.Store{}
},
},
"mysql": &SingletonFactory{
creatorFunction: func() storage.Store {
return &sql.Store{}
},
},
"postgres": &SingletonFactory{
creatorFunction: func() storage.Store {
return &sql.Store{}
},
},
"mysql": sqlFac,
"postgres": sqlFac,
}
// GetStore returns storage.Store

4
dtmsvr/storage/sql/sql.go

@ -121,9 +121,9 @@ func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus s
}
// TouchCronTime updates cronTime
func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) {
global.UpdateTime = dtmutil.GetNextTime(0)
global.NextCronTime = nextCronTime
global.NextCronInterval = nextCronInterval
dbGet().Must().Model(global).Where("status=? and gid=?", global.Status, global.Gid).
Select([]string{"next_cron_time", "update_time", "next_cron_interval"}).Updates(global)

2
dtmsvr/storage/store.go

@ -28,6 +28,6 @@ type Store interface {
LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int)
MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error
ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool)
TouchCronTime(global *TransGlobalStore, nextCronInterval int64)
TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time)
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
}

12
dtmsvr/svr.go

@ -66,6 +66,18 @@ func StartSvr() {
logger.FatalIfError(err)
err = dtmdriver.GetDriver().RegisterGrpcService(conf.MicroService.Target, conf.MicroService.EndPoint)
logger.FatalIfError(err)
// start json-rpc server
jsonRPCApp := dtmutil.GetGinApp()
jsonRPCApp = httpMetrics(jsonRPCApp)
addJSONRPCRouter(jsonRPCApp)
logger.Infof("dtmsvr listen at: %d", conf.JSONRPCPort)
go func() {
err := jsonRPCApp.Run(fmt.Sprintf(":%d", conf.JSONRPCPort))
if err != nil {
logger.Errorf("start server err: %v", err)
}
}()
}
// PopulateDB setup mysql data

12
dtmsvr/trans_class.go

@ -8,6 +8,7 @@ package dtmsvr
import (
"context"
"encoding/json"
"time"
"github.com/dtm-labs/dtm/dtmcli"
@ -84,6 +85,17 @@ func TransFromContext(c *gin.Context) *TransGlobal {
return &m
}
// TransFromJSONRPCContext 1
func TransFromJSONRPCContext(params interface{}) *TransGlobal {
jsonStr, _ := json.Marshal(params)
m := TransGlobal{}
err := json.Unmarshal(jsonStr, &m)
if err != nil {
return nil
}
return &m
}
// TransFromDtmRequest TransFromContext
func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal {
o := &dtmgpb.DtmTransOptions{}

27
dtmsvr/trans_status.go

@ -20,13 +20,26 @@ import (
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtmdriver"
"google.golang.org/grpc/metadata"
)
func (t *TransGlobal) touchCronTime(ctype cronType) {
// touchCronTime Based on ctype or delay set nextCronTime
// delay = 0 ,use ctype set nextCronTime and nextCronInterval
// delay > 0 ,use delay set nextCronTime ,use ctype set nextCronInterval
func (t *TransGlobal) touchCronTime(ctype cronType, delay uint64) {
t.lastTouched = time.Now()
GetStore().TouchCronTime(&t.TransGlobalStore, t.getNextCronInterval(ctype))
nextCronInterval := t.getNextCronInterval(ctype)
var nextCronTime *time.Time
if delay > 0 {
nextCronTime = dtmutil.GetNextTime(int64(delay))
} else {
nextCronTime = dtmutil.GetNextTime(nextCronInterval)
}
GetStore().TouchCronTime(&t.TransGlobalStore, nextCronInterval, nextCronTime)
logger.Infof("TouchCronTime for: %s", t.TransGlobalStore.String())
}
@ -71,6 +84,10 @@ func (t *TransGlobal) isTimeout() bool {
return time.Since(*t.CreateTime)+NowForwardDuration >= time.Duration(timeout)*time.Second
}
func (t *TransGlobal) needDelay(delay uint64) bool {
return time.Since(*t.CreateTime)+CronForwardDuration < time.Duration(delay)*time.Second
}
func (t *TransGlobal) needProcess() bool {
return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout()
}
@ -146,11 +163,11 @@ func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {
// if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval
if err == nil && time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond ||
t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval {
t.touchCronTime(cronReset)
t.touchCronTime(cronReset, 0)
} else if err == dtmimp.ErrOngoing {
t.touchCronTime(cronKeep)
t.touchCronTime(cronKeep, 0)
} else if err != nil {
t.touchCronTime(cronBackoff)
t.touchCronTime(cronBackoff, 0)
}
return err
}

20
dtmsvr/trans_type_msg.go

@ -11,6 +11,7 @@ import (
"fmt"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
)
@ -38,6 +39,10 @@ func (t *transMsgProcessor) GenBranches() []TransBranch {
return branches
}
type cMsgCustom struct {
Delay uint64 //delay call branch, unit second
}
func (t *TransGlobal) mayQueryPrepared() {
if !t.needProcess() || t.Status == dtmcli.StatusSubmitted {
return
@ -48,10 +53,10 @@ func (t *TransGlobal) mayQueryPrepared() {
} else if errors.Is(err, dtmcli.ErrFailure) {
t.changeStatus(dtmcli.StatusFailed)
} else if errors.Is(err, dtmcli.ErrOngoing) {
t.touchCronTime(cronReset)
t.touchCronTime(cronReset, 0)
} else {
logger.Errorf("getting result failed for %s. error: %v", t.QueryPrepared, err)
t.touchCronTime(cronBackoff)
t.touchCronTime(cronBackoff, 0)
}
}
@ -60,6 +65,17 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
if !t.needProcess() || t.Status == dtmcli.StatusPrepared {
return nil
}
cmc := cMsgCustom{Delay: 0}
if t.CustomData != "" {
dtmimp.MustUnmarshalString(t.CustomData, &cmc)
}
if cmc.Delay > 0 && t.needDelay(cmc.Delay) {
t.touchCronTime(cronKeep, cmc.Delay)
return nil
}
current := 0 // 当前正在处理的步骤
for ; current < len(branches); current++ {
branch := &branches[current]

94
helper/README-en.md

@ -11,7 +11,7 @@ English | [简体中文](https://github.com/dtm-labs/dtm/blob/main/helper/README
## What is DTM
DTM is a distributed transaction solution which provides cross-service eventually data consistency. It provides saga, tcc, xa, 2-phase message strategies for a variety of application scenarios. It also supports multiple languages and multiple store engine to form up a transaction as following:
DTM is a distributed transaction framework which provides cross-service eventually data consistency. It provides saga, tcc, xa, 2-phase message strategies for a variety of application scenarios. It also supports multiple languages and multiple store engine to form up a transaction as following:
<img alt="function-picture" src="https://en.dtm.pub/assets/function.7d5618f8.png" height=250 />
@ -26,49 +26,26 @@ DTM is a distributed transaction solution which provides cross-service eventuall
## Features
* Extremely easy to adapt
- Support HTTP and gRPC, provide easy-to-use programming interfaces, lower substantially the barrier of getting started with distributed transactions. Newcomers can adapt quickly.
* Easy to use
- Relieving developers from worrying about suspension, null compensation, idempotent transaction, and other tricky problems, the framework layer handles them all.
* Language-agnostic
- Suit for companies with multiple-language stacks.
Easy to write bindings for Go, Python, PHP, Node.js, Ruby, and other languages.
* Easy to deploy, easy to extend
- DTM depends only on MySQL, easy to deploy, cluster, and scale horizontally.
* Support for multiple distributed transaction protocol
- TCC, SAGA, XA, Transactional messages.
* Support for multiple distributed transaction solutions
- TCC, SAGA, XA, 2-phases message.
## DTM vs. others
There is no mature open-source distributed transaction framework for non-Java languages.
Mature open-source distributed transaction frameworks for Java language include Ali's Seata, Huawei's ServiceComb-Pack, Jingdong's shardingsphere, himly, tcc-transaction, ByteTCC, and so on, of which Seata is most widely used.
The following is a comparison of the main features of dtm and Seata.
* Extremely easy to adapt
- Support HTTP and gRPC, provide easy-to-use programming interfaces, lower substantially the barrier of getting started with distributed transactions. Newcomers can adapt quickly.
* Easy to use
- Relieving developers from worrying about suspension, null compensation, idempotent transaction, and other tricky problems, the framework layer handles them all.
| Features | DTM | Seata | Remarks |
| :-----: | :----: | :----: | :----: |
| Supported languages | <span style="color:green">Golang, Python, PHP, and others</span> | <span style="color:orange">Java</span> | dtm allows easy access from a new language |
| Exception handling | [Sub-transaction barrier](https://zhuanlan.zhihu.com/p/388444465) | <span style="color:orange">manual</span> | dtm solves idempotent transaction, hanging, null compensation |
| TCC | <span style="color:green"></span> | <span style="color:green"></span> | |
| XA | <span style="color:green"></span> | <span style="color:green"></span> | |
| AT | <span style="color:orange">suggest XA</span> | <span style="color:green"></span> | AT is similar to XA with better performance but with dirty rollback |
| SAGA | <span style="color:green">support concurrency</span> | <span style="color:green">complicated state-machine mode</span> | dtm's state-machine mode is being planned |
| Transactional Messaging | <span style="color:green"></span> | <span style="color:red"></span> | dtm provides Transactional Messaging similar to RocketMQ |
| Multiple DBs in a service |<span style="color:green"></span>|<span style="color:red"></span>||
| Communication protocols | <span style="color:green">HTTP, gRPC</span> | <span style="color:green">Dubbo, no HTTP</span> | |
| Star count | <img src="https://img.shields.io/github/stars/dtm-labs/dtm.svg?style=social" alt="github stars"/> | <img src="https://img.shields.io/github/stars/seata/seata.svg?style=social" alt="github stars"/> | dtm 0.1 is released from 20210604 and under fast development |
* Easy to deploy, easy to extend
- DTM depends only on MySQL/Redis, easy to deploy, cluster, and scale horizontally.
From the features' comparison above, if your language stack includes languages other than Java, then dtm is the one for you.
If your language stack is Java, you can also choose to access dtm and use sub-transaction barrier technology to simplify your business development.
## [Cook Book](https://en.dtm.pub)
# Quick start
## Quick start
### run dtm
@ -77,14 +54,17 @@ git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go
```
### Start the example
### Start an example
Suppose we want to perform an inter-bank transfer. The operations of transfer out (TransOut) and transfer in (TransIn) are coded in separate micro-services.
Here is an example to illustrate a solution of dtm to this problem:
``` bash
git clone https://github.com/dtm-labs/dtmcli-go-sample && cd dtmcli-go-sample
go run main.go
```
# Code
## Code
### Use
``` go
@ -102,29 +82,39 @@ go run main.go
// submit the created saga transaction,dtm ensures all subtractions either complete or get revoked
err := saga.Submit()
```
### Complete example
Refer to [dtm-examples](https://github.com/dtm-labs/dtm-examples).
When the above code runs, we can see in the console that services TransOut, TransIn has been called.
### Slack
#### Timing diagram
A timing diagram for a successfully completed SAGA transaction would be as follows:
You can join the [DTM slack channel here](https://join.slack.com/t/dtm-w6k9662/shared_invite/zt-vkrph4k1-eFqEFnMkbmlXqfUo5GWHWw).
<img alt="saga-success" src="https://en.dtm.pub/assets/saga_normal.59a75c01.jpg" height=450/>
### Wechat
#### Rollback upon failure
If any forward operation fails, DTM invokes the corresponding compensating operation of each sub-transaction to roll back, after which the transaction is successfully rolled back.
Add wechat friend with id yedf2008, or scan the OR code. Fill in dtm as verification.
Let's purposely fail the forward operation of the second sub-transaction and watch what happens
![yedf2008](http://service.ivydad.com/cover/dubbingb6b5e2c0-2d2a-cd59-f7c5-c6b90aceb6f1.jpeg)
``` go
app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
log.Printf("TransIn")
// c.JSON(200, "")
c.JSON(409, "") // Status 409 for Failure. Won't be retried
})
```
### Give a star! ⭐
The timing diagram for the intended failure is as follows:
If you think this project is good, or helpful to you, please give a star!
<img alt="saga-failed" src="https://en.dtm.pub/assets/saga_rollback.7989c866.jpg" height=550>
## More examples
### Who is using
<div style='vertical-align: middle'>
<img alt='Tencent' height='80' src='https://dtm.pub/assets/tencent.4b87bfd8.jpeg' /img>
<img alt='Ivydad' height='80' src='https://www.ivydad.com/_nuxt/img/header-logo.5b3eb96.png'>
<img alt='Eglass' height='80' src='https://img.epeijing.cn/official-website/assets/logo.png'>
<img alt='Jiou' height='80' src='http://www.siqitech.com.cn/img/logo.3f6c2914.png'>
<img alt='GoldenData' height='80' src='https://pic1.zhimg.com/80/v2-dc1d0cef5f7b72be345fc34d768e69e3_1440w.png'>
</div>
Refer to [dtm-examples](https://github.com/dtm-labs/dtm-examples).
## Slack
You can join the [DTM slack channel here](https://join.slack.com/t/dtm-w6k9662/shared_invite/zt-vkrph4k1-eFqEFnMkbmlXqfUo5GWHWw).
## Give a star! ⭐
If you think this project is good, or helpful to you, please give a star!

3
test/base_test.go

@ -36,8 +36,9 @@ func TestBaseSqlDB(t *testing.T) {
Gid: "gid2",
BranchID: "branch_id2",
Op: dtmcli.BranchAction,
BarrierID: 1,
}
db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, reason) values('saga', 'gid1', 'branch_id1', 'action', 'saga')")
db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values('saga', 'gid1', 'branch_id1', 'action', '01', 'saga')")
tx, err := db.ToSQLDB().Begin()
asserts.Nil(err)
err = barrier.Call(tx, func(tx *sql.Tx) error {

39
test/msg_delay_test.go

@ -0,0 +1,39 @@
package test
import (
"testing"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
)
func genMsgDelay(gid string) *dtmcli.Msg {
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid).
Add(busi.Busi+"/TransOut", &req).
Add(busi.Busi+"/TransIn", &req).SetDelay(10)
msg.QueryPrepared = busi.Busi + "/QueryPrepared"
return msg
}
func TestMsgDelayNormal(t *testing.T) {
gid := dtmimp.GetFuncName()
msg := genMsgDelay(gid)
submitForwardCron(0, func() {
msg.Submit()
waitTransProcessed(msg.Gid)
})
dtmsvr.NowForwardDuration = 0
assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
cronTransOnceForwardCron(t, "", 0)
cronTransOnceForwardCron(t, "", 8)
cronTransOnceForwardCron(t, gid, 12)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
}

8
test/store_test.go

@ -4,11 +4,11 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmsvr/storage/registry"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/stretchr/testify/assert"
)
func initTransGlobal(gid string) (*storage.TransGlobalStore, storage.Store) {
@ -74,11 +74,11 @@ func TestStoreLockTrans(t *testing.T) {
assert.NotNil(t, g2)
assert.Equal(t, gid, g2.Gid)
s.TouchCronTime(g, 3*conf.RetryInterval)
s.TouchCronTime(g, 3*conf.RetryInterval, dtmutil.GetNextTime(3*conf.RetryInterval))
g2 = s.LockOneGlobalTrans(2 * time.Duration(conf.RetryInterval) * time.Second)
assert.Nil(t, g2)
s.TouchCronTime(g, 1*conf.RetryInterval)
s.TouchCronTime(g, 1*conf.RetryInterval, dtmutil.GetNextTime(1*conf.RetryInterval))
g2 = s.LockOneGlobalTrans(2 * time.Duration(conf.RetryInterval) * time.Second)
assert.NotNil(t, g2)
assert.Equal(t, gid, g2.Gid)

7
test/types.go

@ -68,6 +68,13 @@ func cronTransOnceForwardCron(t *testing.T, gid string, seconds int) {
dtmsvr.CronForwardDuration = old
}
func submitForwardCron(seconds int, fn func()) {
old := dtmsvr.CronForwardDuration
dtmsvr.CronForwardDuration = time.Duration(seconds) * time.Second
fn()
dtmsvr.CronForwardDuration = old
}
const (
// StatusPrepared status for global/branch trans status.
StatusPrepared = dtmcli.StatusPrepared

Loading…
Cancel
Save