Browse Source

feat: grpc invoke 增加ctx传递,以用于opentrace 需求

pull/259/head
qintang 4 years ago
parent
commit
b5c9e98f08
  1. 4
      dtmcli/dtmimp/trans_base.go
  2. 2
      dtmgrpc/dtmgimp/types.go
  3. 6
      dtmgrpc/dtmgimp/utils.go
  4. 2
      dtmsvr/trans_status.go

4
dtmcli/dtmimp/trans_base.go

@ -7,6 +7,7 @@
package dtmimp
import (
"context"
"errors"
"fmt"
"net/http"
@ -49,6 +50,7 @@ type TransOptions struct {
PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"` // for inherit the specified gin context headers
BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` // custom branch headers, dtm server => service api
Concurrent bool `json:"concurrent" gorm:"-"` // for trans type: saga msg
Ctx context.Context `json:"-" gorm:"-"`
}
// TransBase base for all trans
@ -76,7 +78,7 @@ func NewTransBase(gid string, transType string, dtm string, branchID string) *Tr
TransType: transType,
BranchIDGen: BranchIDGen{BranchID: branchID},
Dtm: dtm,
TransOptions: TransOptions{PassthroughHeaders: PassthroughHeaders},
TransOptions: TransOptions{PassthroughHeaders: PassthroughHeaders,Ctx: context.Background()},
}
}

2
dtmgrpc/dtmgimp/types.go

@ -56,7 +56,7 @@ func InvokeBranch(t *dtmimp.TransBase, isRaw bool, msg proto.Message, url string
if err != nil {
return err
}
ctx := TransInfo2Ctx(t.Gid, t.TransType, branchID, op, t.Dtm)
ctx := TransInfo2Ctx(t.Ctx, t.Gid, t.TransType, branchID, op, t.Dtm)
ctx = metadata.AppendToOutgoingContext(ctx, Map2Kvs(t.BranchHeaders)...)
if t.TransType == "xa" { // xa branch need additional phase2_url
ctx = metadata.AppendToOutgoingContext(ctx, Map2Kvs(map[string]string{dtmpre + "phase2_url": url})...)

6
dtmgrpc/dtmgimp/utils.go

@ -27,7 +27,7 @@ func MustProtoMarshal(msg proto.Message) []byte {
// DtmGrpcCall make a convenient call to dtm
func DtmGrpcCall(s *dtmimp.TransBase, operation string) error {
reply := emptypb.Empty{}
return MustGetGrpcConn(s.Dtm, false).Invoke(context.Background(), "/dtmgimp.Dtm/"+operation, &dtmgpb.DtmRequest{
return MustGetGrpcConn(s.Dtm, false).Invoke(s.Ctx, "/dtmgimp.Dtm/"+operation, &dtmgpb.DtmRequest{
Gid: s.Gid,
TransType: s.TransType,
TransOptions: &dtmgpb.DtmTransOptions{
@ -48,7 +48,7 @@ func DtmGrpcCall(s *dtmimp.TransBase, operation string) error {
const dtmpre string = "dtm-"
// TransInfo2Ctx add trans info to grpc context
func TransInfo2Ctx(gid, transType, branchID, op, dtm string) context.Context {
func TransInfo2Ctx(ctx context.Context, gid, transType, branchID, op, dtm string) context.Context {
md := metadata.Pairs(
dtmpre+"gid", gid,
dtmpre+"trans_type", transType,
@ -56,7 +56,7 @@ func TransInfo2Ctx(gid, transType, branchID, op, dtm string) context.Context {
dtmpre+"op", op,
dtmpre+"dtm", dtm,
)
return metadata.NewOutgoingContext(context.Background(), md)
return metadata.NewOutgoingContext(ctx, md)
}
// Map2Kvs map to metadata kv

2
dtmsvr/trans_status.go

@ -154,7 +154,7 @@ func (t *TransGlobal) getURLResult(uri string, branchID, op string, branchPayloa
}
conn := dtmgimp.MustGetGrpcConn(server, true)
ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, op, "")
ctx := dtmgimp.TransInfo2Ctx(t.Ctx, t.Gid, t.TransType, branchID, op, "")
kvs := dtmgimp.Map2Kvs(t.Ext.Headers)
kvs = append(kvs, dtmgimp.Map2Kvs(t.BranchHeaders)...)
ctx = metadata.AppendToOutgoingContext(ctx, kvs...)

Loading…
Cancel
Save