|
|
|
@ -88,6 +88,82 @@ func TestWorkflowRollback(t *testing.T) { |
|
|
|
assertSameBalance(t, before, "mysql") |
|
|
|
} |
|
|
|
|
|
|
|
func TestWorkflowTcc(t *testing.T) { |
|
|
|
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) |
|
|
|
req := busi.GenReqHTTP(30, false, false) |
|
|
|
gid := dtmimp.GetFuncName() |
|
|
|
|
|
|
|
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|
|
|
var req busi.ReqHTTP |
|
|
|
dtmimp.MustUnmarshal(data, &req) |
|
|
|
_, err := wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error { |
|
|
|
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/TccBTransOutCancel") |
|
|
|
return err |
|
|
|
}).OnCommit(func(bb *dtmcli.BranchBarrier) error { |
|
|
|
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/TccBTransOutConfirm") |
|
|
|
return err |
|
|
|
}).NewRequest().SetBody(req).Post(Busi + "/TccBTransOutTry") |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
_, err = wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error { |
|
|
|
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/TccBTransInCancel") |
|
|
|
return err |
|
|
|
}).OnCommit(func(bb *dtmcli.BranchBarrier) error { |
|
|
|
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/TccBTransInConfirm") |
|
|
|
return err |
|
|
|
}).NewRequest().SetBody(req).Post(Busi + "/TccBTransInTry") |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
return nil |
|
|
|
}) |
|
|
|
|
|
|
|
before := getBeforeBalances("mysql") |
|
|
|
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) |
|
|
|
assert.Nil(t, err) |
|
|
|
assert.Equal(t, StatusSucceed, getTransStatus(gid)) |
|
|
|
assertNotSameBalance(t, before, "mysql") |
|
|
|
} |
|
|
|
|
|
|
|
func TestWorkflowTccRollback(t *testing.T) { |
|
|
|
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) |
|
|
|
req := busi.GenReqHTTP(30, false, true) |
|
|
|
gid := dtmimp.GetFuncName() |
|
|
|
|
|
|
|
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { |
|
|
|
var req busi.ReqHTTP |
|
|
|
dtmimp.MustUnmarshal(data, &req) |
|
|
|
_, err := wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error { |
|
|
|
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/TccBTransOutCancel") |
|
|
|
return err |
|
|
|
}).OnCommit(func(bb *dtmcli.BranchBarrier) error { |
|
|
|
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/TccBTransOutConfirm") |
|
|
|
return err |
|
|
|
}).NewRequest().SetBody(req).Post(Busi + "/TccBTransOutTry") |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
_, err = wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error { |
|
|
|
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/TccBTransInCancel") |
|
|
|
return err |
|
|
|
}).OnCommit(func(bb *dtmcli.BranchBarrier) error { |
|
|
|
_, err := wf.NewRequest().SetBody(req).Post(Busi + "/TccBTransInConfirm") |
|
|
|
return err |
|
|
|
}).NewRequest().SetBody(req).Post(Busi + "/TccBTransInTry") |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
return nil |
|
|
|
}) |
|
|
|
|
|
|
|
before := getBeforeBalances("mysql") |
|
|
|
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) |
|
|
|
assert.Error(t, err) |
|
|
|
assert.Equal(t, StatusFailed, getTransStatus(gid)) |
|
|
|
assertSameBalance(t, before, "mysql") |
|
|
|
} |
|
|
|
|
|
|
|
func TestWorkflowError(t *testing.T) { |
|
|
|
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) |
|
|
|
req := busi.GenReqHTTP(30, false, false) |
|
|
|
|