Browse Source

dtmdriver refactored

pull/335/head
yedf2 4 years ago
parent
commit
93ec7f633a
  1. 34
      client/dtmcli/dtmimp/vars.go
  2. 2
      client/dtmgrpc/dtmgimp/grpc_clients.go
  3. 7
      client/workflow/imp.go
  4. 11
      conf.sample.yml
  5. 8
      dtmsvr/microservices/drivers.go
  6. 13
      dtmsvr/svr.go
  7. 8
      test/main_test.go

34
client/dtmcli/dtmimp/vars.go

@ -39,23 +39,25 @@ var PassthroughHeaders = []string{}
// BarrierTableName the table name of barrier table
var BarrierTableName = "dtm_barrier.barrier"
// BeforeRequest is the middleware for default resty.Client
func BeforeRequest(c *resty.Client, r *resty.Request) error {
r.URL = MayReplaceLocalhost(r.URL)
u, err := dtmdriver.GetHTTPDriver().ResolveURL(r.URL)
logger.Debugf("requesting: %s %s %s resolved: %s", r.Method, r.URL, MustMarshalString(r.Body), u)
r.URL = u
return err
}
// AfterResponse is the middleware for default resty.Client
func AfterResponse(c *resty.Client, resp *resty.Response) error {
r := resp.Request
logger.Debugf("requested: %d %s %s %s", resp.StatusCode(), r.Method, r.URL, resp.String())
return nil
// AddRestyMiddlewares will add the middlewares used by dtm
func AddRestyMiddlewares(client *resty.Client) {
client.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error {
logger.Debugf("requesting: %s %s %s resolved: %s", r.Method, r.URL, MustMarshalString(r.Body), r.URL)
r.URL = MayReplaceLocalhost(r.URL)
ms := dtmdriver.Middlewares.HTTP
var err error
for i := 0; i < len(ms) && err == nil; i++ {
err = ms[i](c, r)
}
return err
})
client.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error {
r := resp.Request
logger.Debugf("requested: %d %s %s %s", resp.StatusCode(), r.Method, r.URL, resp.String())
return nil
})
}
func init() {
RestyClient.OnBeforeRequest(BeforeRequest)
RestyClient.OnAfterResponse(AfterResponse)
AddRestyMiddlewares(RestyClient)
}

2
client/dtmgrpc/dtmgimp/grpc_clients.go

