diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 7d120e0..6a057fe 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -8,6 +8,7 @@ package dtmsvr import ( "fmt" + "time" "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" @@ -96,6 +97,17 @@ func svcRegisterBranch(transType string, branch *TransBranch, data map[string]st branches[1].Op = dtmimp.OpCommit branches[1].URL = data["url"] } else if transType == "workflow" { + if data["sync"] == "" && conf.UpdateBranchSync == 0 { + now := time.Now() + updateBranchAsyncChan <- branchStatus{ + gid: branch.Gid, + branchID: branch.BranchID, + op: data["op"], + status: data["status"], + finishTime: &now, + } + return nil + } branches = []TransBranch{*branch} branches[0].Status = data["status"] branches[0].Op = data["op"] diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 6df0c76..d4f8f84 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -77,7 +77,7 @@ func (s *Store) FindBranches(gid string) []storage.TransBranchStore { // UpdateBranches update branches info func (s *Store) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { db := dbGet().Clauses(clause.OnConflict{ - OnConstraint: "trans_branch_op_pkey", + OnConstraint: "gid_branch_uniq", DoUpdates: clause.AssignmentColumns(updates), }).Create(branches) return int(db.RowsAffected), db.Error diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index afba7e6..4f6f5bf 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -105,7 +105,6 @@ func updateBranchAsync() { select { case updateBranch := <-updateBranchAsyncChan: updates = append(updates, TransBranch{ - ModelBase: dtmutil.ModelBase{ID: updateBranch.id}, Gid: updateBranch.gid, BranchID: updateBranch.branchID, Op: updateBranch.op, diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 64154b7..f0285db 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -81,7 +81,7 @@ func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPo logger.Infof("LockGlobalSaveBranches ok: gid: %s old status: %s branches: %s", b.Gid, dtmcli.StatusPrepared, b.String()) } else { // for better performance, batch the updates of branch status - updateBranchAsyncChan <- branchStatus{id: b.ID, gid: t.Gid, branchID: b.BranchID, op: b.Op, status: status, finishTime: &now} + updateBranchAsyncChan <- branchStatus{gid: t.Gid, branchID: b.BranchID, op: b.Op, status: status, finishTime: &now} } } diff --git a/dtmsvr/utils.go b/dtmsvr/utils.go index 887522a..6ab5ffe 100644 --- a/dtmsvr/utils.go +++ b/dtmsvr/utils.go @@ -18,7 +18,6 @@ import ( ) type branchStatus struct { - id uint64 gid string branchID string op string diff --git a/test/dtmsvr_test.go b/test/dtmsvr_test.go index f27b609..1d2312f 100644 --- a/test/dtmsvr_test.go +++ b/test/dtmsvr_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/client/workflow" "github.com/dtm-labs/dtm/dtmsvr" "github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/dtmutil" @@ -39,8 +40,11 @@ func getBranchesStatus(gid string) []string { return status } +func isSqlStore() bool { + return conf.Store.Driver == config.Mysql || conf.Store.Driver == config.Postgres +} func TestUpdateBranchAsync(t *testing.T) { - if conf.Store.Driver != config.Mysql { + if !isSqlStore() { return } conf.UpdateBranchSync = 0 @@ -49,8 +53,31 @@ func TestUpdateBranchAsync(t *testing.T) { err := saga.Submit() assert.Nil(t, err) waitTransProcessed(saga.Gid) + + gid := dtmimp.GetFuncName() + "-wf" + workflow.SetProtocolForTest(dtmimp.ProtocolHTTP) + err = workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { + _, err := busi.BusiCli.TransOut(wf.NewBranchCtx(), &busi.ReqGrpc{}) + // add additional data directly + dtmimp.TransRegisterBranch(wf.TransBase, map[string]string{ + "branch_id": "01", + "op": "action", + "status": "succeed", + }, "registerBranch") + return err + }) + assert.Nil(t, err) + err = workflow.Execute(gid, gid, nil) + assert.Nil(t, err) + waitTransProcessed(gid) + time.Sleep(dtmsvr.UpdateBranchAsyncInterval) + assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) + + assert.Equal(t, []string{StatusSucceed}, getBranchesStatus(gid)) + assert.Equal(t, StatusSucceed, getTransStatus(gid)) + conf.UpdateBranchSync = 1 }