Browse Source

cover more

pull/328/head
yedf2 4 years ago
parent
commit
a108bb99de
  1. 4
      dtmgrpc/workflow/imp.go
  2. 24
      dtmgrpc/workflow/utils.go
  3. 2
      test/workflow_grpc_test.go
  4. 29
      test/workflow_http_test.go
  5. 2
      test/workflow_ongoing_test.go

4
dtmgrpc/workflow/imp.go

@ -153,9 +153,7 @@ func (wf *Workflow) callPhase2(branchID string, fn WfPhase2Func) error {
wf.currentBranch = branchID
r := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult {
err := fn(bb)
if errors.Is(err, dtmcli.ErrFailure) {
panic("should not return ErrFail in phase2")
}
dtmimp.PanicIf(errors.Is(err, dtmcli.ErrFailure), errors.New("should not return ErrFail in phase2"))
return wf.stepResultFromLocal(nil, err)
})
_, err := wf.stepResultToLocal(r)

24
dtmgrpc/workflow/utils.go

@ -14,13 +14,6 @@ import (
"google.golang.org/protobuf/reflect/protoreflect"
)
func statusToCode(status string) int {
if status == "succeed" {
return 200
}
return 409
}
func wfErrorToStatus(err error) string {
if err == nil {
return dtmcli.StatusSucceed
@ -102,15 +95,12 @@ func (wf *Workflow) stepResultToLocal(sr *stepResult) ([]byte, error) {
}
func (wf *Workflow) stepResultFromGrpc(reply interface{}, err error) *stepResult {
sr := &stepResult{Error: err}
if err == nil {
sr.Error = wf.Options.GRPCError2DtmError(err)
sr.Status = wfErrorToStatus(sr.Error)
if sr.Error == nil {
sr.Data = dtmgimp.MustProtoMarshal(reply.(protoreflect.ProtoMessage))
} else if sr.Status == dtmcli.StatusFailed {
sr.Data = []byte(sr.Error.Error())
}
sr := &stepResult{Error: wf.Options.GRPCError2DtmError(err)}
sr.Status = wfErrorToStatus(sr.Error)
if sr.Error == nil {
sr.Data = dtmgimp.MustProtoMarshal(reply.(protoreflect.ProtoMessage))
} else if sr.Status == dtmcli.StatusFailed {
sr.Data = []byte(sr.Error.Error())
}
return sr
}
@ -135,5 +125,5 @@ func (wf *Workflow) stepResultToHTTP(s *stepResult) (*http.Response, error) {
if s.Error != nil {
return nil, s.Error
}
return newJSONResponse(statusToCode(s.Status), s.Data), nil
return newJSONResponse(200, s.Data), nil
}

2
test/workflow_grpc_test.go

@ -40,7 +40,7 @@ func TestWorkflowGrpcSimple(t *testing.T) {
func TestWorkflowGrpcNormal(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolGRPC)
req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"}
req := &busi.ReqGrpc{Amount: 30, TransInResult: "FAILURE"}
gid := dtmimp.GetFuncName()
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.BusiReq

29
test/workflow_http_test.go

@ -89,14 +89,27 @@ func TestWorkflowError(t *testing.T) {
var req busi.ReqHTTP
dtmimp.MustUnmarshal(data, &req)
_, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut")
if err != nil {
return err
}
_, err = wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransIn")
if err != nil {
return err
}
return nil
return err
})
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Error(t, err)
go waitTransProcessed(gid)
cronTransOnceForwardCron(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
func TestWorkflowOngoing(t *testing.T) {
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
req := busi.GenReqHTTP(30, false, false)
gid := dtmimp.GetFuncName()
busi.MainSwitch.TransOutResult.SetOnce("ONGOING")
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
var req busi.ReqHTTP
dtmimp.MustUnmarshal(data, &req)
_, err := wf.NewBranch().NewRequest().SetBody(req).Post(Busi + "/TransOut")
return err
})
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))

2
test/workflow_ongoing_test.go

@ -93,7 +93,7 @@ func TestWorkflowGrpcRollbackResume(t *testing.T) {
}, func(wf *workflow.Workflow) {
wf.Options.CompensateErrorBranch = true
})
req := &busi.BusiReq{Amount: 30, TransInResult: "FAILURE"}
req := &busi.ReqGrpc{Amount: 30, TransInResult: "FAILURE"}
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Error(t, err, dtmcli.ErrOngoing)
assert.Equal(t, StatusPrepared, getTransStatus(gid))

Loading…
Cancel
Save