From 1c024317d6b308dd143a851627e000b80bd7ae23 Mon Sep 17 00:00:00 2001
From: xyctruth <398041993@qq.com>
Date: Mon, 21 Feb 2022 17:02:58 +0800
Subject: [PATCH 01/17] msg support delay
---
dtmcli/msg.go | 15 +++++++++++++++
dtmcli/saga.go | 6 +++---
dtmgrpc/msg.go | 7 +++++++
dtmgrpc/saga.go | 2 +-
dtmsvr/storage/boltdb/boltdb.go | 4 ++--
dtmsvr/storage/redis/redis.go | 4 ++--
dtmsvr/storage/sql/sql.go | 4 ++--
dtmsvr/storage/store.go | 2 +-
dtmsvr/trans_status.go | 13 ++++++++++++-
dtmsvr/trans_type_msg.go | 16 ++++++++++++++++
test/store_test.go | 5 +++--
11 files changed, 64 insertions(+), 14 deletions(-)
diff --git a/dtmcli/msg.go b/dtmcli/msg.go
index 920f26f..069824a 100644
--- a/dtmcli/msg.go
+++ b/dtmcli/msg.go
@@ -16,6 +16,7 @@ import (
// Msg reliable msg type
type Msg struct {
dtmimp.TransBase
+ delay uint64 // delay call branch, unit second
}
// NewMsg create new msg
@@ -30,6 +31,12 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
return s
}
+// EnableDelay delay call branch, unit second
+func (s *Msg) EnableDelay(delay uint64) *Msg {
+ s.delay = delay
+ return s
+}
+
// Prepare prepare the msg, msg will later be submitted
func (s *Msg) Prepare(queryPrepared string) error {
s.QueryPrepared = dtmimp.OrString(queryPrepared, s.QueryPrepared)
@@ -38,6 +45,7 @@ func (s *Msg) Prepare(queryPrepared string) error {
// Submit submit the msg
func (s *Msg) Submit() error {
+ s.BuildCustomOptions()
return dtmimp.TransCallDtm(&s.TransBase, s, "submit")
}
@@ -74,3 +82,10 @@ func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier)
}
return err
}
+
+// BuildCustomOptions add custom options to the request context
+func (s *Msg) BuildCustomOptions() {
+ if s.delay > 0 {
+ s.CustomData = dtmimp.MustMarshalString(map[string]interface{}{"delay": s.delay})
+ }
+}
diff --git a/dtmcli/saga.go b/dtmcli/saga.go
index 87cf08f..5fcd331 100644
--- a/dtmcli/saga.go
+++ b/dtmcli/saga.go
@@ -43,12 +43,12 @@ func (s *Saga) EnableConcurrent() *Saga {
// Submit submit the saga trans
func (s *Saga) Submit() error {
- s.AddConcurrentContext()
+ s.BuildCustomOptions()
return dtmimp.TransCallDtm(&s.TransBase, s, "submit")
}
-// AddConcurrentContext adds concurrent options to the request context
-func (s *Saga) AddConcurrentContext() {
+// BuildCustomOptions add custom options to the request context
+func (s *Saga) BuildCustomOptions() {
if s.concurrent {
s.CustomData = dtmimp.MustMarshalString(map[string]interface{}{"orders": s.orders, "concurrent": s.concurrent})
}
diff --git a/dtmgrpc/msg.go b/dtmgrpc/msg.go
index 5805c9a..9188b54 100644
--- a/dtmgrpc/msg.go
+++ b/dtmgrpc/msg.go
@@ -33,6 +33,12 @@ func (s *MsgGrpc) Add(action string, msg proto.Message) *MsgGrpc {
return s
}
+// EnableDelay delay call branch, unit second
+func (s *MsgGrpc) EnableDelay(delay uint64) *MsgGrpc {
+ s.Msg.EnableDelay(delay)
+ return s
+}
+
// Prepare prepare the msg, msg will later be submitted
func (s *MsgGrpc) Prepare(queryPrepared string) error {
s.QueryPrepared = dtmimp.OrString(queryPrepared, s.QueryPrepared)
@@ -41,6 +47,7 @@ func (s *MsgGrpc) Prepare(queryPrepared string) error {
// Submit submit the msg
func (s *MsgGrpc) Submit() error {
+ s.Msg.BuildCustomOptions()
return dtmgimp.DtmGrpcCall(&s.TransBase, "Submit")
}
diff --git a/dtmgrpc/saga.go b/dtmgrpc/saga.go
index 9fca5d9..4d206fe 100644
--- a/dtmgrpc/saga.go
+++ b/dtmgrpc/saga.go
@@ -43,6 +43,6 @@ func (s *SagaGrpc) EnableConcurrent() *SagaGrpc {
// Submit submit the saga trans
func (s *SagaGrpc) Submit() error {
- s.Saga.AddConcurrentContext()
+ s.Saga.BuildCustomOptions()
return dtmgimp.DtmGrpcCall(&s.Saga.TransBase, "Submit")
}
diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go
index 9202d23..278e898 100644
--- a/dtmsvr/storage/boltdb/boltdb.go
+++ b/dtmsvr/storage/boltdb/boltdb.go
@@ -364,10 +364,10 @@ func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus s
}
// TouchCronTime updates cronTime
-func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
+func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) {
oldUnix := global.NextCronTime.Unix()
- global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
global.UpdateTime = dtmutil.GetNextTime(0)
+ global.NextCronTime = nextCronTime
global.NextCronInterval = nextCronInterval
err := s.boltDb.Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid)
diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go
index e0903a5..0f7e965 100644
--- a/dtmsvr/storage/redis/redis.go
+++ b/dtmsvr/storage/redis/redis.go
@@ -261,9 +261,9 @@ return gid
}
// TouchCronTime updates cronTime
-func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
- global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
+func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) {
global.UpdateTime = dtmutil.GetNextTime(0)
+ global.NextCronTime = nextCronTime
global.NextCronInterval = nextCronInterval
args := newArgList().
AppendGid(global.Gid).
diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go
index 3c807af..6c90822 100644
--- a/dtmsvr/storage/sql/sql.go
+++ b/dtmsvr/storage/sql/sql.go
@@ -121,9 +121,9 @@ func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus s
}
// TouchCronTime updates cronTime
-func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64) {
- global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
+func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) {
global.UpdateTime = dtmutil.GetNextTime(0)
+ global.NextCronTime = nextCronTime
global.NextCronInterval = nextCronInterval
dbGet().Must().Model(global).Where("status=? and gid=?", global.Status, global.Gid).
Select([]string{"next_cron_time", "update_time", "next_cron_interval"}).Updates(global)
diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go
index eca4d38..a03a912 100644
--- a/dtmsvr/storage/store.go
+++ b/dtmsvr/storage/store.go
@@ -28,6 +28,6 @@ type Store interface {
LockGlobalSaveBranches(gid string, status string, branches []TransBranchStore, branchStart int)
MaySaveNewTrans(global *TransGlobalStore, branches []TransBranchStore) error
ChangeGlobalStatus(global *TransGlobalStore, newStatus string, updates []string, finished bool)
- TouchCronTime(global *TransGlobalStore, nextCronInterval int64)
+ TouchCronTime(global *TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time)
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
}
diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go
index 23876b4..8722e7f 100644
--- a/dtmsvr/trans_status.go
+++ b/dtmsvr/trans_status.go
@@ -9,6 +9,7 @@ package dtmsvr
import (
"errors"
"fmt"
+ "github.com/dtm-labs/dtm/dtmutil"
"strings"
"time"
@@ -23,7 +24,17 @@ import (
func (t *TransGlobal) touchCronTime(ctype cronType) {
t.lastTouched = time.Now()
- GetStore().TouchCronTime(&t.TransGlobalStore, t.getNextCronInterval(ctype))
+ nextCronInterval := t.getNextCronInterval(ctype)
+ nextCronTime := dtmutil.GetNextTime(nextCronInterval)
+ GetStore().TouchCronTime(&t.TransGlobalStore, nextCronInterval, nextCronTime)
+ logger.Infof("TouchCronTime for: %s", t.TransGlobalStore.String())
+}
+
+func (t *TransGlobal) delayCronTime(delay uint64) {
+ t.lastTouched = time.Now()
+ nextCronInterval := t.getNextCronInterval(cronKeep)
+ nextCronTime := dtmutil.GetNextTime(int64(delay))
+ GetStore().TouchCronTime(&t.TransGlobalStore, nextCronInterval, nextCronTime)
logger.Infof("TouchCronTime for: %s", t.TransGlobalStore.String())
}
diff --git a/dtmsvr/trans_type_msg.go b/dtmsvr/trans_type_msg.go
index 5fe3eb2..3e62c69 100644
--- a/dtmsvr/trans_type_msg.go
+++ b/dtmsvr/trans_type_msg.go
@@ -9,6 +9,7 @@ package dtmsvr
import (
"errors"
"fmt"
+ "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/logger"
@@ -38,6 +39,10 @@ func (t *transMsgProcessor) GenBranches() []TransBranch {
return branches
}
+type cMsgCustom struct {
+ Delay uint64 //delay call branch, unit second
+}
+
func (t *TransGlobal) mayQueryPrepared() {
if !t.needProcess() || t.Status == dtmcli.StatusSubmitted {
return
@@ -60,6 +65,17 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
if !t.needProcess() || t.Status == dtmcli.StatusPrepared {
return nil
}
+
+ cmc := cMsgCustom{Delay: 0}
+ if t.CustomData != "" {
+ dtmimp.MustUnmarshalString(t.CustomData, &cmc)
+ }
+
+ if cmc.Delay > 0 {
+ t.delayCronTime(cmc.Delay)
+ return nil
+ }
+
current := 0 // 当前正在处理的步骤
for ; current < len(branches); current++ {
branch := &branches[current]
diff --git a/test/store_test.go b/test/store_test.go
index b711c76..2e64397 100644
--- a/test/store_test.go
+++ b/test/store_test.go
@@ -1,6 +1,7 @@
package test
import (
+ "github.com/dtm-labs/dtm/dtmutil"
"testing"
"time"
@@ -74,11 +75,11 @@ func TestStoreLockTrans(t *testing.T) {
assert.NotNil(t, g2)
assert.Equal(t, gid, g2.Gid)
- s.TouchCronTime(g, 3*conf.RetryInterval)
+ s.TouchCronTime(g, 3*conf.RetryInterval, dtmutil.GetNextTime(3*conf.RetryInterval))
g2 = s.LockOneGlobalTrans(2 * time.Duration(conf.RetryInterval) * time.Second)
assert.Nil(t, g2)
- s.TouchCronTime(g, 1*conf.RetryInterval)
+ s.TouchCronTime(g, 1*conf.RetryInterval, dtmutil.GetNextTime(1*conf.RetryInterval))
g2 = s.LockOneGlobalTrans(2 * time.Duration(conf.RetryInterval) * time.Second)
assert.NotNil(t, g2)
assert.Equal(t, gid, g2.Gid)
From d82d04bf8657823583a0e2e546c7c39be3cf1b4a Mon Sep 17 00:00:00 2001
From: xyctruth <398041993@qq.com>
Date: Mon, 21 Feb 2022 18:25:53 +0800
Subject: [PATCH 02/17] add msg delay unit test
---
dtmsvr/cron.go | 2 +-
dtmsvr/trans_class.go | 11 ++++++++++-
dtmsvr/trans_type_msg.go | 2 +-
test/msg_delay_test.go | 36 ++++++++++++++++++++++++++++++++++++
4 files changed, 48 insertions(+), 3 deletions(-)
create mode 100644 test/msg_delay_test.go
diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go
index a768fa1..c1b0692 100644
--- a/dtmsvr/cron.go
+++ b/dtmsvr/cron.go
@@ -55,7 +55,7 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal {
return nil
}
logger.Infof("cron job return a trans: %s", global.String())
- return &TransGlobal{TransGlobalStore: *global}
+ return &TransGlobal{TransGlobalStore: *global, triggerType: triggerCron}
}
func handlePanic(perr *error) {
diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go
index b6e080b..e9dc48c 100644
--- a/dtmsvr/trans_class.go
+++ b/dtmsvr/trans_class.go
@@ -19,11 +19,19 @@ import (
"github.com/gin-gonic/gin"
)
+type triggerType int
+
+const (
+ triggerManual triggerType = iota //manual trigger
+ triggerCron //cron trigger
+)
+
// TransGlobal global transaction
type TransGlobal struct {
storage.TransGlobalStore
lastTouched time.Time // record the start time of process
updateBranchSync bool
+ triggerType triggerType
}
// TransBranch branch transaction
@@ -104,7 +112,8 @@ func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal
PassthroughHeaders: o.PassthroughHeaders,
BranchHeaders: o.BranchHeaders,
},
- }}
+ },
+ triggerType: triggerManual}
if c.Steps != "" {
dtmimp.MustUnmarshalString(c.Steps, &r.Steps)
}
diff --git a/dtmsvr/trans_type_msg.go b/dtmsvr/trans_type_msg.go
index 3e62c69..fd386c6 100644
--- a/dtmsvr/trans_type_msg.go
+++ b/dtmsvr/trans_type_msg.go
@@ -71,7 +71,7 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
dtmimp.MustUnmarshalString(t.CustomData, &cmc)
}
- if cmc.Delay > 0 {
+ if cmc.Delay > 0 && t.triggerType == triggerManual {
t.delayCronTime(cmc.Delay)
return nil
}
diff --git a/test/msg_delay_test.go b/test/msg_delay_test.go
new file mode 100644
index 0000000..7300530
--- /dev/null
+++ b/test/msg_delay_test.go
@@ -0,0 +1,36 @@
+package test
+
+import (
+ "github.com/dtm-labs/dtm/dtmcli"
+ "github.com/dtm-labs/dtm/dtmcli/dtmimp"
+ "github.com/dtm-labs/dtm/dtmsvr"
+ "github.com/dtm-labs/dtm/dtmutil"
+ "github.com/dtm-labs/dtm/test/busi"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func genMsgDelay(gid string) *dtmcli.Msg {
+ req := busi.GenTransReq(30, false, false)
+ msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid).
+ Add(busi.Busi+"/TransOut", &req).
+ Add(busi.Busi+"/TransIn", &req).EnableDelay(2)
+ msg.QueryPrepared = busi.Busi + "/QueryPrepared"
+ return msg
+}
+
+func TestMsgDelayNormal(t *testing.T) {
+ gid := dtmimp.GetFuncName()
+ msg := genMsgDelay(gid)
+ msg.Submit()
+ assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
+ waitTransProcessed(msg.Gid)
+ assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid))
+ assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
+ time.Sleep(2 * time.Second)
+ dtmsvr.CronForwardDuration = 0
+ cronTransOnce(t, gid)
+ assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
+ assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
+}
From 63c97260779e11baafd9cbfd9ba5e14da45dbd10 Mon Sep 17 00:00:00 2001
From: xyctruth <398041993@qq.com>
Date: Mon, 21 Feb 2022 18:32:56 +0800
Subject: [PATCH 03/17] goimports
---
test/msg_delay_test.go | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/test/msg_delay_test.go b/test/msg_delay_test.go
index 7300530..ace5dc8 100644
--- a/test/msg_delay_test.go
+++ b/test/msg_delay_test.go
@@ -1,14 +1,15 @@
package test
import (
+ "testing"
+ "time"
+
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
- "testing"
- "time"
)
func genMsgDelay(gid string) *dtmcli.Msg {
From 6ccc44a1ee02a6776e52a866606442707f065d64 Mon Sep 17 00:00:00 2001
From: xyctruth <398041993@qq.com>
Date: Mon, 21 Feb 2022 18:38:18 +0800
Subject: [PATCH 04/17] golangci lint
---
dtmsvr/trans_status.go | 2 +-
dtmsvr/trans_type_msg.go | 2 +-
test/store_test.go | 5 ++---
3 files changed, 4 insertions(+), 5 deletions(-)
diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go
index 8722e7f..6546e06 100644
--- a/dtmsvr/trans_status.go
+++ b/dtmsvr/trans_status.go
@@ -9,7 +9,6 @@ package dtmsvr
import (
"errors"
"fmt"
- "github.com/dtm-labs/dtm/dtmutil"
"strings"
"time"
@@ -18,6 +17,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmgrpc"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
+ "github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtmdriver"
"google.golang.org/grpc/metadata"
)
diff --git a/dtmsvr/trans_type_msg.go b/dtmsvr/trans_type_msg.go
index fd386c6..3a10196 100644
--- a/dtmsvr/trans_type_msg.go
+++ b/dtmsvr/trans_type_msg.go
@@ -9,9 +9,9 @@ package dtmsvr
import (
"errors"
"fmt"
- "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli"
+ "github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
)
diff --git a/test/store_test.go b/test/store_test.go
index 2e64397..46869e0 100644
--- a/test/store_test.go
+++ b/test/store_test.go
@@ -1,15 +1,14 @@
package test
import (
- "github.com/dtm-labs/dtm/dtmutil"
"testing"
"time"
- "github.com/stretchr/testify/assert"
-
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmsvr/storage/registry"
+ "github.com/dtm-labs/dtm/dtmutil"
+ "github.com/stretchr/testify/assert"
)
func initTransGlobal(gid string) (*storage.TransGlobalStore, storage.Store) {
From 52882f797076d86db4df7e27f46254fba1432c29 Mon Sep 17 00:00:00 2001
From: xyctruth <398041993@qq.com>
Date: Mon, 21 Feb 2022 19:09:57 +0800
Subject: [PATCH 05/17] =?UTF-8?q?=E8=BF=98=E5=8E=9F=20CronForwardDuration?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
test/msg_delay_test.go | 1 +
1 file changed, 1 insertion(+)
diff --git a/test/msg_delay_test.go b/test/msg_delay_test.go
index ace5dc8..32fe378 100644
--- a/test/msg_delay_test.go
+++ b/test/msg_delay_test.go
@@ -34,4 +34,5 @@ func TestMsgDelayNormal(t *testing.T) {
cronTransOnce(t, gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
+ dtmsvr.CronForwardDuration = 180 * time.Second
}
From 8dc04d8ab3c3c0abe0f3589b2ee4245b359dc65d Mon Sep 17 00:00:00 2001
From: xyctruth <398041993@qq.com>
Date: Tue, 22 Feb 2022 10:13:09 +0800
Subject: [PATCH 06/17] fix msg delay unit test
---
dtmsvr/trans_class.go | 5 +++--
test/msg_delay_test.go | 8 +-------
2 files changed, 4 insertions(+), 9 deletions(-)
diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go
index e9dc48c..47bf492 100644
--- a/dtmsvr/trans_class.go
+++ b/dtmsvr/trans_class.go
@@ -19,11 +19,12 @@ import (
"github.com/gin-gonic/gin"
)
+// triggerType trigger transaction type
type triggerType int
const (
- triggerManual triggerType = iota //manual trigger
- triggerCron //cron trigger
+ triggerManual triggerType = iota //triggerManual manual trigger
+ triggerCron //triggerCron cron trigger
)
// TransGlobal global transaction
diff --git a/test/msg_delay_test.go b/test/msg_delay_test.go
index 32fe378..2c6165c 100644
--- a/test/msg_delay_test.go
+++ b/test/msg_delay_test.go
@@ -2,11 +2,9 @@ package test
import (
"testing"
- "time"
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
- "github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
@@ -25,14 +23,10 @@ func TestMsgDelayNormal(t *testing.T) {
gid := dtmimp.GetFuncName()
msg := genMsgDelay(gid)
msg.Submit()
- assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
waitTransProcessed(msg.Gid)
assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
- time.Sleep(2 * time.Second)
- dtmsvr.CronForwardDuration = 0
- cronTransOnce(t, gid)
+ cronTransOnceForwardNow(t, gid, 2)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
- dtmsvr.CronForwardDuration = 180 * time.Second
}
From 7b76ead4382185f9fe3fedf86dcc5a311f6b6b48 Mon Sep 17 00:00:00 2001
From: xyctruth <398041993@qq.com>
Date: Tue, 22 Feb 2022 10:47:35 +0800
Subject: [PATCH 07/17] fix helm deployment mount configMap name
---
charts/templates/deployment.yaml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/charts/templates/deployment.yaml b/charts/templates/deployment.yaml
index de2c04c..805b62c 100644
--- a/charts/templates/deployment.yaml
+++ b/charts/templates/deployment.yaml
@@ -67,4 +67,4 @@ spec:
volumes:
- name: config
configMap:
- name: dtm-conf
+ name: {{ include "dtm.fullname" . }}-conf
From c701eba8a47a00c3892e448644429ca9abda05b2 Mon Sep 17 00:00:00 2001
From: xyctruth <398041993@qq.com>
Date: Tue, 22 Feb 2022 14:23:38 +0800
Subject: [PATCH 08/17] add msg delay test case
---
test/msg_delay_test.go | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/test/msg_delay_test.go b/test/msg_delay_test.go
index 2c6165c..bd60e28 100644
--- a/test/msg_delay_test.go
+++ b/test/msg_delay_test.go
@@ -14,7 +14,7 @@ func genMsgDelay(gid string) *dtmcli.Msg {
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid).
Add(busi.Busi+"/TransOut", &req).
- Add(busi.Busi+"/TransIn", &req).EnableDelay(2)
+ Add(busi.Busi+"/TransIn", &req).EnableDelay(10)
msg.QueryPrepared = busi.Busi + "/QueryPrepared"
return msg
}
@@ -26,7 +26,9 @@ func TestMsgDelayNormal(t *testing.T) {
waitTransProcessed(msg.Gid)
assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
- cronTransOnceForwardNow(t, gid, 2)
+ cronTransOnceForwardCron(t, "", 0)
+ cronTransOnceForwardCron(t, "", 8)
+ cronTransOnceForwardCron(t, gid, 12)
assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid))
}
From cbb42755d38b84be408fd81c9de26b08ff9cfb47 Mon Sep 17 00:00:00 2001
From: pandaLIU <563883861@qq.com>
Date: Tue, 22 Feb 2022 22:14:32 +0800
Subject: [PATCH 09/17] Added json rpc http
---
conf.sample.yml | 1 +
dtmsvr/api_json_rpc_http.go | 110 ++++++++++++++++++++++++++++++++++++
dtmsvr/config/config.go | 1 +
dtmsvr/svr.go | 12 ++++
dtmsvr/trans_class.go | 11 ++++
5 files changed, 135 insertions(+)
create mode 100644 dtmsvr/api_json_rpc_http.go
diff --git a/conf.sample.yml b/conf.sample.yml
index a4e869c..d9052a5 100644
--- a/conf.sample.yml
+++ b/conf.sample.yml
@@ -56,6 +56,7 @@
# HttpPort: 36789
# GrpcPort: 36790
+# JsonRpcHttp: 36791
### advanced options
# UpdateBranchAsyncGoroutineNum: 1 # num of async goroutine to update branch status
diff --git a/dtmsvr/api_json_rpc_http.go b/dtmsvr/api_json_rpc_http.go
new file mode 100644
index 0000000..4f174e4
--- /dev/null
+++ b/dtmsvr/api_json_rpc_http.go
@@ -0,0 +1,110 @@
+package dtmsvr
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/dtm-labs/dtm/dtmcli"
+ "github.com/dtm-labs/dtm/dtmcli/logger"
+ "github.com/gin-gonic/gin"
+ "net/http"
+)
+
+type jsonRpcHttpReq struct {
+ Method string `json:"method"`
+ Jsonrpc string `json:"jsonrpc"`
+ Params interface{} `json:"params"`
+ Id string `json:"id"`
+}
+
+func addJsonRpcHttpRouter(engine *gin.Engine) {
+ engine.POST("/", dispatcher)
+}
+
+func dispatcher(c *gin.Context) {
+ req := new(jsonRpcHttpReq)
+ err := c.BindJSON(req)
+ logger.Infof("request:%s\n", req)
+ if err != nil {
+ c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": nil, "error": map[string]interface{}{"code": -32700, "message": "Parse error"}})
+ return
+ }
+ if req.Method == "dtmserver.NewGid" {
+ res, err := jsonRpcHttpNewGid()
+ c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": err})
+ return
+ }
+
+ if req.Method == "dtmserver.Prepare" {
+ res := jsonRpcHttpPrepare(req.Params)
+ c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": nil})
+ return
+ }
+
+ if req.Method == "dtmserver.Submit" {
+ res := jsonRpcHttpSubmit(req.Params)
+ c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": nil})
+ return
+ }
+
+ if req.Method == "dtmserver.Abort" {
+ res := jsonRpcHttpAbort(req.Params)
+ c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": nil})
+ return
+ }
+
+ if req.Method == "dtmserver.RegisterBranch" {
+ res := jsonRpcHttpRegisterBranch(req.Params)
+ c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": nil})
+ return
+ }
+ c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": nil, "error": map[string]interface{}{"code": -32601, "message": "Method not found"}})
+ return
+}
+
+func jsonRpcHttpNewGid() (interface{}, error) {
+ return map[string]interface{}{"gid": GenGid(), "dtm_result": dtmcli.ResultSuccess}, nil
+}
+
+func jsonRpcHttpPrepare(params interface{}) interface{} {
+ res := svcPrepare(TransFromJsonRpcHttpContext(params))
+ if res == nil {
+ return map[string]string{"dtm_result": "SUCCESS"}
+ }
+ return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
+}
+
+func jsonRpcHttpSubmit(params interface{}) interface{} {
+ res := svcSubmit(TransFromJsonRpcHttpContext(params))
+ if res == nil {
+ return map[string]string{"dtm_result": "SUCCESS"}
+ }
+ return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
+}
+
+func jsonRpcHttpAbort(params interface{}) interface{} {
+ res := svcAbort(TransFromJsonRpcHttpContext(params))
+ if res == nil {
+ return map[string]string{"dtm_result": "SUCCESS"}
+ }
+ return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
+}
+
+func jsonRpcHttpRegisterBranch(params interface{}) interface{} {
+ data := map[string]string{}
+ paramsJson, _ := json.Marshal(params)
+ err := json.Unmarshal(paramsJson, &data)
+ if err != nil {
+ return map[string]string{"dtm_result": "FAILURE", "message": err.Error()}
+ }
+ branch := TransBranch{
+ Gid: data["gid"],
+ BranchID: data["branch_id"],
+ Status: dtmcli.StatusPrepared,
+ BinData: []byte(data["data"]),
+ }
+ res := svcRegisterBranch(data["trans_type"], &branch, data)
+ if res == nil {
+ return map[string]string{"dtm_result": "SUCCESS"}
+ }
+ return map[string]string{"dtm_result": "FAILURE", "message": res.Error()}
+}
diff --git a/dtmsvr/config/config.go b/dtmsvr/config/config.go
index 06a8fda..4f9ce1c 100644
--- a/dtmsvr/config/config.go
+++ b/dtmsvr/config/config.go
@@ -76,6 +76,7 @@ type configType struct {
RequestTimeout int64 `yaml:"RequestTimeout" default:"3"`
HTTPPort int64 `yaml:"HttpPort" default:"36789"`
GrpcPort int64 `yaml:"GrpcPort" default:"36790"`
+ JsonRpcHttpPort int64 `yaml:"JsonRpcHttpPort" default:"36791"`
MicroService MicroService `yaml:"MicroService"`
UpdateBranchSync int64 `yaml:"UpdateBranchSync"`
UpdateBranchAsyncGoroutineNum int64 `yaml:"UpdateBranchAsyncGoroutineNum" default:"1"`
diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go
index 1816087..7989986 100644
--- a/dtmsvr/svr.go
+++ b/dtmsvr/svr.go
@@ -66,6 +66,18 @@ func StartSvr() {
logger.FatalIfError(err)
err = dtmdriver.GetDriver().RegisterGrpcService(conf.MicroService.Target, conf.MicroService.EndPoint)
logger.FatalIfError(err)
+
+ // start json-rpc server
+ jsonRpcHttpApp := dtmutil.GetGinApp()
+ jsonRpcHttpApp = httpMetrics(jsonRpcHttpApp)
+ addJsonRpcHttpRouter(jsonRpcHttpApp)
+ logger.Infof("dtmsvr listen at: %d", conf.JsonRpcHttpPort)
+ go func() {
+ err := jsonRpcHttpApp.Run(fmt.Sprintf(":%d", conf.JsonRpcHttpPort))
+ if err != nil {
+ logger.Errorf("start server err: %v", err)
+ }
+ }()
}
// PopulateDB setup mysql data
diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go
index b6e080b..8b039b9 100644
--- a/dtmsvr/trans_class.go
+++ b/dtmsvr/trans_class.go
@@ -8,6 +8,7 @@ package dtmsvr
import (
"context"
+ "encoding/json"
"time"
"github.com/dtm-labs/dtm/dtmcli"
@@ -84,6 +85,16 @@ func TransFromContext(c *gin.Context) *TransGlobal {
return &m
}
+func TransFromJsonRpcHttpContext(params interface{}) *TransGlobal {
+ jsonStr, _ := json.Marshal(params)
+ m := TransGlobal{}
+ err := json.Unmarshal(jsonStr, &m)
+ if err != nil {
+ return nil
+ }
+ return &m
+}
+
// TransFromDtmRequest TransFromContext
func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal {
o := &dtmgpb.DtmTransOptions{}
From eb888f552ffe70edb5ab108ac1e1c855cf921dfc Mon Sep 17 00:00:00 2001
From: yedf2 <120050102@qq.com>
Date: Wed, 23 Feb 2022 16:15:29 +0800
Subject: [PATCH 10/17] fix TestBaseSqlDB
---
test/base_test.go | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/test/base_test.go b/test/base_test.go
index b33bfd8..31ef17d 100644
--- a/test/base_test.go
+++ b/test/base_test.go
@@ -36,8 +36,9 @@ func TestBaseSqlDB(t *testing.T) {
Gid: "gid2",
BranchID: "branch_id2",
Op: dtmcli.BranchAction,
+ BarrierID: 1,
}
- db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, reason) values('saga', 'gid1', 'branch_id1', 'action', 'saga')")
+ db.Must().Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values('saga', 'gid1', 'branch_id1', 'action', '01', 'saga')")
tx, err := db.ToSQLDB().Begin()
asserts.Nil(err)
err = barrier.Call(tx, func(tx *sql.Tx) error {
From 2b7a2ce72b23a44fc2d2cbc2f90aa23a2e3ed4b0 Mon Sep 17 00:00:00 2001
From: yedf2 <120050102@qq.com>
Date: Wed, 23 Feb 2022 16:45:47 +0800
Subject: [PATCH 11/17] update readme
---
README.md | 94 ++++++++++++++++++++-------------------------
helper/README-en.md | 94 ++++++++++++++++++++-------------------------
2 files changed, 84 insertions(+), 104 deletions(-)
diff --git a/README.md b/README.md
index 4c14c6f..ae285cf 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@ English | [简体中文](https://github.com/dtm-labs/dtm/blob/main/helper/README
## What is DTM
-DTM is a distributed transaction solution which provides cross-service eventually data consistency. It provides saga, tcc, xa, 2-phase message strategies for a variety of application scenarios. It also supports multiple languages and multiple store engine to form up a transaction as following:
+DTM is a distributed transaction framework which provides cross-service eventually data consistency. It provides saga, tcc, xa, 2-phase message strategies for a variety of application scenarios. It also supports multiple languages and multiple store engine to form up a transaction as following:
@@ -26,49 +26,26 @@ DTM is a distributed transaction solution which provides cross-service eventuall
## Features
-* Extremely easy to adapt
- - Support HTTP and gRPC, provide easy-to-use programming interfaces, lower substantially the barrier of getting started with distributed transactions. Newcomers can adapt quickly.
-
-* Easy to use
- - Relieving developers from worrying about suspension, null compensation, idempotent transaction, and other tricky problems, the framework layer handles them all.
-
* Language-agnostic
- Suit for companies with multiple-language stacks.
Easy to write bindings for Go, Python, PHP, Node.js, Ruby, and other languages.
-* Easy to deploy, easy to extend
- - DTM depends only on MySQL, easy to deploy, cluster, and scale horizontally.
-
-* Support for multiple distributed transaction protocol
- - TCC, SAGA, XA, Transactional messages.
+* Support for multiple distributed transaction solutions
+ - TCC, SAGA, XA, 2-phases message.
-## DTM vs. others
-
-There is no mature open-source distributed transaction framework for non-Java languages.
-Mature open-source distributed transaction frameworks for Java language include Ali's Seata, Huawei's ServiceComb-Pack, Jingdong's shardingsphere, himly, tcc-transaction, ByteTCC, and so on, of which Seata is most widely used.
-
-The following is a comparison of the main features of dtm and Seata.
+* Extremely easy to adapt
+ - Support HTTP and gRPC, provide easy-to-use programming interfaces, lower substantially the barrier of getting started with distributed transactions. Newcomers can adapt quickly.
+* Easy to use
+ - Relieving developers from worrying about suspension, null compensation, idempotent transaction, and other tricky problems, the framework layer handles them all.
-| Features | DTM | Seata | Remarks |
-| :-----: | :----: | :----: | :----: |
-| Supported languages | Golang, Python, PHP, and others | Java | dtm allows easy access from a new language |
-| Exception handling | [Sub-transaction barrier](https://zhuanlan.zhihu.com/p/388444465) | manual | dtm solves idempotent transaction, hanging, null compensation |
-| TCC | ✓ | ✓ | |
-| XA | ✓ | ✓ | |
-| AT | suggest XA | ✓ | AT is similar to XA with better performance but with dirty rollback |
-| SAGA | support concurrency | complicated state-machine mode | dtm's state-machine mode is being planned |
-| Transactional Messaging | ✓ | ✗ | dtm provides Transactional Messaging similar to RocketMQ |
-| Multiple DBs in a service |✓|✗||
-| Communication protocols | HTTP, gRPC | Dubbo, no HTTP | |
-| Star count | |
| dtm 0.1 is released from 20210604 and under fast development |
+* Easy to deploy, easy to extend
+ - DTM depends only on MySQL/Redis, easy to deploy, cluster, and scale horizontally.
-From the features' comparison above, if your language stack includes languages other than Java, then dtm is the one for you.
-If your language stack is Java, you can also choose to access dtm and use sub-transaction barrier technology to simplify your business development.
## [Cook Book](https://en.dtm.pub)
-# Quick start
+## Quick start
### run dtm
@@ -77,14 +54,17 @@ git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go
```
-### Start the example
+### Start an example
+Suppose we want to perform an inter-bank transfer. The operations of transfer out (TransOut) and transfer in (TransIn) are coded in separate micro-services.
+
+Here is an example to illustrate a solution of dtm to this problem:
``` bash
git clone https://github.com/dtm-labs/dtmcli-go-sample && cd dtmcli-go-sample
go run main.go
```
-# Code
+## Code
### Use
``` go
@@ -102,29 +82,39 @@ go run main.go
// submit the created saga transaction,dtm ensures all subtractions either complete or get revoked
err := saga.Submit()
```
-### Complete example
-Refer to [dtm-examples](https://github.com/dtm-labs/dtm-examples).
+When the above code runs, we can see in the console that services TransOut, TransIn has been called.
-### Slack
+#### Timing diagram
+A timing diagram for a successfully completed SAGA transaction would be as follows:
-You can join the [DTM slack channel here](https://join.slack.com/t/dtm-w6k9662/shared_invite/zt-vkrph4k1-eFqEFnMkbmlXqfUo5GWHWw).
+
-### Wechat
+#### Rollback upon failure
+If any forward operation fails, DTM invokes the corresponding compensating operation of each sub-transaction to roll back, after which the transaction is successfully rolled back.
-Add wechat friend with id yedf2008, or scan the OR code. Fill in dtm as verification.
+Let's purposely fail the forward operation of the second sub-transaction and watch what happens
-
+``` go
+app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
+ log.Printf("TransIn")
+ // c.JSON(200, "")
+ c.JSON(409, "") // Status 409 for Failure. Won't be retried
+})
+```
-### Give a star! ⭐
+The timing diagram for the intended failure is as follows:
-If you think this project is good, or helpful to you, please give a star!
+
+
+## More examples
-### Who is using
-
-
-
-
-
-
@@ -26,49 +26,26 @@ DTM is a distributed transaction solution which provides cross-service eventuall
## Features
-* Extremely easy to adapt
- - Support HTTP and gRPC, provide easy-to-use programming interfaces, lower substantially the barrier of getting started with distributed transactions. Newcomers can adapt quickly.
-
-* Easy to use
- - Relieving developers from worrying about suspension, null compensation, idempotent transaction, and other tricky problems, the framework layer handles them all.
-
* Language-agnostic
- Suit for companies with multiple-language stacks.
Easy to write bindings for Go, Python, PHP, Node.js, Ruby, and other languages.
-* Easy to deploy, easy to extend
- - DTM depends only on MySQL, easy to deploy, cluster, and scale horizontally.
-
-* Support for multiple distributed transaction protocol
- - TCC, SAGA, XA, Transactional messages.
+* Support for multiple distributed transaction solutions
+ - TCC, SAGA, XA, 2-phases message.
-## DTM vs. others
-
-There is no mature open-source distributed transaction framework for non-Java languages.
-Mature open-source distributed transaction frameworks for Java language include Ali's Seata, Huawei's ServiceComb-Pack, Jingdong's shardingsphere, himly, tcc-transaction, ByteTCC, and so on, of which Seata is most widely used.
-
-The following is a comparison of the main features of dtm and Seata.
+* Extremely easy to adapt
+ - Support HTTP and gRPC, provide easy-to-use programming interfaces, lower substantially the barrier of getting started with distributed transactions. Newcomers can adapt quickly.
+* Easy to use
+ - Relieving developers from worrying about suspension, null compensation, idempotent transaction, and other tricky problems, the framework layer handles them all.
-| Features | DTM | Seata | Remarks |
-| :-----: | :----: | :----: | :----: |
-| Supported languages | Golang, Python, PHP, and others | Java | dtm allows easy access from a new language |
-| Exception handling | [Sub-transaction barrier](https://zhuanlan.zhihu.com/p/388444465) | manual | dtm solves idempotent transaction, hanging, null compensation |
-| TCC | ✓ | ✓ | |
-| XA | ✓ | ✓ | |
-| AT | suggest XA | ✓ | AT is similar to XA with better performance but with dirty rollback |
-| SAGA | support concurrency | complicated state-machine mode | dtm's state-machine mode is being planned |
-| Transactional Messaging | ✓ | ✗ | dtm provides Transactional Messaging similar to RocketMQ |
-| Multiple DBs in a service |✓|✗||
-| Communication protocols | HTTP, gRPC | Dubbo, no HTTP | |
-| Star count |
-### Wechat
+#### Rollback upon failure
+If any forward operation fails, DTM invokes the corresponding compensating operation of each sub-transaction to roll back, after which the transaction is successfully rolled back.
-Add wechat friend with id yedf2008, or scan the OR code. Fill in dtm as verification.
+Let's purposely fail the forward operation of the second sub-transaction and watch what happens
-
+``` go
+app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
+ log.Printf("TransIn")
+ // c.JSON(200, "")
+ c.JSON(409, "") // Status 409 for Failure. Won't be retried
+})
+```
-### Give a star! ⭐
+The timing diagram for the intended failure is as follows:
-If you think this project is good, or helpful to you, please give a star!
+
+
+## More examples
-### Who is using
-
-
-
-
-
-
+
#### Rollback upon failure
If any forward operation fails, DTM invokes the corresponding compensating operation of each sub-transaction to roll back, after which the transaction is successfully rolled back.
@@ -105,7 +105,7 @@ app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
The timing diagram for the intended failure is as follows:
-
+
## More examples
diff --git a/helper/README-en.md b/helper/README-en.md
index 0d3f29c..25c45d1 100644
--- a/helper/README-en.md
+++ b/helper/README-en.md
@@ -88,7 +88,7 @@ When the above code runs, we can see in the console that services TransOut, Tran
#### Timing diagram
A timing diagram for a successfully completed SAGA transaction would be as follows:
-
+
#### Rollback upon failure
If any forward operation fails, DTM invokes the corresponding compensating operation of each sub-transaction to roll back, after which the transaction is successfully rolled back.
@@ -105,7 +105,7 @@ app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
The timing diagram for the intended failure is as follows:
-
+
## More examples
From e674f195cc9c741091e6374726931f80a7f32ddb Mon Sep 17 00:00:00 2001
From: yedf2 <120050102@qq.com>
Date: Wed, 23 Feb 2022 17:50:22 +0800
Subject: [PATCH 13/17] update image height
---
README.md | 2 +-
helper/README-en.md | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/README.md b/README.md
index 25c45d1..58f9f28 100644
--- a/README.md
+++ b/README.md
@@ -105,7 +105,7 @@ app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
The timing diagram for the intended failure is as follows:
-
+
## More examples
diff --git a/helper/README-en.md b/helper/README-en.md
index 25c45d1..58f9f28 100644
--- a/helper/README-en.md
+++ b/helper/README-en.md
@@ -105,7 +105,7 @@ app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
The timing diagram for the intended failure is as follows:
-
+
## More examples
From 8cd01342e04f972d0c06a9ce6394ec92926aa0ea Mon Sep 17 00:00:00 2001
From: xyctruth <398041993@qq.com>
Date: Wed, 23 Feb 2022 18:52:28 +0800
Subject: [PATCH 14/17] add needDelay func, fix name
---
dtmcli/msg.go | 4 ++--
dtmgrpc/msg.go | 6 +++---
dtmsvr/cron.go | 2 +-
dtmsvr/trans_class.go | 12 +-----------
dtmsvr/trans_status.go | 4 ++++
dtmsvr/trans_type_msg.go | 2 +-
test/msg_delay_test.go | 11 ++++++++---
test/types.go | 7 +++++++
8 files changed, 27 insertions(+), 21 deletions(-)
diff --git a/dtmcli/msg.go b/dtmcli/msg.go
index 069824a..0bc6dc6 100644
--- a/dtmcli/msg.go
+++ b/dtmcli/msg.go
@@ -31,8 +31,8 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
return s
}
-// EnableDelay delay call branch, unit second
-func (s *Msg) EnableDelay(delay uint64) *Msg {
+// SetDelay delay call branch, unit second
+func (s *Msg) SetDelay(delay uint64) *Msg {
s.delay = delay
return s
}
diff --git a/dtmgrpc/msg.go b/dtmgrpc/msg.go
index 9188b54..be5b304 100644
--- a/dtmgrpc/msg.go
+++ b/dtmgrpc/msg.go
@@ -33,9 +33,9 @@ func (s *MsgGrpc) Add(action string, msg proto.Message) *MsgGrpc {
return s
}
-// EnableDelay delay call branch, unit second
-func (s *MsgGrpc) EnableDelay(delay uint64) *MsgGrpc {
- s.Msg.EnableDelay(delay)
+// SetDelay delay call branch, unit second
+func (s *MsgGrpc) SetDelay(delay uint64) *MsgGrpc {
+ s.Msg.SetDelay(delay)
return s
}
diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go
index c1b0692..a768fa1 100644
--- a/dtmsvr/cron.go
+++ b/dtmsvr/cron.go
@@ -55,7 +55,7 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal {
return nil
}
logger.Infof("cron job return a trans: %s", global.String())
- return &TransGlobal{TransGlobalStore: *global, triggerType: triggerCron}
+ return &TransGlobal{TransGlobalStore: *global}
}
func handlePanic(perr *error) {
diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go
index 47bf492..b6e080b 100644
--- a/dtmsvr/trans_class.go
+++ b/dtmsvr/trans_class.go
@@ -19,20 +19,11 @@ import (
"github.com/gin-gonic/gin"
)
-// triggerType trigger transaction type
-type triggerType int
-
-const (
- triggerManual triggerType = iota //triggerManual manual trigger
- triggerCron //triggerCron cron trigger
-)
-
// TransGlobal global transaction
type TransGlobal struct {
storage.TransGlobalStore
lastTouched time.Time // record the start time of process
updateBranchSync bool
- triggerType triggerType
}
// TransBranch branch transaction
@@ -113,8 +104,7 @@ func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal
PassthroughHeaders: o.PassthroughHeaders,
BranchHeaders: o.BranchHeaders,
},
- },
- triggerType: triggerManual}
+ }}
if c.Steps != "" {
dtmimp.MustUnmarshalString(c.Steps, &r.Steps)
}
diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go
index 6546e06..8d56af6 100644
--- a/dtmsvr/trans_status.go
+++ b/dtmsvr/trans_status.go
@@ -79,6 +79,10 @@ func (t *TransGlobal) isTimeout() bool {
return time.Since(*t.CreateTime)+NowForwardDuration >= time.Duration(timeout)*time.Second
}
+func (t *TransGlobal) needDelay(delay uint64) bool {
+ return time.Since(*t.CreateTime)+CronForwardDuration < time.Duration(delay)*time.Second
+}
+
func (t *TransGlobal) needProcess() bool {
return t.Status == dtmcli.StatusSubmitted || t.Status == dtmcli.StatusAborting || t.Status == dtmcli.StatusPrepared && t.isTimeout()
}
diff --git a/dtmsvr/trans_type_msg.go b/dtmsvr/trans_type_msg.go
index 3a10196..731318b 100644
--- a/dtmsvr/trans_type_msg.go
+++ b/dtmsvr/trans_type_msg.go
@@ -71,7 +71,7 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
dtmimp.MustUnmarshalString(t.CustomData, &cmc)
}
- if cmc.Delay > 0 && t.triggerType == triggerManual {
+ if cmc.Delay > 0 && t.needDelay(cmc.Delay) {
t.delayCronTime(cmc.Delay)
return nil
}
diff --git a/test/msg_delay_test.go b/test/msg_delay_test.go
index bd60e28..9b9d030 100644
--- a/test/msg_delay_test.go
+++ b/test/msg_delay_test.go
@@ -5,6 +5,7 @@ import (
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
+ "github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
@@ -14,7 +15,7 @@ func genMsgDelay(gid string) *dtmcli.Msg {
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid).
Add(busi.Busi+"/TransOut", &req).
- Add(busi.Busi+"/TransIn", &req).EnableDelay(10)
+ Add(busi.Busi+"/TransIn", &req).SetDelay(10)
msg.QueryPrepared = busi.Busi + "/QueryPrepared"
return msg
}
@@ -22,8 +23,12 @@ func genMsgDelay(gid string) *dtmcli.Msg {
func TestMsgDelayNormal(t *testing.T) {
gid := dtmimp.GetFuncName()
msg := genMsgDelay(gid)
- msg.Submit()
- waitTransProcessed(msg.Gid)
+ submitForwardCron(0, func() {
+ msg.Submit()
+ waitTransProcessed(msg.Gid)
+ })
+
+ dtmsvr.NowForwardDuration = 0
assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid))
assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid))
cronTransOnceForwardCron(t, "", 0)
diff --git a/test/types.go b/test/types.go
index 2d1d545..0f3ed43 100644
--- a/test/types.go
+++ b/test/types.go
@@ -68,6 +68,13 @@ func cronTransOnceForwardCron(t *testing.T, gid string, seconds int) {
dtmsvr.CronForwardDuration = old
}
+func submitForwardCron(seconds int, fn func()) {
+ old := dtmsvr.CronForwardDuration
+ dtmsvr.CronForwardDuration = time.Duration(seconds) * time.Second
+ fn()
+ dtmsvr.CronForwardDuration = old
+}
+
const (
// StatusPrepared status for global/branch trans status.
StatusPrepared = dtmcli.StatusPrepared
From 50c19a5d43614ad8a40c009512af96f89ab168e5 Mon Sep 17 00:00:00 2001
From: xyctruth <398041993@qq.com>
Date: Wed, 23 Feb 2022 19:10:29 +0800
Subject: [PATCH 15/17] merge touchCronTime delayCronTime func
---
dtmsvr/trans_status.go | 26 ++++++++++++++------------
dtmsvr/trans_type_msg.go | 6 +++---
2 files changed, 17 insertions(+), 15 deletions(-)
diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go
index 8d56af6..8b0a70a 100644
--- a/dtmsvr/trans_status.go
+++ b/dtmsvr/trans_status.go
@@ -22,18 +22,20 @@ import (
"google.golang.org/grpc/metadata"
)
-func (t *TransGlobal) touchCronTime(ctype cronType) {
+// touchCronTime Based on ctype or delay set nextCronTime
+// delay = 0 ,use ctype set nextCronTime and nextCronInterval
+// delay > 0 ,use delay set nextCronTime ,use ctype set nextCronInterval
+func (t *TransGlobal) touchCronTime(ctype cronType, delay uint64) {
t.lastTouched = time.Now()
nextCronInterval := t.getNextCronInterval(ctype)
- nextCronTime := dtmutil.GetNextTime(nextCronInterval)
- GetStore().TouchCronTime(&t.TransGlobalStore, nextCronInterval, nextCronTime)
- logger.Infof("TouchCronTime for: %s", t.TransGlobalStore.String())
-}
-func (t *TransGlobal) delayCronTime(delay uint64) {
- t.lastTouched = time.Now()
- nextCronInterval := t.getNextCronInterval(cronKeep)
- nextCronTime := dtmutil.GetNextTime(int64(delay))
+ var nextCronTime *time.Time
+ if delay > 0 {
+ nextCronTime = dtmutil.GetNextTime(int64(delay))
+ } else {
+ nextCronTime = dtmutil.GetNextTime(nextCronInterval)
+ }
+
GetStore().TouchCronTime(&t.TransGlobalStore, nextCronInterval, nextCronTime)
logger.Infof("TouchCronTime for: %s", t.TransGlobalStore.String())
}
@@ -147,11 +149,11 @@ func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {
// if time pass 1500ms and NextCronInterval is not default, then reset NextCronInterval
if err == nil && time.Since(t.lastTouched)+NowForwardDuration >= 1500*time.Millisecond ||
t.NextCronInterval > conf.RetryInterval && t.NextCronInterval > t.RetryInterval {
- t.touchCronTime(cronReset)
+ t.touchCronTime(cronReset, 0)
} else if err == dtmimp.ErrOngoing {
- t.touchCronTime(cronKeep)
+ t.touchCronTime(cronKeep, 0)
} else if err != nil {
- t.touchCronTime(cronBackoff)
+ t.touchCronTime(cronBackoff, 0)
}
return err
}
diff --git a/dtmsvr/trans_type_msg.go b/dtmsvr/trans_type_msg.go
index 731318b..6cb4679 100644
--- a/dtmsvr/trans_type_msg.go
+++ b/dtmsvr/trans_type_msg.go
@@ -53,10 +53,10 @@ func (t *TransGlobal) mayQueryPrepared() {
} else if errors.Is(err, dtmcli.ErrFailure) {
t.changeStatus(dtmcli.StatusFailed)
} else if errors.Is(err, dtmcli.ErrOngoing) {
- t.touchCronTime(cronReset)
+ t.touchCronTime(cronReset, 0)
} else {
logger.Errorf("getting result failed for %s. error: %v", t.QueryPrepared, err)
- t.touchCronTime(cronBackoff)
+ t.touchCronTime(cronBackoff, 0)
}
}
@@ -72,7 +72,7 @@ func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
}
if cmc.Delay > 0 && t.needDelay(cmc.Delay) {
- t.delayCronTime(cmc.Delay)
+ t.touchCronTime(cronKeep, cmc.Delay)
return nil
}
From ad5dbd906ffc4e8a78190d48edf38ba488be46a1 Mon Sep 17 00:00:00 2001
From: yedf2 <120050102@qq.com>
Date: Wed, 23 Feb 2022 20:06:23 +0800
Subject: [PATCH 16/17] storeFac use single sqlFac
---
dtmsvr/storage/registry/registry.go | 18 ++++++++----------
1 file changed, 8 insertions(+), 10 deletions(-)
diff --git a/dtmsvr/storage/registry/registry.go b/dtmsvr/storage/registry/registry.go
index 003fd69..e699bd3 100644
--- a/dtmsvr/storage/registry/registry.go
+++ b/dtmsvr/storage/registry/registry.go
@@ -18,6 +18,12 @@ type StorageFactory interface {
GetStorage() storage.Store
}
+var sqlFac = &SingletonFactory{
+ creatorFunction: func() storage.Store {
+ return &sql.Store{}
+ },
+}
+
var storeFactorys = map[string]StorageFactory{
"boltdb": &SingletonFactory{
creatorFunction: func() storage.Store {
@@ -29,16 +35,8 @@ var storeFactorys = map[string]StorageFactory{
return &redis.Store{}
},
},
- "mysql": &SingletonFactory{
- creatorFunction: func() storage.Store {
- return &sql.Store{}
- },
- },
- "postgres": &SingletonFactory{
- creatorFunction: func() storage.Store {
- return &sql.Store{}
- },
- },
+ "mysql": sqlFac,
+ "postgres": sqlFac,
}
// GetStore returns storage.Store
From ec72eed9545836bc0b584ccd54ff4f4bd5570da1 Mon Sep 17 00:00:00 2001
From: yedf2 <120050102@qq.com>
Date: Thu, 24 Feb 2022 10:45:11 +0800
Subject: [PATCH 17/17] fix lint
---
conf.sample.yml | 2 +-
dtmsvr/api_json_rpc_http.go | 58 ++++++++++++++++++-------------------
dtmsvr/config/config.go | 2 +-
dtmsvr/svr.go | 10 +++----
dtmsvr/trans_class.go | 3 +-
5 files changed, 38 insertions(+), 37 deletions(-)
diff --git a/conf.sample.yml b/conf.sample.yml
index d9052a5..1fd557e 100644
--- a/conf.sample.yml
+++ b/conf.sample.yml
@@ -56,7 +56,7 @@
# HttpPort: 36789
# GrpcPort: 36790
-# JsonRpcHttp: 36791
+# JSONRPC: 36791
### advanced options
# UpdateBranchAsyncGoroutineNum: 1 # num of async goroutine to update branch status
diff --git a/dtmsvr/api_json_rpc_http.go b/dtmsvr/api_json_rpc_http.go
index 4f174e4..edfc426 100644
--- a/dtmsvr/api_json_rpc_http.go
+++ b/dtmsvr/api_json_rpc_http.go
@@ -3,96 +3,96 @@ package dtmsvr
import (
"encoding/json"
"fmt"
+ "net/http"
+
"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/gin-gonic/gin"
- "net/http"
)
-type jsonRpcHttpReq struct {
+type jsonRPCReq struct {
Method string `json:"method"`
Jsonrpc string `json:"jsonrpc"`
Params interface{} `json:"params"`
- Id string `json:"id"`
+ ID string `json:"id"`
}
-func addJsonRpcHttpRouter(engine *gin.Engine) {
+func addJSONRPCRouter(engine *gin.Engine) {
engine.POST("/", dispatcher)
}
func dispatcher(c *gin.Context) {
- req := new(jsonRpcHttpReq)
+ req := new(jsonRPCReq)
err := c.BindJSON(req)
logger.Infof("request:%s\n", req)
if err != nil {
- c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": nil, "error": map[string]interface{}{"code": -32700, "message": "Parse error"}})
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": nil, "error": map[string]interface{}{"code": -32700, "message": "Parse error"}})
return
}
if req.Method == "dtmserver.NewGid" {
- res, err := jsonRpcHttpNewGid()
- c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": err})
+ res := jsonRPCNewGid()
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": err})
return
}
if req.Method == "dtmserver.Prepare" {
- res := jsonRpcHttpPrepare(req.Params)
- c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": nil})
+ res := jsonRPCPrepare(req.Params)
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
return
}
if req.Method == "dtmserver.Submit" {
- res := jsonRpcHttpSubmit(req.Params)
- c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": nil})
+ res := jsonRPCSubmit(req.Params)
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
return
}
if req.Method == "dtmserver.Abort" {
- res := jsonRpcHttpAbort(req.Params)
- c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": nil})
+ res := jsonRPCAbort(req.Params)
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
return
}
if req.Method == "dtmserver.RegisterBranch" {
- res := jsonRpcHttpRegisterBranch(req.Params)
- c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": res, "error": nil})
+ res := jsonRPCRegisterBranch(req.Params)
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": res, "error": nil})
return
}
- c.JSON(http.StatusOK, gin.H{"id": req.Id, "result": nil, "error": map[string]interface{}{"code": -32601, "message": "Method not found"}})
- return
+ c.JSON(http.StatusOK, gin.H{"id": req.ID, "result": nil, "error": map[string]interface{}{"code": -32601, "message": "Method not found"}})
}
-func jsonRpcHttpNewGid() (interface{}, error) {
- return map[string]interface{}{"gid": GenGid(), "dtm_result": dtmcli.ResultSuccess}, nil
+func jsonRPCNewGid() interface{} {
+ return map[string]interface{}{"gid": GenGid(), "dtm_result": dtmcli.ResultSuccess}
}
-func jsonRpcHttpPrepare(params interface{}) interface{} {
- res := svcPrepare(TransFromJsonRpcHttpContext(params))
+func jsonRPCPrepare(params interface{}) interface{} {
+ res := svcPrepare(TransFromJSONRPCContext(params))
if res == nil {
return map[string]string{"dtm_result": "SUCCESS"}
}
return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
}
-func jsonRpcHttpSubmit(params interface{}) interface{} {
- res := svcSubmit(TransFromJsonRpcHttpContext(params))
+func jsonRPCSubmit(params interface{}) interface{} {
+ res := svcSubmit(TransFromJSONRPCContext(params))
if res == nil {
return map[string]string{"dtm_result": "SUCCESS"}
}
return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
}
-func jsonRpcHttpAbort(params interface{}) interface{} {
- res := svcAbort(TransFromJsonRpcHttpContext(params))
+func jsonRPCAbort(params interface{}) interface{} {
+ res := svcAbort(TransFromJSONRPCContext(params))
if res == nil {
return map[string]string{"dtm_result": "SUCCESS"}
}
return map[string]string{"dtm_result": "FAILURE", "message": fmt.Sprintf("%v", res)}
}
-func jsonRpcHttpRegisterBranch(params interface{}) interface{} {
+func jsonRPCRegisterBranch(params interface{}) interface{} {
data := map[string]string{}
- paramsJson, _ := json.Marshal(params)
- err := json.Unmarshal(paramsJson, &data)
+ paramsJSON, _ := json.Marshal(params)
+ err := json.Unmarshal(paramsJSON, &data)
if err != nil {
return map[string]string{"dtm_result": "FAILURE", "message": err.Error()}
}
diff --git a/dtmsvr/config/config.go b/dtmsvr/config/config.go
index 4f9ce1c..7f69588 100644
--- a/dtmsvr/config/config.go
+++ b/dtmsvr/config/config.go
@@ -76,7 +76,7 @@ type configType struct {
RequestTimeout int64 `yaml:"RequestTimeout" default:"3"`
HTTPPort int64 `yaml:"HttpPort" default:"36789"`
GrpcPort int64 `yaml:"GrpcPort" default:"36790"`
- JsonRpcHttpPort int64 `yaml:"JsonRpcHttpPort" default:"36791"`
+ JSONRPCPort int64 `yaml:"JSONRPCPort" default:"36791"`
MicroService MicroService `yaml:"MicroService"`
UpdateBranchSync int64 `yaml:"UpdateBranchSync"`
UpdateBranchAsyncGoroutineNum int64 `yaml:"UpdateBranchAsyncGoroutineNum" default:"1"`
diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go
index 7989986..62b06d6 100644
--- a/dtmsvr/svr.go
+++ b/dtmsvr/svr.go
@@ -68,12 +68,12 @@ func StartSvr() {
logger.FatalIfError(err)
// start json-rpc server
- jsonRpcHttpApp := dtmutil.GetGinApp()
- jsonRpcHttpApp = httpMetrics(jsonRpcHttpApp)
- addJsonRpcHttpRouter(jsonRpcHttpApp)
- logger.Infof("dtmsvr listen at: %d", conf.JsonRpcHttpPort)
+ jsonRPCApp := dtmutil.GetGinApp()
+ jsonRPCApp = httpMetrics(jsonRPCApp)
+ addJSONRPCRouter(jsonRPCApp)
+ logger.Infof("dtmsvr listen at: %d", conf.JSONRPCPort)
go func() {
- err := jsonRpcHttpApp.Run(fmt.Sprintf(":%d", conf.JsonRpcHttpPort))
+ err := jsonRPCApp.Run(fmt.Sprintf(":%d", conf.JSONRPCPort))
if err != nil {
logger.Errorf("start server err: %v", err)
}
diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go
index 8b039b9..48538cc 100644
--- a/dtmsvr/trans_class.go
+++ b/dtmsvr/trans_class.go
@@ -85,7 +85,8 @@ func TransFromContext(c *gin.Context) *TransGlobal {
return &m
}
-func TransFromJsonRpcHttpContext(params interface{}) *TransGlobal {
+// TransFromJSONRPCContext 1
+func TransFromJSONRPCContext(params interface{}) *TransGlobal {
jsonStr, _ := json.Marshal(params)
m := TransGlobal{}
err := json.Unmarshal(jsonStr, &m)