mirror of https://github.com/dtm-labs/dtm.git
2 changed files with 105 additions and 0 deletions
@ -0,0 +1,48 @@ |
|||
package dtmgrpc |
|||
|
|||
import ( |
|||
context "context" |
|||
|
|||
"github.com/yedf/dtm/dtmcli" |
|||
) |
|||
|
|||
// SagaGrpc struct of saga
|
|||
type SagaGrpc struct { |
|||
dtmcli.SagaData |
|||
dtmcli.TransBase |
|||
} |
|||
|
|||
// NewSaga create a saga
|
|||
func NewSaga(server string, gid string) *SagaGrpc { |
|||
return &SagaGrpc{ |
|||
SagaData: dtmcli.SagaData{TransData: dtmcli.TransData{ |
|||
Gid: gid, |
|||
TransType: "saga", |
|||
}}, |
|||
TransBase: dtmcli.TransBase{ |
|||
Dtm: server, |
|||
}, |
|||
} |
|||
} |
|||
|
|||
// Add add a saga step
|
|||
func (s *SagaGrpc) Add(action string, compensate string, appData []byte) *SagaGrpc { |
|||
dtmcli.Logf("saga %s Add %s %s %v", s.SagaData.Gid, action, compensate, string(appData)) |
|||
step := dtmcli.SagaStep{ |
|||
Action: action, |
|||
Compensate: compensate, |
|||
Data: string(appData), |
|||
} |
|||
s.Steps = append(s.Steps, step) |
|||
return s |
|||
} |
|||
|
|||
// Submit submit the saga trans
|
|||
func (s *SagaGrpc) Submit() error { |
|||
_, err := MustGetDtmClient(s.Dtm).Submit(context.Background(), &DtmRequest{ |
|||
Gid: s.Gid, |
|||
TransType: s.TransType, |
|||
Data: dtmcli.MustMarshalString(&s.Steps), |
|||
}) |
|||
return err |
|||
} |
|||
@ -0,0 +1,57 @@ |
|||
package dtmsvr |
|||
|
|||
import ( |
|||
"testing" |
|||
"time" |
|||
|
|||
"github.com/stretchr/testify/assert" |
|||
"github.com/yedf/dtm/dtmcli" |
|||
"github.com/yedf/dtm/dtmgrpc" |
|||
"github.com/yedf/dtm/examples" |
|||
) |
|||
|
|||
func TestGrpcSaga(t *testing.T) { |
|||
sagaGrpcNormal(t) |
|||
sagaGrpcCommittedPending(t) |
|||
sagaGrpcRollback(t) |
|||
} |
|||
|
|||
func sagaGrpcNormal(t *testing.T) { |
|||
saga := genSagaGrpc("gid-sagaGrpcNormal", false, false) |
|||
saga.Submit() |
|||
WaitTransProcessed(saga.Gid) |
|||
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) |
|||
assert.Equal(t, "succeed", getTransStatus(saga.Gid)) |
|||
transQuery(t, saga.Gid) |
|||
} |
|||
|
|||
func sagaGrpcCommittedPending(t *testing.T) { |
|||
saga := genSagaGrpc("gid-committedPendingGrpc", false, false) |
|||
examples.MainSwitch.TransOutResult.SetOnce("PENDING") |
|||
saga.Submit() |
|||
WaitTransProcessed(saga.Gid) |
|||
assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) |
|||
CronTransOnce(60 * time.Second) |
|||
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) |
|||
assert.Equal(t, "succeed", getTransStatus(saga.Gid)) |
|||
} |
|||
|
|||
func sagaGrpcRollback(t *testing.T) { |
|||
saga := genSagaGrpc("gid-rollbackSaga2Grpc", false, true) |
|||
examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING") |
|||
saga.Submit() |
|||
WaitTransProcessed(saga.Gid) |
|||
assert.Equal(t, "aborting", getTransStatus(saga.Gid)) |
|||
CronTransOnce(60 * time.Second) |
|||
assert.Equal(t, "failed", getTransStatus(saga.Gid)) |
|||
assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid)) |
|||
} |
|||
|
|||
func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc { |
|||
dtmcli.Logf("beginning a grpc saga test ---------------- %s", gid) |
|||
saga := dtmgrpc.NewSaga(examples.DtmGrpcServer, gid) |
|||
req := dtmcli.MustMarshal(examples.GenTransReq(30, outFailed, inFailed)) |
|||
saga.Add(examples.BusiGrpc+"/examples.Busi/TransOut", examples.BusiGrpc+"/examples.Busi/TransOutRevert", req) |
|||
saga.Add(examples.BusiGrpc+"/examples.Busi/TransIn", examples.BusiGrpc+"/examples.Busi/TransInRevert", req) |
|||
return saga |
|||
} |
|||
Loading…
Reference in new issue