From 468d88a84758d77284501b87116016fff46adf49 Mon Sep 17 00:00:00 2001 From: Goxiaoy Date: Sat, 4 Feb 2023 21:04:16 +0800 Subject: [PATCH] add extra execute context function for compatibility --- client/workflow/workflow.go | 23 +++++++++++++++++++---- test/busi/base_http.go | 2 +- test/dtmsvr_test.go | 2 +- test/workflow_grpc_test.go | 9 ++++----- test/workflow_http_ret_test.go | 5 ++--- test/workflow_http_test.go | 15 +++++++-------- test/workflow_ongoing_test.go | 7 +++---- test/workflow_xa_test.go | 5 ++--- 8 files changed, 39 insertions(+), 29 deletions(-) diff --git a/client/workflow/workflow.go b/client/workflow/workflow.go index 700ba40..02b38a2 100644 --- a/client/workflow/workflow.go +++ b/client/workflow/workflow.go @@ -53,21 +53,36 @@ func Register2(name string, handler WfFunc2, custom ...func(wf *Workflow)) error return defaultFac.register(name, handler, custom...) } -// Execute will execute a workflow with the gid and specified params +// Execute is the same as ExecuteCtx, but with context.Background +func Execute(name string, gid string, data []byte) error { + return ExecuteCtx(context.Background(), name, gid, data) +} + +// ExecuteCtx will execute a workflow with the gid and specified params // if the workflow with the gid does not exist, then create a new workflow and execute it // if the workflow with the gid exists, resume to execute it -func Execute(ctx context.Context, name string, gid string, data []byte) error { +func ExecuteCtx(ctx context.Context, name string, gid string, data []byte) error { _, err := defaultFac.execute(ctx, name, gid, data) return err } // Execute2 is the same as Execute, but workflow func can return result -func Execute2(ctx context.Context, name string, gid string, data []byte) ([]byte, error) { +func Execute2(name string, gid string, data []byte) ([]byte, error) { + return Execute2Ctx(context.Background(), name, gid, data) +} + +// Execute2Ctx is the same as Execute2, but with context.Background +func Execute2Ctx(ctx context.Context, name string, gid string, data []byte) ([]byte, error) { return defaultFac.execute(ctx, name, gid, data) } // ExecuteByQS is like Execute, but name and gid will be obtained from qs -func ExecuteByQS(ctx context.Context, qs url.Values, body []byte) error { +func ExecuteByQS(qs url.Values, body []byte) error { + return ExecuteByQSCtx(context.Background(), qs, body) +} + +// ExecuteByQSCtx is the same as ExecuteByQS, but with context.Background +func ExecuteByQSCtx(ctx context.Context, qs url.Values, body []byte) error { name := qs.Get("op") gid := qs.Get("gid") _, err := defaultFac.execute(ctx, name, gid, body) diff --git a/test/busi/base_http.go b/test/busi/base_http.go index 2da780e..44bbabc 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -88,7 +88,7 @@ func BaseAddRoute(app *gin.Engine) { app.POST(BusiAPI+"/workflow/resume", dtmutil.WrapHandler(func(ctx *gin.Context) interface{} { data, err := ioutil.ReadAll(ctx.Request.Body) logger.FatalIfError(err) - return workflow.ExecuteByQS(ctx, ctx.Request.URL.Query(), data) + return workflow.ExecuteByQS(ctx.Request.URL.Query(), data) })) app.POST(BusiAPI+"/TransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} { return handleGeneralBusiness(c, MainSwitch.TransInResult.Fetch(), reqFrom(c).TransInResult, "transIn") diff --git a/test/dtmsvr_test.go b/test/dtmsvr_test.go index a482234..497a449 100644 --- a/test/dtmsvr_test.go +++ b/test/dtmsvr_test.go @@ -70,7 +70,7 @@ func TestUpdateBranchAsync(t *testing.T) { return err }) assert.Nil(t, err) - err = workflow.Execute(context.Background(), gid, gid, nil) + err = workflow.Execute(gid, gid, nil) assert.Nil(t, err) time.Sleep(dtmsvr.UpdateBranchAsyncInterval) diff --git a/test/workflow_grpc_test.go b/test/workflow_grpc_test.go index cf90eb6..1385cd2 100644 --- a/test/workflow_grpc_test.go +++ b/test/workflow_grpc_test.go @@ -7,7 +7,6 @@ package test import ( - "context" "database/sql" "testing" @@ -34,7 +33,7 @@ func TestWorkflowGrpcSimple(t *testing.T) { _, err = busi.BusiCli.TransInBSaga(wf.NewBranchCtx(), &req) return err }) - err := workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req)) + err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) assert.Error(t, err) assert.Equal(t, StatusFailed, getTransStatus(gid)) } @@ -62,7 +61,7 @@ func TestWorkflowGrpcRollback(t *testing.T) { return err }) before := getBeforeBalances("mysql") - err := workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req)) + err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) assert.Error(t, err, dtmcli.ErrFailure) assert.Equal(t, StatusFailed, getTransStatus(gid)) assertSameBalance(t, before, "mysql") @@ -107,7 +106,7 @@ func TestWorkflowMixed(t *testing.T) { assert.Nil(t, err) before := getBeforeBalances("mysql") req := &busi.ReqGrpc{Amount: 30} - err = workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req)) + err = workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) assert.Nil(t, err) assert.Equal(t, StatusSucceed, getTransStatus(gid)) assertNotSameBalance(t, before, "mysql") @@ -128,7 +127,7 @@ func TestWorkflowGrpcError(t *testing.T) { _, err = busi.BusiCli.TransIn(wf.NewBranchCtx(), &req) return err }) - err := workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req)) + err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) assert.Error(t, err) cronTransOnceForwardCron(t, gid, 1000) assert.Equal(t, StatusSucceed, getTransStatus(gid)) diff --git a/test/workflow_http_ret_test.go b/test/workflow_http_ret_test.go index a0747b8..434653a 100644 --- a/test/workflow_http_ret_test.go +++ b/test/workflow_http_ret_test.go @@ -1,7 +1,6 @@ package test import ( - "context" "testing" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" @@ -22,13 +21,13 @@ func TestWorkflowRet(t *testing.T) { return []byte("result of workflow"), err }) - ret, err := workflow.Execute2(context.Background(), gid, gid, dtmimp.MustMarshal(req)) + ret, err := workflow.Execute2(gid, gid, dtmimp.MustMarshal(req)) assert.Nil(t, err) assert.Equal(t, "result of workflow", string(ret)) assert.Equal(t, StatusSucceed, getTransStatus(gid)) // the second execute will return result directly - ret, err = workflow.Execute2(context.Background(), gid, gid, dtmimp.MustMarshal(req)) + ret, err = workflow.Execute2(gid, gid, dtmimp.MustMarshal(req)) assert.Nil(t, err) assert.Equal(t, "result of workflow", string(ret)) assert.Equal(t, StatusSucceed, getTransStatus(gid)) diff --git a/test/workflow_http_test.go b/test/workflow_http_test.go index 8b4cb26..991a315 100644 --- a/test/workflow_http_test.go +++ b/test/workflow_http_test.go @@ -7,7 +7,6 @@ package test import ( - "context" "database/sql" "testing" @@ -42,7 +41,7 @@ func TestWorkflowNormal(t *testing.T) { return nil }) - err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req)) + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) assert.Nil(t, err) assert.Equal(t, StatusSucceed, getTransStatus(gid)) } @@ -83,7 +82,7 @@ func TestWorkflowRollback(t *testing.T) { }) before := getBeforeBalances("mysql") - err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req)) + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) assert.Error(t, err, dtmcli.ErrFailure) assert.Equal(t, StatusFailed, getTransStatus(gid)) assertSameBalance(t, before, "mysql") @@ -121,7 +120,7 @@ func TestWorkflowTcc(t *testing.T) { }) before := getBeforeBalances("mysql") - err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req)) + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) assert.Nil(t, err) assert.Equal(t, StatusSucceed, getTransStatus(gid)) assertNotSameBalance(t, before, "mysql") @@ -159,7 +158,7 @@ func TestWorkflowTccRollback(t *testing.T) { }) before := getBeforeBalances("mysql") - err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req)) + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) assert.Error(t, err) assert.Equal(t, StatusFailed, getTransStatus(gid)) assertSameBalance(t, before, "mysql") @@ -178,7 +177,7 @@ func TestWorkflowError(t *testing.T) { return err }) - err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req)) + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) assert.Error(t, err) cronTransOnceForwardCron(t, gid, 1000) assert.Equal(t, StatusSucceed, getTransStatus(gid)) @@ -197,7 +196,7 @@ func TestWorkflowOngoing(t *testing.T) { return err }) - err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req)) + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) assert.Error(t, err) cronTransOnceForwardCron(t, gid, 1000) assert.Equal(t, StatusSucceed, getTransStatus(gid)) @@ -225,7 +224,7 @@ func TestWorkflowResumeSkip(t *testing.T) { return err }) - err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req)) + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) assert.Error(t, err) cronTransOnceForwardCron(t, gid, 1000) assert.Equal(t, StatusSucceed, getTransStatus(gid)) diff --git a/test/workflow_ongoing_test.go b/test/workflow_ongoing_test.go index e4454bd..4d6dba4 100644 --- a/test/workflow_ongoing_test.go +++ b/test/workflow_ongoing_test.go @@ -7,7 +7,6 @@ package test import ( - "context" "database/sql" "testing" @@ -48,7 +47,7 @@ func TestWorkflowSimpleResume(t *testing.T) { return err }) - err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req)) + err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req)) assert.Error(t, err) cronTransOnceForwardNow(t, gid, 1000) assert.Equal(t, StatusSucceed, getTransStatus(gid)) @@ -95,7 +94,7 @@ func TestWorkflowGrpcRollbackResume(t *testing.T) { }) before := getBeforeBalances("mysql") req := &busi.ReqGrpc{Amount: 30, TransInResult: "FAILURE"} - err := workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req)) + err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req)) assert.Error(t, err, dtmcli.ErrOngoing) assert.Equal(t, StatusPrepared, getTransStatus(gid)) cronTransOnceForwardNow(t, gid, 1000) @@ -141,7 +140,7 @@ func TestWorkflowXaResume(t *testing.T) { return err }) before := getBeforeBalances("mysql") - err := workflow.Execute(context.Background(), gid, gid, nil) + err := workflow.Execute(gid, gid, nil) assert.Equal(t, dtmcli.ErrOngoing, err) cronTransOnceForwardNow(t, gid, 1000) diff --git a/test/workflow_xa_test.go b/test/workflow_xa_test.go index 0881062..dcf55b1 100644 --- a/test/workflow_xa_test.go +++ b/test/workflow_xa_test.go @@ -7,7 +7,6 @@ package test import ( - "context" "database/sql" "testing" @@ -35,7 +34,7 @@ func TestWorkflowXaAction(t *testing.T) { return err }) before := getBeforeBalances("mysql") - err := workflow.Execute(context.Background(), gid, gid, nil) + err := workflow.Execute(gid, gid, nil) assert.Nil(t, err) assert.Equal(t, StatusSucceed, getTransStatus(gid)) assertNotSameBalance(t, before, "mysql") @@ -59,7 +58,7 @@ func TestWorkflowXaRollback(t *testing.T) { return err }) before := getBeforeBalances("mysql") - err := workflow.Execute(context.Background(), gid, gid, nil) + err := workflow.Execute(gid, gid, nil) assert.Equal(t, dtmcli.ErrFailure, err) assert.Equal(t, StatusFailed, getTransStatus(gid)) assertSameBalance(t, before, "mysql")