Browse Source

add error retry limit

pull/330/head
yedf2 4 years ago
parent
commit
12f0346525
  1. 4
      dtmsvr/svr.go
  2. 29
      test/workflow_grpc_test.go

4
dtmsvr/svr.go

@ -106,7 +106,7 @@ func updateBranchAsync() {
select {
case updateBranch := <-updateBranchAsyncChan:
k := updateBranch.gid + updateBranch.branchID + "-" + updateBranch.op
if !exists[k] {
if !exists[k] { // postgres does not allow
exists[k] = true
updates = append(updates, TransBranch{
Gid: updateBranch.gid,
@ -119,7 +119,7 @@ func updateBranchAsync() {
case <-time.After(checkInterval):
}
}
for len(updates) > 0 {
for i := 0; i < 3 && len(updates) > 0; i++ {
rowAffected, err := GetStore().UpdateBranches(updates, []string{"status", "finish_time", "update_time"})
if err != nil {

29
test/workflow_grpc_test.go

@ -68,43 +68,46 @@ func TestWorkflowGrpcRollback(t *testing.T) {
func TestWorkflowMixed(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := &busi.ReqGrpc{Amount: 30}
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
err := workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.ReqGrpc
dtmgimp.MustProtoUnmarshal(data, &req)
wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error {
_, err := wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req)
return err
}).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) {
return nil, bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransOutUID, int(-req.Amount), "")
})
})
_, err := busi.BusiCli.TransOutBSaga(wf.Context, &req)
if err != nil {
return err
}
req2 := &busi.ReqHTTP{Amount: int(req.Amount / 2)}
_, err = wf.NewBranch().OnCommit(func(bb *dtmcli.BranchBarrier) error {
_, err := busi.BusiCli.TransInConfirm(wf.Context, &req)
_, err := wf.NewRequest().SetBody(req2).Post(Busi + "/TccBTransInConfirm")
return err
}).OnRollback(func(bb *dtmcli.BranchBarrier) error {
req2 := &busi.ReqHTTP{Amount: 30}
_, err := wf.NewRequest().SetBody(req2).Post(Busi + "/TransInRevert")
_, err := wf.NewRequest().SetBody(req2).Post(Busi + "/TccBTransInCancel")
return err
}).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) {
err := busi.SagaAdjustBalance(dbGet().ToSQLDB(), busi.TransInUID, int(req.Amount), "")
return nil, err
})
}).NewRequest().SetBody(req2).Post(Busi + "/TccBTransInTry")
if err != nil {
return err
}
_, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) {
return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 0, dtmcli.ResultSuccess)
return nil, busi.SagaAdjustBalance(db, busi.TransInUID, int(req.Amount/2), dtmcli.ResultSuccess)
})
return err
})
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Nil(t, err)
before := getBeforeBalances("mysql")
req := &busi.ReqGrpc{Amount: 30}
err = workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Nil(t, err)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
assertNotSameBalance(t, before, "mysql")
}
func TestWorkflowGrpcError(t *testing.T) {

Loading…
Cancel
Save