mirror of https://github.com/dtm-labs/dtm.git
csharpjavadistributed-transactionsdtmgogolangmicroservicenodejsphpdatabasesagaseatatcctransactiontransactionsxapythondistributed
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
67 lines
2.0 KiB
67 lines
2.0 KiB
package workflow
|
|
|
|
import (
	"context"
	"fmt"

	"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
	"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgimp"
	"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgpb"
	"google.golang.org/protobuf/types/known/emptypb"
)
|
|
|
|
func (wf *Workflow) getProgress() ([]*dtmgpb.DtmProgress, error) {
|
|
if wf.Protocol == dtmimp.ProtocolGRPC {
|
|
var reply dtmgpb.DtmProgressesReply
|
|
err := dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/PrepareWorkflow",
|
|
dtmgimp.GetDtmRequest(wf.TransBase), &reply)
|
|
if err == nil {
|
|
return reply.Progresses, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
resp, err := dtmimp.RestyClient.R().SetBody(wf.TransBase).Post(wf.Dtm + "/prepareWorkflow")
|
|
var reply dtmgpb.DtmProgressesReply
|
|
if err == nil {
|
|
dtmimp.MustUnmarshal(resp.Body(), &reply)
|
|
}
|
|
return reply.Progresses, err
|
|
}
|
|
|
|
func (wf *Workflow) submit(status string) error {
|
|
if wf.Protocol == dtmimp.ProtocolHTTP {
|
|
m := map[string]interface{}{
|
|
"gid": wf.Gid,
|
|
"trans_type": wf.TransType,
|
|
"req_extra": map[string]string{
|
|
"status": status,
|
|
},
|
|
}
|
|
_, err := dtmimp.TransCallDtmExt(wf.TransBase, m, "submit")
|
|
return err
|
|
}
|
|
req := dtmgimp.GetDtmRequest(wf.TransBase)
|
|
req.ReqExtra = map[string]string{
|
|
"status": status,
|
|
}
|
|
reply := emptypb.Empty{}
|
|
return dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/"+"Submit", req, &reply)
|
|
}
|
|
|
|
func (wf *Workflow) registerBranch(res []byte, branchID string, op string, status string) error {
|
|
if wf.Protocol == dtmimp.ProtocolHTTP {
|
|
return dtmimp.TransRegisterBranch(wf.TransBase, map[string]string{
|
|
"data": string(res),
|
|
"branch_id": branchID,
|
|
"op": op,
|
|
"status": status,
|
|
}, "registerBranch")
|
|
}
|
|
_, err := dtmgimp.MustGetDtmClient(wf.Dtm).RegisterBranch(context.Background(), &dtmgpb.DtmBranchRequest{
|
|
Gid: wf.Gid,
|
|
TransType: wf.TransType,
|
|
BranchID: branchID,
|
|
BusiPayload: res,
|
|
Data: map[string]string{"status": status, "op": op},
|
|
})
|
|
return err
|
|
}
|
|
|