Browse Source

batch save workflow branch ok

pull/330/head
yedf2 4 years ago
parent
commit
2d70367692
  1. 12
      dtmsvr/api.go
  2. 2
      dtmsvr/storage/sql/sql.go
  3. 1
      dtmsvr/svr.go
  4. 2
      dtmsvr/trans_status.go
  5. 1
      dtmsvr/utils.go
  6. 29
      test/dtmsvr_test.go

12
dtmsvr/api.go

@ -8,6 +8,7 @@ package dtmsvr
import ( import (
"fmt" "fmt"
"time"
"github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@ -96,6 +97,17 @@ func svcRegisterBranch(transType string, branch *TransBranch, data map[string]st
branches[1].Op = dtmimp.OpCommit branches[1].Op = dtmimp.OpCommit
branches[1].URL = data["url"] branches[1].URL = data["url"]
} else if transType == "workflow" { } else if transType == "workflow" {
if data["sync"] == "" && conf.UpdateBranchSync == 0 {
now := time.Now()
updateBranchAsyncChan <- branchStatus{
gid: branch.Gid,
branchID: branch.BranchID,
op: data["op"],
status: data["status"],
finishTime: &now,
}
return nil
}
branches = []TransBranch{*branch} branches = []TransBranch{*branch}
branches[0].Status = data["status"] branches[0].Status = data["status"]
branches[0].Op = data["op"] branches[0].Op = data["op"]

2
dtmsvr/storage/sql/sql.go

@ -77,7 +77,7 @@ func (s *Store) FindBranches(gid string) []storage.TransBranchStore {
// UpdateBranches update branches info // UpdateBranches update branches info
func (s *Store) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) { func (s *Store) UpdateBranches(branches []storage.TransBranchStore, updates []string) (int, error) {
db := dbGet().Clauses(clause.OnConflict{ db := dbGet().Clauses(clause.OnConflict{
OnConstraint: "trans_branch_op_pkey", OnConstraint: "gid_branch_uniq",
DoUpdates: clause.AssignmentColumns(updates), DoUpdates: clause.AssignmentColumns(updates),
}).Create(branches) }).Create(branches)
return int(db.RowsAffected), db.Error return int(db.RowsAffected), db.Error

1
dtmsvr/svr.go

@ -105,7 +105,6 @@ func updateBranchAsync() {
select { select {
case updateBranch := <-updateBranchAsyncChan: case updateBranch := <-updateBranchAsyncChan:
updates = append(updates, TransBranch{ updates = append(updates, TransBranch{
ModelBase: dtmutil.ModelBase{ID: updateBranch.id},
Gid: updateBranch.gid, Gid: updateBranch.gid,
BranchID: updateBranch.branchID, BranchID: updateBranch.branchID,
Op: updateBranch.op, Op: updateBranch.op,

2
dtmsvr/trans_status.go

@ -81,7 +81,7 @@ func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPo
logger.Infof("LockGlobalSaveBranches ok: gid: %s old status: %s branches: %s", logger.Infof("LockGlobalSaveBranches ok: gid: %s old status: %s branches: %s",
b.Gid, dtmcli.StatusPrepared, b.String()) b.Gid, dtmcli.StatusPrepared, b.String())
} else { // for better performance, batch the updates of branch status } else { // for better performance, batch the updates of branch status
updateBranchAsyncChan <- branchStatus{id: b.ID, gid: t.Gid, branchID: b.BranchID, op: b.Op, status: status, finishTime: &now} updateBranchAsyncChan <- branchStatus{gid: t.Gid, branchID: b.BranchID, op: b.Op, status: status, finishTime: &now}
} }
} }

1
dtmsvr/utils.go

@ -18,7 +18,6 @@ import (
) )
type branchStatus struct { type branchStatus struct {
id uint64
gid string gid string
branchID string branchID string
op string op string

29
test/dtmsvr_test.go

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/client/workflow"
"github.com/dtm-labs/dtm/dtmsvr" "github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/dtmsvr/config"
"github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/dtmutil"
@ -39,8 +40,11 @@ func getBranchesStatus(gid string) []string {
return status return status
} }
func isSqlStore() bool {
return conf.Store.Driver == config.Mysql || conf.Store.Driver == config.Postgres
}
func TestUpdateBranchAsync(t *testing.T) { func TestUpdateBranchAsync(t *testing.T) {
if conf.Store.Driver != config.Mysql { if !isSqlStore() {
return return
} }
conf.UpdateBranchSync = 0 conf.UpdateBranchSync = 0
@ -49,8 +53,31 @@ func TestUpdateBranchAsync(t *testing.T) {
err := saga.Submit() err := saga.Submit()
assert.Nil(t, err) assert.Nil(t, err)
waitTransProcessed(saga.Gid) waitTransProcessed(saga.Gid)
gid := dtmimp.GetFuncName() + "-wf"
workflow.SetProtocolForTest(dtmimp.ProtocolHTTP)
err = workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
_, err := busi.BusiCli.TransOut(wf.NewBranchCtx(), &busi.ReqGrpc{})
// add additional data directly
dtmimp.TransRegisterBranch(wf.TransBase, map[string]string{
"branch_id": "01",
"op": "action",
"status": "succeed",
}, "registerBranch")
return err
})
assert.Nil(t, err)
err = workflow.Execute(gid, gid, nil)
assert.Nil(t, err)
waitTransProcessed(gid)
time.Sleep(dtmsvr.UpdateBranchAsyncInterval) time.Sleep(dtmsvr.UpdateBranchAsyncInterval)
assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
assert.Equal(t, []string{StatusSucceed}, getBranchesStatus(gid))
assert.Equal(t, StatusSucceed, getTransStatus(gid))
conf.UpdateBranchSync = 1 conf.UpdateBranchSync = 1
} }

Loading…
Cancel
Save