Browse Source

yedf

pull/262/head
yedf2 4 years ago
parent
commit
7414304ea6
  1. 2
      .gitignore
  2. 13
      conf.sample.yml
  3. 6
      dtmcli/dtmimp/vars.go
  4. 36
      dtmsvr/config/config.go
  5. 32
      dtmsvr/svr.go
  6. 4
      go.mod

2
.gitignore

@ -18,3 +18,5 @@ test.sh
dtm
dtm-*
dtm.*
cache

13
conf.sample.yml

@ -37,12 +37,13 @@
# SuccessDataExpire: 86400 # successful Trans data will expire in 1 days. only for redis.
# RedisPrefix: '{a}' # default value is '{a}'. Redis storage prefix. store data to only one slot in cluster
# MicroService:
# Driver: 'dtm-driver-nacos' # name of the driver to handle register/discover
# Target: '127.0.0.1:8848' # register dtm server to this url
# EndPoint: '127.0.0.1:36789'
# OptionsJson: '{"username": "nacos", "password": "nacos", "namespaceId": "c3dc917d-906a-429d-90a9-85012b41067e"}' # micro service other config message, example: '{"username": "nacos", "password": "nacos"}'
HttpMicroService:
Driver: 'dtm-driver-http'
RegistryType: 'nacos'
RegistryAddress: '127.0.0.1:8848,127.0.0.1:8848'
RegistryOptions: '{"UserName":"nacos","Password":"nacos","NotLoadCacheAtStart":true}'
Target: '{"ServiceName":"dtm","Enable":true,"Healthy":true,"Weight":10}'
EndPoint: '127.0.0.1:36789'
### the unit of following configurations is second
# TransCronInterval: 3 # the interval to poll unfinished global transaction for every dtm process
# TimeoutToFail: 35 # timeout for XA, TCC to fail. saga's timeout default to infinite, which can be overwritten in saga options

6
dtmcli/dtmimp/vars.go

