diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index f139b30..288bec2 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -106,7 +106,7 @@ func updateBranchAsync() { select { case updateBranch := <-updateBranchAsyncChan: k := updateBranch.gid + updateBranch.branchID + "-" + updateBranch.op - if !exists[k] { + if !exists[k] { // postgres does not allow exists[k] = true updates = append(updates, TransBranch{ Gid: updateBranch.gid, @@ -119,7 +119,7 @@ func updateBranchAsync() { case <-time.After(checkInterval): } } - for len(updates) > 0 { + for i := 0; i < 3 && len(updates) > 0; i++ { rowAffected, err := GetStore().UpdateBranches(updates, []string{"status", "finish_time", "update_time"}) if err != nil { diff --git a/test/workflow_grpc_test.go b/test/workflow_grpc_test.go index 175535c..a75f844 100644 --- a/test/workflow_grpc_test.go +++ b/test/workflow_grpc_test.go @@ -68,43 +68,46 @@ func TestWorkflowGrpcRollback(t *testing.T) { func TestWorkflowMixed(t *testing.T) { workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) - req := &busi.ReqGrpc{Amount: 30} gid := dtmimp.GetFuncName() - workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + err := workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { var req busi.ReqGrpc dtmgimp.MustProtoUnmarshal(data, &req) - wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error { + _, err := wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error { _, err := busi.BusiCli.TransOutRevertBSaga(wf.Context, &req) return err + }).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) { + return nil, bb.CallWithDB(dbGet().ToSQLDB(), func(tx *sql.Tx) error { + return busi.SagaAdjustBalance(tx, busi.TransOutUID, int(-req.Amount), "") + }) }) - _, err := busi.BusiCli.TransOutBSaga(wf.Context, &req) if err != nil { return err } + req2 := &busi.ReqHTTP{Amount: int(req.Amount / 2)} _, err = wf.NewBranch().OnCommit(func(bb *dtmcli.BranchBarrier) error { - _, err := busi.BusiCli.TransInConfirm(wf.Context, &req) + _, err := wf.NewRequest().SetBody(req2).Post(Busi + "/TccBTransInConfirm") return err }).OnRollback(func(bb *dtmcli.BranchBarrier) error { - req2 := &busi.ReqHTTP{Amount: 30} - _, err := wf.NewRequest().SetBody(req2).Post(Busi + "/TransInRevert") + _, err := wf.NewRequest().SetBody(req2).Post(Busi + "/TccBTransInCancel") return err - }).Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) { - err := busi.SagaAdjustBalance(dbGet().ToSQLDB(), busi.TransInUID, int(req.Amount), "") - return nil, err - }) + }).NewRequest().SetBody(req2).Post(Busi + "/TccBTransInTry") if err != nil { return err } _, err = wf.NewBranch().DoXa(busi.BusiConf, func(db *sql.DB) ([]byte, error) { - return nil, busi.SagaAdjustBalance(db, busi.TransInUID, 0, dtmcli.ResultSuccess) + return nil, busi.SagaAdjustBalance(db, busi.TransInUID, int(req.Amount/2), dtmcli.ResultSuccess) }) return err }) - err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) + assert.Nil(t, err) + before := getBeforeBalances("mysql") + req := &busi.ReqGrpc{Amount: 30} + err = workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) assert.Nil(t, err) assert.Equal(t, StatusSucceed, getTransStatus(gid)) + assertNotSameBalance(t, before, "mysql") } func TestWorkflowGrpcError(t *testing.T) {