From 93ec7f633a2cbb202a821db257fa8790cb17191f Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 23 Jul 2022 17:15:56 +0800 Subject: [PATCH] dtmdriver refactored --- client/dtmcli/dtmimp/vars.go | 34 ++++++++++++++------------ client/dtmgrpc/dtmgimp/grpc_clients.go | 2 ++ client/workflow/imp.go | 7 ++---- conf.sample.yml | 11 +++------ dtmsvr/microservices/drivers.go | 8 +++--- dtmsvr/svr.go | 13 ++-------- test/main_test.go | 8 +++--- 7 files changed, 35 insertions(+), 48 deletions(-) diff --git a/client/dtmcli/dtmimp/vars.go b/client/dtmcli/dtmimp/vars.go index a34d29f..5ea4b02 100644 --- a/client/dtmcli/dtmimp/vars.go +++ b/client/dtmcli/dtmimp/vars.go @@ -39,23 +39,25 @@ var PassthroughHeaders = []string{} // BarrierTableName the table name of barrier table var BarrierTableName = "dtm_barrier.barrier" -// BeforeRequest is the middleware for default resty.Client -func BeforeRequest(c *resty.Client, r *resty.Request) error { - r.URL = MayReplaceLocalhost(r.URL) - u, err := dtmdriver.GetHTTPDriver().ResolveURL(r.URL) - logger.Debugf("requesting: %s %s %s resolved: %s", r.Method, r.URL, MustMarshalString(r.Body), u) - r.URL = u - return err -} - -// AfterResponse is the middleware for default resty.Client -func AfterResponse(c *resty.Client, resp *resty.Response) error { - r := resp.Request - logger.Debugf("requested: %d %s %s %s", resp.StatusCode(), r.Method, r.URL, resp.String()) - return nil +// AddRestyMiddlewares will add the middlewares used by dtm +func AddRestyMiddlewares(client *resty.Client) { + client.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { + logger.Debugf("requesting: %s %s %s resolved: %s", r.Method, r.URL, MustMarshalString(r.Body), r.URL) + r.URL = MayReplaceLocalhost(r.URL) + ms := dtmdriver.Middlewares.HTTP + var err error + for i := 0; i < len(ms) && err == nil; i++ { + err = ms[i](c, r) + } + return err + }) + client.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { + r := resp.Request + logger.Debugf("requested: %d %s %s %s", resp.StatusCode(), r.Method, r.URL, resp.String()) + return nil + }) } func init() { - RestyClient.OnBeforeRequest(BeforeRequest) - RestyClient.OnAfterResponse(AfterResponse) + AddRestyMiddlewares(RestyClient) } diff --git a/client/dtmgrpc/dtmgimp/grpc_clients.go b/client/dtmgrpc/dtmgimp/grpc_clients.go index 109785a..cfce542 100644 --- a/client/dtmgrpc/dtmgimp/grpc_clients.go +++ b/client/dtmgrpc/dtmgimp/grpc_clients.go @@ -13,6 +13,7 @@ import ( "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" "github.com/dtm-labs/dtm/client/dtmcli/logger" "github.com/dtm-labs/dtm/client/dtmgrpc/dtmgpb" + "github.com/dtm-labs/dtmdriver" grpc "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -58,6 +59,7 @@ func GetGrpcConn(grpcServer string, isRaw bool) (conn *grpc.ClientConn, rerr err } logger.Debugf("grpc client connecting %s", grpcServer) interceptors := append(ClientInterceptors, GrpcClientLog) + interceptors = append(interceptors, dtmdriver.Middlewares.Grpc...) inOpt := grpc.WithChainUnaryInterceptor(interceptors...) conn, rerr := grpc.Dial(grpcServer, inOpt, grpc.WithTransportCredentials(insecure.NewCredentials()), opts) if rerr == nil { diff --git a/client/workflow/imp.go b/client/workflow/imp.go index 7fcab39..3429b42 100644 --- a/client/workflow/imp.go +++ b/client/workflow/imp.go @@ -88,14 +88,11 @@ func (wf *Workflow) initRestyClient() { "branch_id": wf.currentBranch, "op": wf.currentOp, }) - err := dtmimp.BeforeRequest(c, r) - return err + return nil }) + dtmimp.AddRestyMiddlewares(wf.restyClient) old := wf.restyClient.GetClient().Transport wf.restyClient.GetClient().Transport = newRoundTripper(old, wf) - wf.restyClient.OnAfterResponse(func(c *resty.Client, r *resty.Response) error { - return dtmimp.AfterResponse(c, r) - }) } func (wf *Workflow) process(handler WfFunc, data []byte) (err error) { diff --git a/conf.sample.yml b/conf.sample.yml index fb74f14..984b440 100644 --- a/conf.sample.yml +++ b/conf.sample.yml @@ -42,13 +42,10 @@ # Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url # EndPoint: 'localhost:36790' -# HttpMicroService: # http based microservice config -# Driver: 'dtm-driver-http' # name of the driver to handle register/discover -# RegistryType: 'nacos' -# RegistryAddress: '127.0.0.1:8848,127.0.0.1:8848' -# RegistryOptions: '{"namespaceId":"a6843d66-cf5b-43ab-b7e7-461200dfe76c","UserName":"nacos","Password":"nacos","NotLoadCacheAtStart":true}' -# Target: '{"ServiceName":"dtmService","Enable":true,"Healthy":true,"Weight":10}' # target and options -# EndPoint: '127.0.0.1:36789' +# MicroService: # grpc based microservice config +# Driver: 'dtm-driver-springcloud' # name of the driver to handle register/discover +# Target: '{"Addr":"127.0.0.1:8848,127.0.0.1:8848","Type":"nacos", "InstanceConfig":{"ServiceName":"dtmService","Enable":true,"Healthy":true,"Weight":10},"ClientConfig":{"NamespaceId":"c3dc917d-906a-429d-90a9-85012b41014e","UserName":"nacos","Password":"nacos","NotLoadCacheAtStart":true}}' +# EndPoint: 'localhost:36789' ### the unit of following configurations is second # TransCronInterval: 3 # the interval to poll unfinished global transaction for every dtm process diff --git a/dtmsvr/microservices/drivers.go b/dtmsvr/microservices/drivers.go index cdf7afb..0476a8a 100644 --- a/dtmsvr/microservices/drivers.go +++ b/dtmsvr/microservices/drivers.go @@ -3,8 +3,8 @@ package microservices import ( // load the microserver drivers _ "github.com/dtm-labs/dtmdriver-gozero" - _ "github.com/dtm-labs/dtmdriver-http" - _ "github.com/dtm-labs/dtmdriver-kratos" - _ "github.com/dtm-labs/dtmdriver-polaris" - _ "github.com/dtm-labs/dtmdriver-protocol1" + _ "github.com/dtm-labs/dtmdriver-springcloud" + // _ "github.com/dtm-labs/dtmdriver-kratos" + // _ "github.com/dtm-labs/dtmdriver-polaris" + // _ "github.com/dtm-labs/dtmdriver-protocol1" ) diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index b5c7604..8a41b76 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -73,17 +73,8 @@ func StartSvr() *gin.Engine { time.Sleep(100 * time.Millisecond) err = dtmdriver.Use(conf.MicroService.Driver) logger.FatalIfError(err) - logger.Infof("RegisterGrpcService: %s", conf.MicroService.Driver) - err = dtmdriver.GetDriver().RegisterGrpcService(conf.MicroService.Target, conf.MicroService.EndPoint) - logger.FatalIfError(err) - - err = dtmdriver.UseHTTP(conf.HTTPMicroService.Driver) - logger.FatalIfError(err) - c := &conf.HTTPMicroService - logger.Infof("RegisterHTTPService: %s", conf.HTTPMicroService.Driver) - err = dtmdriver.GetHTTPDriver().Init(c.RegistryType, c.RegistryAddress, c.RegistryOptions) - logger.FatalIfError(err) - err = dtmdriver.GetHTTPDriver().RegisterService(c.Target, c.EndPoint) + logger.Infof("RegisterService: %s", conf.MicroService.Driver) + err = dtmdriver.GetDriver().RegisterService(conf.MicroService.Target, conf.MicroService.EndPoint) logger.FatalIfError(err) return app } diff --git a/test/main_test.go b/test/main_test.go index 71e74c0..1d34e65 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -14,14 +14,13 @@ import ( "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" "github.com/dtm-labs/dtm/client/dtmcli/logger" - "github.com/dtm-labs/dtm/client/dtmgrpc" "github.com/dtm-labs/dtm/client/workflow" "github.com/dtm-labs/dtm/dtmsvr" "github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/dtmsvr/storage/registry" "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/test/busi" - "github.com/go-resty/resty/v2" + "github.com/dtm-labs/dtmdriver" ) func TestMain(m *testing.M) { @@ -32,9 +31,8 @@ func TestMain(m *testing.M) { dtmsvr.CronForwardDuration = 180 * time.Second conf.UpdateBranchSync = 1 - dtmgrpc.AddUnaryInterceptor(busi.SetGrpcHeaderForHeadersYes) - dtmcli.GetRestyClient().OnBeforeRequest(busi.SetHTTPHeaderForHeadersYes) - dtmcli.GetRestyClient().OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { return nil }) + dtmdriver.Middlewares.HTTP = append(dtmdriver.Middlewares.HTTP, busi.SetHTTPHeaderForHeadersYes) + dtmdriver.Middlewares.Grpc = append(dtmdriver.Middlewares.Grpc, busi.SetGrpcHeaderForHeadersYes) tenv := dtmimp.OrString(os.Getenv("TEST_STORE"), config.Redis) conf.Store.Host = "localhost"