Browse Source

add extra execute context function for compatibility

pull/394/head
Goxiaoy 3 years ago
parent
commit
468d88a847
  1. 23
      client/workflow/workflow.go
  2. 2
      test/busi/base_http.go
  3. 2
      test/dtmsvr_test.go
  4. 9
      test/workflow_grpc_test.go
  5. 5
      test/workflow_http_ret_test.go
  6. 15
      test/workflow_http_test.go
  7. 7
      test/workflow_ongoing_test.go
  8. 5
      test/workflow_xa_test.go

23
client/workflow/workflow.go

@ -53,21 +53,36 @@ func Register2(name string, handler WfFunc2, custom ...func(wf *Workflow)) error
return defaultFac.register(name, handler, custom...)
}
// Execute will execute a workflow with the gid and specified params
// Execute is the same as ExecuteCtx, but with context.Background
func Execute(name string, gid string, data []byte) error {
return ExecuteCtx(context.Background(), name, gid, data)
}
// ExecuteCtx will execute a workflow with the gid and specified params
// if the workflow with the gid does not exist, then create a new workflow and execute it
// if the workflow with the gid exists, resume to execute it
func Execute(ctx context.Context, name string, gid string, data []byte) error {
func ExecuteCtx(ctx context.Context, name string, gid string, data []byte) error {
_, err := defaultFac.execute(ctx, name, gid, data)
return err
}
// Execute2 is the same as Execute, but workflow func can return result
func Execute2(ctx context.Context, name string, gid string, data []byte) ([]byte, error) {
func Execute2(name string, gid string, data []byte) ([]byte, error) {
return Execute2Ctx(context.Background(), name, gid, data)
}
// Execute2Ctx is the same as Execute2, but with context.Background
func Execute2Ctx(ctx context.Context, name string, gid string, data []byte) ([]byte, error) {
return defaultFac.execute(ctx, name, gid, data)
}
// ExecuteByQS is like Execute, but name and gid will be obtained from qs
func ExecuteByQS(ctx context.Context, qs url.Values, body []byte) error {
func ExecuteByQS(qs url.Values, body []byte) error {
return ExecuteByQSCtx(context.Background(), qs, body)
}
// ExecuteByQSCtx is the same as ExecuteByQS, but with context.Background
func ExecuteByQSCtx(ctx context.Context, qs url.Values, body []byte) error {
name := qs.Get("op")
gid := qs.Get("gid")
_, err := defaultFac.execute(ctx, name, gid, body)

2
test/busi/base_http.go

@ -88,7 +88,7 @@ func BaseAddRoute(app *gin.Engine) {
app.POST(BusiAPI+"/workflow/resume", dtmutil.WrapHandler(func(ctx *gin.Context) interface{} {
data, err := ioutil.ReadAll(ctx.Request.Body)
logger.FatalIfError(err)
return workflow.ExecuteByQS(ctx, ctx.Request.URL.Query(), data)
return workflow.ExecuteByQS(ctx.Request.URL.Query(), data)
}))
app.POST(BusiAPI+"/TransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
return handleGeneralBusiness(c, MainSwitch.TransInResult.Fetch(), reqFrom(c).TransInResult, "transIn")

2
test/dtmsvr_test.go

@ -70,7 +70,7 @@ func TestUpdateBranchAsync(t *testing.T) {
return err
})
assert.Nil(t, err)
err = workflow.Execute(context.Background(), gid, gid, nil)
err = workflow.Execute(gid, gid, nil)
assert.Nil(t, err)
time.Sleep(dtmsvr.UpdateBranchAsyncInterval)

9
test/workflow_grpc_test.go

@ -7,7 +7,6 @@
package test
import (
"context"
"database/sql"
"testing"
@ -34,7 +33,7 @@ func TestWorkflowGrpcSimple(t *testing.T) {
_, err = busi.BusiCli.TransInBSaga(wf.NewBranchCtx(), &req)
return err
})
err := workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req))
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Error(t, err)
assert.Equal(t, StatusFailed, getTransStatus(gid))
}
@ -62,7 +61,7 @@ func TestWorkflowGrpcRollback(t *testing.T) {
return err
})
before := getBeforeBalances("mysql")
err := workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req))
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Error(t, err, dtmcli.ErrFailure)
assert.Equal(t, StatusFailed, getTransStatus(gid))
assertSameBalance(t, before, "mysql")
@ -107,7 +106,7 @@ func TestWorkflowMixed(t *testing.T) {
assert.Nil(t, err)
before := getBeforeBalances("mysql")
req := &busi.ReqGrpc{Amount: 30}
err = workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req))
err = workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Nil(t, err)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
assertNotSameBalance(t, before, "mysql")
@ -128,7 +127,7 @@ func TestWorkflowGrpcError(t *testing.T) {
_, err = busi.BusiCli.TransIn(wf.NewBranchCtx(), &req)
return err
})
err := workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req))
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Error(t, err)
cronTransOnceForwardCron(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid))