@ -10,6 +10,7 @@ import (
"errors"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtmdriver"
"github.com/go-resty/resty/v2"
)
@ -44,8 +45,11 @@ var BarrierTableName = "dtm_barrier.barrier"
func init() {
RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error {
r.URL = MayReplaceLocalhost(r.URL)
u, err := dtmdriver.GetHTTPDriver().ResolveURL(r.URL)
logger.Debugf("requesting: %s %s %s", r.Method, r.URL, MustMarshalString(r.Body))
return nil
r.URL = u
logger.Debugf("resolved: %s err: %v", r.URL, err)
return err
})
RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error {
r := resp.Request

36
dtmsvr/config/config.go

@ -30,6 +30,15 @@ type MicroService struct {
OptionsJson string `yaml:"OptionsJson"`
}
type HTTPMicroService struct {
Driver string `yaml:"Driver" default:"default"`
RegistryType string `yaml:"RegistryType" default:""`
RegistryAddress string `yaml:"RegistryAddress" default:""`
RegistryOptions string `yaml:"RegistryOptions" default:"{}"`
Target string `yaml:"Target"`
EndPoint string `yaml:"EndPoint"`
}
// Log config customize log
type Log struct {
Outputs string `yaml:"Outputs" default:"stderr"`
@ -71,19 +80,20 @@ func (s *Store) GetDBConf() dtmcli.DBConf {
}
type configType struct {
Store Store `yaml:"Store"`
TransCronInterval int64 `yaml:"TransCronInterval" default:"3"`
TimeoutToFail int64 `yaml:"TimeoutToFail" default:"35"`
RetryInterval int64 `yaml:"RetryInterval" default:"10"`
RequestTimeout int64 `yaml:"RequestTimeout" default:"3"`
HTTPPort int64 `yaml:"HttpPort" default:"36789"`
GrpcPort int64 `yaml:"GrpcPort" default:"36790"`
JSONRPCPort int64 `yaml:"JsonRpcPort" default:"36791"`
MicroService MicroService `yaml:"MicroService"`
UpdateBranchSync int64 `yaml:"UpdateBranchSync"`
UpdateBranchAsyncGoroutineNum int64 `yaml:"UpdateBranchAsyncGoroutineNum" default:"1"`
LogLevel string `yaml:"LogLevel" default:"info"`
Log Log `yaml:"Log"`
Store Store `yaml:"Store"`
TransCronInterval int64 `yaml:"TransCronInterval" default:"3"`
TimeoutToFail int64 `yaml:"TimeoutToFail" default:"35"`
RetryInterval int64 `yaml:"RetryInterval" default:"10"`
RequestTimeout int64 `yaml:"RequestTimeout" default:"3"`
HTTPPort int64 `yaml:"HttpPort" default:"36789"`
GrpcPort int64 `yaml:"GrpcPort" default:"36790"`
JSONRPCPort int64 `yaml:"JsonRpcPort" default:"36791"`
MicroService MicroService `yaml:"MicroService"`
HTTPMicroService HTTPMicroService `yaml:"HttpMicroService"`
UpdateBranchSync int64 `yaml:"UpdateBranchSync"`
UpdateBranchAsyncGoroutineNum int64 `yaml:"UpdateBranchAsyncGoroutineNum" default:"1"`
LogLevel string `yaml:"LogLevel" default:"info"`
Log Log `yaml:"Log"`
}
// Config config

32
dtmsvr/svr.go

@ -8,9 +8,7 @@ package dtmsvr
import (
"context"
"encoding/json"
"fmt"
"github.com/horseLk/dtmdriver-nacos/httpdriver"
"net"
"time"
@ -72,27 +70,17 @@ func StartSvr() {
time.Sleep(100 * time.Millisecond)
err = dtmdriver.Use(conf.MicroService.Driver)
logger.FatalIfError(err)
logger.Infof("RegisterGrpcService: %s", conf.MicroService.Driver)
err = dtmdriver.GetDriver().RegisterGrpcService(conf.MicroService.Target, conf.MicroService.EndPoint)
logger.FatalIfError(err)
if v, ok := dtmdriver.GetDriver().(httpdriver.HttpDriver); ok {
logger.Infof("RegisterHttpSerrvice: %s", v)
options := make(map[string]string)
if conf.MicroService.OptionsJson == "" {
conf.MicroService.OptionsJson = "{}"
}
err = json.Unmarshal([]byte(conf.MicroService.OptionsJson), &options)
if err != nil {
logger.FatalIfError(err)
}
routesInfo := app.Routes()
paths := make([]string, 0)
for _, routeInfo := range routesInfo {
paths = append(paths, routeInfo.Path)
}
err = v.RegisterHttpService(conf.MicroService.Target, conf.MicroService.EndPoint, options)
} else {
logger.Infof("RegisterGrpcService: %s", conf.MicroService.Driver)
err = dtmdriver.GetDriver().RegisterGrpcService(conf.MicroService.Target, conf.MicroService.EndPoint)
}
err = dtmdriver.UseHTTP(conf.HTTPMicroService.Driver)
logger.FatalIfError(err)
c := &conf.HTTPMicroService
logger.Infof("RegisterHTTPService: %s", conf.HTTPMicroService.Driver)
err = dtmdriver.GetHTTPDriver().Init(c.RegistryType, c.RegistryAddress, c.RegistryOptions)
logger.FatalIfError(err)
err = dtmdriver.GetHTTPDriver().RegisterService(c.Target, c.EndPoint)
logger.FatalIfError(err)
}

4
go.mod

@ -35,3 +35,7 @@ require (
gorm.io/gorm v1.22.2
// gotest.tools v2.2.0+incompatible
)
replace github.com/dtm-labs/dtmdriver v0.0.1 => /Users/wangxi/dtm/dtmdriver
replace github.com/horseLk/dtmdriver-nacos v1.1.0 => /Users/wangxi/dtm/dtmdriver-http-nacos

Loading…
Cancel
Save