@ -13,6 +13,7 @@ import (
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/client/dtmcli/logger"
"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgpb"
"github.com/dtm-labs/dtmdriver"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
@ -58,6 +59,7 @@ func GetGrpcConn(grpcServer string, isRaw bool) (conn *grpc.ClientConn, rerr err
}
logger.Debugf("grpc client connecting %s", grpcServer)
interceptors := append(ClientInterceptors, GrpcClientLog)
interceptors = append(interceptors, dtmdriver.Middlewares.Grpc...)
inOpt := grpc.WithChainUnaryInterceptor(interceptors...)
conn, rerr := grpc.Dial(grpcServer, inOpt, grpc.WithTransportCredentials(insecure.NewCredentials()), opts)
if rerr == nil {

7
client/workflow/imp.go

@ -88,14 +88,11 @@ func (wf *Workflow) initRestyClient() {
"branch_id": wf.currentBranch,
"op": wf.currentOp,
})
err := dtmimp.BeforeRequest(c, r)
return err
return nil
})
dtmimp.AddRestyMiddlewares(wf.restyClient)
old := wf.restyClient.GetClient().Transport
wf.restyClient.GetClient().Transport = newRoundTripper(old, wf)
wf.restyClient.OnAfterResponse(func(c *resty.Client, r *resty.Response) error {
return dtmimp.AfterResponse(c, r)
})
}
func (wf *Workflow) process(handler WfFunc, data []byte) (err error) {

11
conf.sample.yml

@ -42,13 +42,10 @@
# Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url
# EndPoint: 'localhost:36790'
# HttpMicroService: # http based microservice config
# Driver: 'dtm-driver-http' # name of the driver to handle register/discover
# RegistryType: 'nacos'
# RegistryAddress: '127.0.0.1:8848,127.0.0.1:8848'
# RegistryOptions: '{"namespaceId":"a6843d66-cf5b-43ab-b7e7-461200dfe76c","UserName":"nacos","Password":"nacos","NotLoadCacheAtStart":true}'
# Target: '{"ServiceName":"dtmService","Enable":true,"Healthy":true,"Weight":10}' # target and options
# EndPoint: '127.0.0.1:36789'
# MicroService: # grpc based microservice config
# Driver: 'dtm-driver-springcloud' # name of the driver to handle register/discover
# Target: '{"Addr":"127.0.0.1:8848,127.0.0.1:8848","Type":"nacos", "InstanceConfig":{"ServiceName":"dtmService","Enable":true,"Healthy":true,"Weight":10},"ClientConfig":{"NamespaceId":"c3dc917d-906a-429d-90a9-85012b41014e","UserName":"nacos","Password":"nacos","NotLoadCacheAtStart":true}}'
# EndPoint: 'localhost:36789'
### the unit of following configurations is second
# TransCronInterval: 3 # the interval to poll unfinished global transaction for every dtm process

8
dtmsvr/microservices/drivers.go

@ -3,8 +3,8 @@ package microservices
import (
// load the microserver drivers
_ "github.com/dtm-labs/dtmdriver-gozero"
_ "github.com/dtm-labs/dtmdriver-http"
_ "github.com/dtm-labs/dtmdriver-kratos"
_ "github.com/dtm-labs/dtmdriver-polaris"
_ "github.com/dtm-labs/dtmdriver-protocol1"
_ "github.com/dtm-labs/dtmdriver-springcloud"
// _ "github.com/dtm-labs/dtmdriver-kratos"
// _ "github.com/dtm-labs/dtmdriver-polaris"
// _ "github.com/dtm-labs/dtmdriver-protocol1"
)

13
dtmsvr/svr.go

@ -73,17 +73,8 @@ func StartSvr() *gin.Engine {
time.Sleep(100 * time.Millisecond)
err = dtmdriver.Use(conf.MicroService.Driver)
logger.FatalIfError(err)
logger.Infof("RegisterGrpcService: %s", conf.MicroService.Driver)
err = dtmdriver.GetDriver().RegisterGrpcService(conf.MicroService.Target, conf.MicroService.EndPoint)
logger.FatalIfError(err)
err = dtmdriver.UseHTTP(conf.HTTPMicroService.Driver)
logger.FatalIfError(err)
c := &conf.HTTPMicroService
logger.Infof("RegisterHTTPService: %s", conf.HTTPMicroService.Driver)
err = dtmdriver.GetHTTPDriver().Init(c.RegistryType, c.RegistryAddress, c.RegistryOptions)
logger.FatalIfError(err)
err = dtmdriver.GetHTTPDriver().RegisterService(c.Target, c.EndPoint)
logger.Infof("RegisterService: %s", conf.MicroService.Driver)
err = dtmdriver.GetDriver().RegisterService(conf.MicroService.Target, conf.MicroService.EndPoint)
logger.FatalIfError(err)
return app
}

8
test/main_test.go

@ -14,14 +14,13 @@ import (
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/client/dtmcli/logger"
"github.com/dtm-labs/dtm/client/dtmgrpc"
"github.com/dtm-labs/dtm/client/workflow"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/dtmsvr/config"
"github.com/dtm-labs/dtm/dtmsvr/storage/registry"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/go-resty/resty/v2"
"github.com/dtm-labs/dtmdriver"
)
func TestMain(m *testing.M) {
@ -32,9 +31,8 @@ func TestMain(m *testing.M) {
dtmsvr.CronForwardDuration = 180 * time.Second
conf.UpdateBranchSync = 1
dtmgrpc.AddUnaryInterceptor(busi.SetGrpcHeaderForHeadersYes)
dtmcli.GetRestyClient().OnBeforeRequest(busi.SetHTTPHeaderForHeadersYes)
dtmcli.GetRestyClient().OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { return nil })
dtmdriver.Middlewares.HTTP = append(dtmdriver.Middlewares.HTTP, busi.SetHTTPHeaderForHeadersYes)
dtmdriver.Middlewares.Grpc = append(dtmdriver.Middlewares.Grpc, busi.SetGrpcHeaderForHeadersYes)
tenv := dtmimp.OrString(os.Getenv("TEST_STORE"), config.Redis)
conf.Store.Host = "localhost"

Loading…
Cancel
Save