From 8df9bbbb37ef4baaa0f8918a5a70ba5fd0df652a Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 22 Jan 2022 20:48:55 +0800 Subject: [PATCH] grpc msg use new PrepareAndSubmitBarrier --- dtmgrpc/dtmgimp/types.go | 14 ++++++++++ dtmgrpc/msg.go | 27 ++++++++++++++----- dtmgrpc/tcc.go | 10 +------ dtmgrpc/type.go | 14 ++++++++++ dtmgrpc/xa.go | 10 +------ dtmsvr/trans_status.go | 15 ++--------- test/busi/busi.pb.go | 56 ++++++++++++++++++++++----------------- test/busi/busi.proto | 1 + test/busi/busi_grpc.pb.go | 36 +++++++++++++++++++++++++ 9 files changed, 120 insertions(+), 63 deletions(-) diff --git a/dtmgrpc/dtmgimp/types.go b/dtmgrpc/dtmgimp/types.go index 535e924..2df228a 100644 --- a/dtmgrpc/dtmgimp/types.go +++ b/dtmgrpc/dtmgimp/types.go @@ -13,7 +13,10 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtmdriver" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/proto" ) // GrpcServerLog 打印grpc服务端的日志 @@ -46,3 +49,14 @@ func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, c } return err } + +// InvokeURL invoke a url for trans +func InvokeBranch(t *dtmimp.TransBase, msg proto.Message, url string, reply interface{}, branchID string, op string) error { + server, method, err := dtmdriver.GetDriver().ParseServerMethod(url) + if err != nil { + return err + } + ctx := TransInfo2Ctx(t.Gid, t.TransType, branchID, op, t.Dtm) + ctx = metadata.AppendToOutgoingContext(ctx, Map2Kvs(t.BranchHeaders)...) + return MustGetGrpcConn(server, false).Invoke(ctx, method, msg, reply) +} diff --git a/dtmgrpc/msg.go b/dtmgrpc/msg.go index cbb8842..2691aab 100644 --- a/dtmgrpc/msg.go +++ b/dtmgrpc/msg.go @@ -8,6 +8,7 @@ package dtmgrpc import ( "database/sql" + "errors" "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" @@ -45,15 +46,27 @@ func (s *MsgGrpc) Submit() error { // PrepareAndSubmit one method for the entire busi->prepare->submit func (s *MsgGrpc) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall dtmcli.BarrierBusiFunc) error { + return s.PrepareAndSubmitBarrier(queryPrepared, func(bb *dtmcli.BranchBarrier) error { + return bb.CallWithDB(db, busiCall) + }) +} + +// PrepareAndSubmit one method for the entire busi->prepare->submit +func (s *MsgGrpc) PrepareAndSubmitBarrier(queryPrepared string, busiCall func(bb *dtmcli.BranchBarrier) error) error { bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared if err == nil { - err = bb.CallWithDB(db, func(tx *sql.Tx) error { - err := busiCall(tx) - if err == nil { - err = s.Prepare(queryPrepared) - } - return err - }) + err = s.Prepare(queryPrepared) + } + if err == nil { + err = busiCall(bb) + if err != nil && !errors.Is(err, dtmcli.ErrFailure) { + var reply interface{} + err = dtmgimp.InvokeBranch(&s.TransBase, nil, queryPrepared, &reply, bb.BranchID, bb.Op) + err = GrpcError2DtmError(err) + } + if errors.Is(err, dtmcli.ErrFailure) { + _ = dtmgimp.DtmGrpcCall(&s.TransBase, "Abort") + } } if err == nil { err = s.Submit() diff --git a/dtmgrpc/tcc.go b/dtmgrpc/tcc.go index bd1b336..11fb1b4 100644 --- a/dtmgrpc/tcc.go +++ b/dtmgrpc/tcc.go @@ -13,8 +13,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgpb" - "github.com/dtm-labs/dtmdriver" - "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" ) @@ -87,11 +85,5 @@ func (t *TccGrpc) CallBranch(busiMsg proto.Message, tryURL string, confirmURL st if err != nil { return err } - server, method, err := dtmdriver.GetDriver().ParseServerMethod(tryURL) - if err != nil { - return err - } - ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, "try", t.Dtm) - ctx = metadata.AppendToOutgoingContext(ctx, dtmgimp.Map2Kvs(t.BranchHeaders)...) - return dtmgimp.MustGetGrpcConn(server, false).Invoke(ctx, method, busiMsg, reply) + return dtmgimp.InvokeBranch(&t.TransBase, busiMsg, tryURL, reply, branchID, "try") } diff --git a/dtmgrpc/type.go b/dtmgrpc/type.go index 194ff59..14e85bd 100644 --- a/dtmgrpc/type.go +++ b/dtmgrpc/type.go @@ -30,6 +30,20 @@ func DtmError2GrpcError(res interface{}) error { return e } +func GrpcError2DtmError(err error) error { + st, ok := status.FromError(err) + if ok && st.Code() == codes.Aborted { + // version lower then v1.10, will specify Ongoing in code Aborted + if st.Message() == dtmcli.ResultOngoing { + return dtmcli.ErrOngoing + } + return dtmcli.ErrFailure + } else if ok && st.Code() == codes.FailedPrecondition { + return dtmcli.ErrOngoing + } + return err +} + // MustGenGid must gen a gid from grpcServer func MustGenGid(grpcServer string) string { dc := dtmgimp.MustGetDtmClient(grpcServer) diff --git a/dtmgrpc/xa.go b/dtmgrpc/xa.go index 3cfd8bf..dcdb9b1 100644 --- a/dtmgrpc/xa.go +++ b/dtmgrpc/xa.go @@ -15,7 +15,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgpb" - "github.com/dtm-labs/dtmdriver" grpc "google.golang.org/grpc" "google.golang.org/protobuf/proto" emptypb "google.golang.org/protobuf/types/known/emptypb" @@ -119,12 +118,5 @@ func (xc *XaGrpcClient) XaGlobalTransaction2(gid string, custom func(*XaGrpc), x // CallBranch call a xa branch func (x *XaGrpc) CallBranch(msg proto.Message, url string, reply interface{}) error { - server, method, err := dtmdriver.GetDriver().ParseServerMethod(url) - if err != nil { - return err - } - err = dtmgimp.MustGetGrpcConn(server, false).Invoke( - dtmgimp.TransInfo2Ctx(x.Gid, x.TransType, x.NewSubBranchID(), "action", x.Dtm), method, msg, reply) - return err - + return dtmgimp.InvokeBranch(&x.TransBase, msg, url, reply, x.NewSubBranchID(), "action") } diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index c70e5f2..4f1aa31 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -15,11 +15,10 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmgrpc" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtmdriver" - "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" ) func (t *TransGlobal) touchCronTime(ctype cronType) { @@ -92,17 +91,7 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa if err == nil { return nil } - st, ok := status.FromError(err) - if ok && st.Code() == codes.Aborted { - // version lower then v1.10, will specify Ongoing in code Aborted - if st.Message() == dtmcli.ResultOngoing { - return dtmcli.ErrOngoing - } - return dtmcli.ErrFailure - } else if ok && st.Code() == codes.FailedPrecondition { - return dtmcli.ErrOngoing - } - return err + return dtmgrpc.GrpcError2DtmError(err) } dtmimp.PanicIf(!strings.HasPrefix(url, "http"), fmt.Errorf("bad url for http: %s", url)) resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)). diff --git a/test/busi/busi.pb.go b/test/busi/busi.pb.go index 759886f..c152e5a 100644 --- a/test/busi/busi.pb.go +++ b/test/busi/busi.pb.go @@ -148,7 +148,7 @@ var file_test_busi_busi_proto_rawDesc = []byte{ 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x25, 0x0a, 0x09, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x32, 0x8d, 0x09, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x32, 0x0a, 0x07, 0x54, 0x72, + 0x65, 0x32, 0xcc, 0x09, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x32, 0x0a, 0x07, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x33, @@ -221,8 +221,12 @@ var file_test_busi_busi_proto_rawDesc = []byte{ 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x42, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, - 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x00, 0x12, 0x3d, 0x0a, 0x12, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, + 0x65, 0x64, 0x52, 0x65, 0x64, 0x69, 0x73, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, + 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, + 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -264,28 +268,30 @@ var file_test_busi_busi_proto_depIdxs = []int32{ 0, // 17: busi.Busi.TransOutHeaderNo:input_type -> busi.BusiReq 0, // 18: busi.Busi.QueryPrepared:input_type -> busi.BusiReq 0, // 19: busi.Busi.QueryPreparedB:input_type -> busi.BusiReq - 2, // 20: busi.Busi.TransIn:output_type -> google.protobuf.Empty - 2, // 21: busi.Busi.TransOut:output_type -> google.protobuf.Empty - 2, // 22: busi.Busi.TransInRevert:output_type -> google.protobuf.Empty - 2, // 23: busi.Busi.TransOutRevert:output_type -> google.protobuf.Empty - 2, // 24: busi.Busi.TransInConfirm:output_type -> google.protobuf.Empty - 2, // 25: busi.Busi.TransOutConfirm:output_type -> google.protobuf.Empty - 2, // 26: busi.Busi.XaNotify:output_type -> google.protobuf.Empty - 2, // 27: busi.Busi.TransInXa:output_type -> google.protobuf.Empty - 2, // 28: busi.Busi.TransOutXa:output_type -> google.protobuf.Empty - 2, // 29: busi.Busi.TransInTcc:output_type -> google.protobuf.Empty - 2, // 30: busi.Busi.TransOutTcc:output_type -> google.protobuf.Empty - 2, // 31: busi.Busi.TransInTccNested:output_type -> google.protobuf.Empty - 2, // 32: busi.Busi.TransInBSaga:output_type -> google.protobuf.Empty - 2, // 33: busi.Busi.TransOutBSaga:output_type -> google.protobuf.Empty - 2, // 34: busi.Busi.TransInRevertBSaga:output_type -> google.protobuf.Empty - 2, // 35: busi.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty - 2, // 36: busi.Busi.TransOutHeaderYes:output_type -> google.protobuf.Empty - 2, // 37: busi.Busi.TransOutHeaderNo:output_type -> google.protobuf.Empty - 1, // 38: busi.Busi.QueryPrepared:output_type -> busi.BusiReply - 2, // 39: busi.Busi.QueryPreparedB:output_type -> google.protobuf.Empty - 20, // [20:40] is the sub-list for method output_type - 0, // [0:20] is the sub-list for method input_type + 0, // 20: busi.Busi.QueryPreparedRedis:input_type -> busi.BusiReq + 2, // 21: busi.Busi.TransIn:output_type -> google.protobuf.Empty + 2, // 22: busi.Busi.TransOut:output_type -> google.protobuf.Empty + 2, // 23: busi.Busi.TransInRevert:output_type -> google.protobuf.Empty + 2, // 24: busi.Busi.TransOutRevert:output_type -> google.protobuf.Empty + 2, // 25: busi.Busi.TransInConfirm:output_type -> google.protobuf.Empty + 2, // 26: busi.Busi.TransOutConfirm:output_type -> google.protobuf.Empty + 2, // 27: busi.Busi.XaNotify:output_type -> google.protobuf.Empty + 2, // 28: busi.Busi.TransInXa:output_type -> google.protobuf.Empty + 2, // 29: busi.Busi.TransOutXa:output_type -> google.protobuf.Empty + 2, // 30: busi.Busi.TransInTcc:output_type -> google.protobuf.Empty + 2, // 31: busi.Busi.TransOutTcc:output_type -> google.protobuf.Empty + 2, // 32: busi.Busi.TransInTccNested:output_type -> google.protobuf.Empty + 2, // 33: busi.Busi.TransInBSaga:output_type -> google.protobuf.Empty + 2, // 34: busi.Busi.TransOutBSaga:output_type -> google.protobuf.Empty + 2, // 35: busi.Busi.TransInRevertBSaga:output_type -> google.protobuf.Empty + 2, // 36: busi.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty + 2, // 37: busi.Busi.TransOutHeaderYes:output_type -> google.protobuf.Empty + 2, // 38: busi.Busi.TransOutHeaderNo:output_type -> google.protobuf.Empty + 1, // 39: busi.Busi.QueryPrepared:output_type -> busi.BusiReply + 2, // 40: busi.Busi.QueryPreparedB:output_type -> google.protobuf.Empty + 2, // 41: busi.Busi.QueryPreparedRedis:output_type -> google.protobuf.Empty + 21, // [21:42] is the sub-list for method output_type + 0, // [0:21] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/test/busi/busi.proto b/test/busi/busi.proto index c94d8df..039ac64 100644 --- a/test/busi/busi.proto +++ b/test/busi/busi.proto @@ -39,5 +39,6 @@ service Busi { rpc TransOutHeaderNo(BusiReq) returns (google.protobuf.Empty) {} rpc QueryPrepared(BusiReq) returns (BusiReply) {} rpc QueryPreparedB(BusiReq) returns (google.protobuf.Empty) {} + rpc QueryPreparedRedis(BusiReq) returns (google.protobuf.Empty) {} } diff --git a/test/busi/busi_grpc.pb.go b/test/busi/busi_grpc.pb.go index f7db854..177a726 100644 --- a/test/busi/busi_grpc.pb.go +++ b/test/busi/busi_grpc.pb.go @@ -39,6 +39,7 @@ type BusiClient interface { TransOutHeaderNo(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) QueryPrepared(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*BusiReply, error) QueryPreparedB(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) + QueryPreparedRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) } type busiClient struct { @@ -229,6 +230,15 @@ func (c *busiClient) QueryPreparedB(ctx context.Context, in *BusiReq, opts ...gr return out, nil } +func (c *busiClient) QueryPreparedRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/busi.Busi/QueryPreparedRedis", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // BusiServer is the server API for Busi service. // All implementations must embed UnimplementedBusiServer // for forward compatibility @@ -253,6 +263,7 @@ type BusiServer interface { TransOutHeaderNo(context.Context, *BusiReq) (*emptypb.Empty, error) QueryPrepared(context.Context, *BusiReq) (*BusiReply, error) QueryPreparedB(context.Context, *BusiReq) (*emptypb.Empty, error) + QueryPreparedRedis(context.Context, *BusiReq) (*emptypb.Empty, error) mustEmbedUnimplementedBusiServer() } @@ -320,6 +331,9 @@ func (UnimplementedBusiServer) QueryPrepared(context.Context, *BusiReq) (*BusiRe func (UnimplementedBusiServer) QueryPreparedB(context.Context, *BusiReq) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method QueryPreparedB not implemented") } +func (UnimplementedBusiServer) QueryPreparedRedis(context.Context, *BusiReq) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method QueryPreparedRedis not implemented") +} func (UnimplementedBusiServer) mustEmbedUnimplementedBusiServer() {} // UnsafeBusiServer may be embedded to opt out of forward compatibility for this service. @@ -693,6 +707,24 @@ func _Busi_QueryPreparedB_Handler(srv interface{}, ctx context.Context, dec func return interceptor(ctx, in, info, handler) } +func _Busi_QueryPreparedRedis_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BusiReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).QueryPreparedRedis(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/busi.Busi/QueryPreparedRedis", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).QueryPreparedRedis(ctx, req.(*BusiReq)) + } + return interceptor(ctx, in, info, handler) +} + // Busi_ServiceDesc is the grpc.ServiceDesc for Busi service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -780,6 +812,10 @@ var Busi_ServiceDesc = grpc.ServiceDesc{ MethodName: "QueryPreparedB", Handler: _Busi_QueryPreparedB_Handler, }, + { + MethodName: "QueryPreparedRedis", + Handler: _Busi_QueryPreparedRedis_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "test/busi/busi.proto",