Browse Source

Merge pull request #574 from giantwu/fix-grpc-client-banlance

Fix grpc client banlance
main
yedf2 3 months ago
committed by GitHub
parent
commit
18146ee53b
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 28
      client/dtmgrpc/dtmgimp/grpc_clients.go
  2. 3
      conf.sample.yml
  3. 10
      dtmsvr/config/config.go

28
client/dtmgrpc/dtmgimp/grpc_clients.go

@ -8,6 +8,7 @@ package dtmgimp
import (
"fmt"
"os"
"sync"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@ -39,6 +40,9 @@ var normalClients, rawClients sync.Map
// ClientInterceptors declares grpc.UnaryClientInterceptors slice
var ClientInterceptors = []grpc.UnaryClientInterceptor{}
// GrpcServiceConfigGetter is a function to get gRPC service config, can be set by server config
var GrpcServiceConfigGetter func() string
// MustGetDtmClient 1
func MustGetDtmClient(grpcServer string) dtmgpb.DtmClient {
return dtmgpb.NewDtmClient(MustGetGrpcConn(grpcServer, false))
@ -61,7 +65,12 @@ func GetGrpcConn(grpcServer string, isRaw bool) (conn *grpc.ClientConn, rerr err
interceptors := append(ClientInterceptors, GrpcClientLog)
interceptors = append(interceptors, dtmdriver.Middlewares.Grpc...)
inOpt := grpc.WithChainUnaryInterceptor(interceptors...)
conn, rerr := grpc.Dial(grpcServer, inOpt, grpc.WithTransportCredentials(insecure.NewCredentials()), opts)
grpcServiceConfig, hasConfig := getGrpcServiceConfig()
dialOpts := []grpc.DialOption{inOpt, grpc.WithTransportCredentials(insecure.NewCredentials()), opts}
if hasConfig {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(grpcServiceConfig))
}
conn, rerr := grpc.Dial(grpcServer, dialOpts...)
if rerr == nil {
clients.Store(grpcServer, conn)
v = conn
@ -77,3 +86,20 @@ func MustGetGrpcConn(grpcServer string, isRaw bool) *grpc.ClientConn {
dtmimp.E2P(err)
return conn
}
// getGrpcServiceConfig returns the gRPC service config from config getter or environment variable, and a bool indicating if config is set
func getGrpcServiceConfig() (string, bool) {
// First try to get from config getter (set by server)
if GrpcServiceConfigGetter != nil {
if config := GrpcServiceConfigGetter(); config != "" {
return config, true
}
}
// Fallback to environment variable
config := os.Getenv("GRPC_SERVICE_CONFIG")
if config != "" {
return config, true
}
// No config set
return "", false
}

3
conf.sample.yml

@ -70,3 +70,6 @@
# AlertRetryLimit: 3 # default 3; if a transaction branch has been retried 3 times, the AlertHook will be called
# AlertWebHook: '' # default ''; sample: 'http://localhost:8080/dtm-hook'. this hook will be called like this:
## curl -H "Content-Type: application/json" -d '{"gid":"xxxx","status":"submitted","retry_count":3}' http://localhost:8080/dtm-hook
# GrpcBalancer: 'round_robin' # default round_robin,For more information about service configs, see:https://github.com/grpc/grpc/blob/master/doc/service_config.md
# GrpcServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}' # default gRPC service config for client connections

10
dtmsvr/config/config.go

@ -5,6 +5,7 @@ import (
"io/ioutil"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgimp"
"github.com/dtm-labs/logger"
"gopkg.in/yaml.v3"
)
@ -104,6 +105,7 @@ type Type struct {
AlertRetryLimit int64 `yaml:"AlertRetryLimit" default:"3"`
AlertWebHook string `yaml:"AlertWebHook"`
AdminBasePath string `yaml:"AdminBasePath"`
GrpcServiceConfig string `yaml:"GrpcServiceConfig" default:"{\"loadBalancingConfig\": [{\"round_robin\":{}}]}"`
}
// Config config
@ -124,4 +126,12 @@ func MustLoadConfig(confFile string) {
err = checkConfig(&Config)
logger.FatalfIf(err != nil, `config error: '%v'.
please visit http://d.dtm.pub to see the config document.`, err)
// Set gRPC service config for client library
if Config.GrpcServiceConfig == "" {
Config.GrpcServiceConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}`
}
// Set config getter function for client library to read config directly
dtmgimp.GrpcServiceConfigGetter = func() string {
return Config.GrpcServiceConfig
}
}

Loading…
Cancel
Save