5
test/workflow_http_ret_test.go

@ -1,7 +1,6 @@
package test
import (
"context"
"testing"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@ -22,13 +21,13 @@ func TestWorkflowRet(t *testing.T) {
return []byte("result of workflow"), err
})
ret, err := workflow.Execute2(context.Background(), gid, gid, dtmimp.MustMarshal(req))
ret, err := workflow.Execute2(gid, gid, dtmimp.MustMarshal(req))
assert.Nil(t, err)
assert.Equal(t, "result of workflow", string(ret))
assert.Equal(t, StatusSucceed, getTransStatus(gid))
// the second execute will return result directly
ret, err = workflow.Execute2(context.Background(), gid, gid, dtmimp.MustMarshal(req))
ret, err = workflow.Execute2(gid, gid, dtmimp.MustMarshal(req))
assert.Nil(t, err)
assert.Equal(t, "result of workflow", string(ret))
assert.Equal(t, StatusSucceed, getTransStatus(gid))

15
test/workflow_http_test.go

@ -7,7 +7,6 @@
package test
import (
"context"
"database/sql"
"testing"
@ -42,7 +41,7 @@ func TestWorkflowNormal(t *testing.T) {
return nil
})
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Nil(t, err)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
}
@ -83,7 +82,7 @@ func TestWorkflowRollback(t *testing.T) {
})
before := getBeforeBalances("mysql")
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Error(t, err, dtmcli.ErrFailure)
assert.Equal(t, StatusFailed, getTransStatus(gid))
assertSameBalance(t, before, "mysql")
@ -121,7 +120,7 @@ func TestWorkflowTcc(t *testing.T) {
})
before := getBeforeBalances("mysql")
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Nil(t, err)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
assertNotSameBalance(t, before, "mysql")
@ -159,7 +158,7 @@ func TestWorkflowTccRollback(t *testing.T) {
})
before := getBeforeBalances("mysql")
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Error(t, err)
assert.Equal(t, StatusFailed, getTransStatus(gid))
assertSameBalance(t, before, "mysql")
@ -178,7 +177,7 @@ func TestWorkflowError(t *testing.T) {
return err
})
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Error(t, err)
cronTransOnceForwardCron(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
@ -197,7 +196,7 @@ func TestWorkflowOngoing(t *testing.T) {
return err
})
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Error(t, err)
cronTransOnceForwardCron(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
@ -225,7 +224,7 @@ func TestWorkflowResumeSkip(t *testing.T) {
return err
})
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Error(t, err)
cronTransOnceForwardCron(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid))

7
test/workflow_ongoing_test.go

@ -7,7 +7,6 @@
package test
import (
"context"
"database/sql"
"testing"
@ -48,7 +47,7 @@ func TestWorkflowSimpleResume(t *testing.T) {
return err
})
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
assert.Error(t, err)
cronTransOnceForwardNow(t, gid, 1000)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
@ -95,7 +94,7 @@ func TestWorkflowGrpcRollbackResume(t *testing.T) {
})
before := getBeforeBalances("mysql")
req := &busi.ReqGrpc{Amount: 30, TransInResult: "FAILURE"}
err := workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req))
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
assert.Error(t, err, dtmcli.ErrOngoing)
assert.Equal(t, StatusPrepared, getTransStatus(gid))
cronTransOnceForwardNow(t, gid, 1000)
@ -141,7 +140,7 @@ func TestWorkflowXaResume(t *testing.T) {
return err
})
before := getBeforeBalances("mysql")
err := workflow.Execute(context.Background(), gid, gid, nil)
err := workflow.Execute(gid, gid, nil)
assert.Equal(t, dtmcli.ErrOngoing, err)
cronTransOnceForwardNow(t, gid, 1000)

5
test/workflow_xa_test.go

@ -7,7 +7,6 @@
package test
import (
"context"
"database/sql"
"testing"
@ -35,7 +34,7 @@ func TestWorkflowXaAction(t *testing.T) {
return err
})
before := getBeforeBalances("mysql")
err := workflow.Execute(context.Background(), gid, gid, nil)
err := workflow.Execute(gid, gid, nil)
assert.Nil(t, err)
assert.Equal(t, StatusSucceed, getTransStatus(gid))
assertNotSameBalance(t, before, "mysql")
@ -59,7 +58,7 @@ func TestWorkflowXaRollback(t *testing.T) {
return err
})
before := getBeforeBalances("mysql")
err := workflow.Execute(context.Background(), gid, gid, nil)
err := workflow.Execute(gid, gid, nil)
assert.Equal(t, dtmcli.ErrFailure, err)
assert.Equal(t, StatusFailed, getTransStatus(gid))
assertSameBalance(t, before, "mysql")

Loading…
Cancel
Save