Browse Source

merged

pull/172/head
yedf2 4 years ago
parent
commit
901decf332
  1. 5
      .github/workflows/tests.yml
  2. 6
      .golangci.yml
  3. 2
      bench/main.go
  4. 23
      bench/svr/http.go
  5. 5
      conf.sample.yml
  6. 7
      dtmcli/barrier.go
  7. 6
      dtmcli/dtmimp/trans_xa_base.go
  8. 1
      dtmcli/dtmimp/types.go
  9. 1
      dtmcli/dtmimp/types_test.go
  10. 4
      dtmcli/dtmimp/vars.go
  11. 5
      dtmcli/logger/log.go
  12. 5
      dtmcli/logger/logger_test.go
  13. 14
      dtmcli/types.go
  14. 4
      dtmcli/types_test.go
  15. 2
      dtmcli/xa.go
  16. 6
      dtmgrpc/dtmgimp/grpc_clients.go
  17. 16
      dtmgrpc/tcc.go
  18. 1
      dtmgrpc/type.go
  19. 18
      dtmsvr/config/config.go
  20. 7
      dtmsvr/config/config_utils.go
  21. 2
      dtmsvr/storage/sql/sql.go
  22. 4
      dtmsvr/svr.go
  23. 4
      dtmutil/consts.go
  24. 5
      dtmutil/db.go
  25. 8
      dtmutil/utils.go
  26. 4
      dtmutil/utils_test.go
  27. 2
      go.mod
  28. 3
      main.go
  29. 12
      test/api_test.go
  30. 2
      test/busi/base_http.go
  31. 2
      test/busi/quick_start.go
  32. 4
      test/busi/startup.go
  33. 2
      test/common_test.go
  34. 2
      test/dtmsvr_test.go
  35. 2
      test/msg_test.go
  36. 2
      test/saga_compatible_test.go
  37. 10
      test/saga_options_test.go
  38. 6
      test/saga_test.go
  39. 2
      test/tcc_cover_test.go
  40. 10
      test/tcc_test.go

5
.github/workflows/tests.yml

@ -41,5 +41,8 @@ jobs:
run: |
go mod download
- name: Run CI lint
run: make lint
- name: Run test cover
run: sh helper/test-cover.sh
run: sh helper/test-cover.sh

6
.golangci.yml

@ -1,8 +1,8 @@
run:
deadline: 5m
# skip-dirs:
# - test
# - examples
skip-dirs:
- test
# - bench
linter-settings:
goconst:

2
bench/main.go

@ -21,7 +21,7 @@ usage:
`
func hintAndExit() {
fmt.Printf(usage)
fmt.Print(usage)
os.Exit(0)
}

23
bench/svr/http.go

@ -65,10 +65,11 @@ func reloadData() {
logger.Debugf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds())
}
var uidCounter int32 = 0
var mode string = ""
var sqls int = 1
var uidCounter int32
var mode string
var sqls = 1
// PrepareBenchDB prepares db data for bench
func PrepareBenchDB() {
db := pdbGet()
_, err := dtmimp.DBExec(db, "drop table if exists dtm_busi.user_account_log")
@ -95,7 +96,9 @@ func StartSvr() {
app := dtmutil.GetGinApp()
benchAddRoute(app)
logger.Debugf("bench listening at %d", benchPort)
go app.Run(fmt.Sprintf(":%s", benchPort))
go func() {
_ = app.Run(fmt.Sprintf(":%s", benchPort))
}()
}
func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
@ -116,11 +119,13 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
if strings.Contains(mode, "barrier") {
barrier, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
logger.FatalIfError(err)
barrier.Call(txGet(), f)
err = barrier.Call(txGet(), f)
logger.FatalIfError(err)
} else {
tx := txGet()
f(tx)
err := tx.Commit()
err := f(tx)
logger.FatalIfError(err)
err = tx.Commit()
logger.FatalIfError(err)
}
@ -158,7 +163,7 @@ func benchAddRoute(app *gin.Engine) {
params2 := fmt.Sprintf("?uid=%s", suid2)
logger.Debugf("mode: %s contains dtm: %t", mode, strings.Contains(mode, "dtm"))
if strings.Contains(mode, "dtm") {
saga := dtmcli.NewSaga(dtmutil.DefaultHttpServer, fmt.Sprintf("bench-%d", uid)).
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, fmt.Sprintf("bench-%d", uid)).
Add(benchBusi+"/TransOut"+params, benchBusi+"/TransOutCompensate"+params, req).
Add(benchBusi+"/TransIn"+params2, benchBusi+"/TransInCompensate"+params2, req)
saga.WaitResult = true
@ -175,7 +180,7 @@ func benchAddRoute(app *gin.Engine) {
app.Any(benchAPI+"/benchEmptyUrl", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
gid := shortuuid.New()
req := gin.H{}
saga := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gid).
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid).
Add("", "", req).
Add("", "", req)
saga.WaitResult = true

5
conf.sample.yml

@ -2,7 +2,7 @@
### dtm can be run without any config.
### all config in this file is optional. the default value is as specified in each line
### all configs can be specified from env. for example:
### Store.MaxOpenConns can also specified from env: STORE_MAX_OPEN_CONNS
### MicroService.EndPoint => MICRO_SERVICE_END_POINT
#####################################################################
# Store: # specify which engine to store trans status
@ -49,4 +49,5 @@
# RetryInterval: 10 # the subtrans branch will be retried after this interval
# LogLevel: 'info' # default: info. can be debug|info|warn|error
# HttpPort: 36789
# GrpcPort: 36790

7
dtmcli/barrier.go

@ -45,7 +45,7 @@ func BarrierFrom(transType, gid, branchID, op string) (*BranchBarrier, error) {
Op: op,
}
if ti.TransType == "" || ti.Gid == "" || ti.BranchID == "" || ti.Op == "" {
return nil, fmt.Errorf("invlid trans info: %v", ti)
return nil, fmt.Errorf("invalid trans info: %v", ti)
}
return ti, nil
}
@ -67,10 +67,10 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error)
defer func() {
// Logf("barrier call error is %v", rerr)
if x := recover(); x != nil {
tx.Rollback()
_ = tx.Rollback()
panic(x)
} else if rerr != nil {
tx.Rollback()
_ = tx.Rollback()
} else {
rerr = tx.Commit()
}
@ -101,6 +101,7 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error
return err
}
// QueryPrepared queries prepared data
func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error {
_, err := insertBarrier(db, bb.TransType, bb.Gid, "00", "msg", "01", "rollback")
var reason string

6
dtmcli/dtmimp/trans_xa_base.go

@ -24,7 +24,9 @@ func (xc *XaClientBase) HandleCallback(gid string, branchID string, action strin
if err != nil {
return err
}
defer db.Close()
defer func() {
_ = db.Close()
}()
xaID := gid + "-" + branchID
_, err = DBExec(db, GetDBSpecial().GetXaSQL(action, xaID))
if err != nil &&
@ -41,7 +43,7 @@ func (xc *XaClientBase) HandleLocalTrans(xa *TransBase, cb func(*sql.DB) error)
if rerr != nil {
return
}
defer func() { db.Close() }()
defer func() { _ = db.Close() }()
defer func() {
x := recover()
_, err := DBExec(db, GetDBSpecial().GetXaSQL("end", xaBranch))

1
dtmcli/dtmimp/types.go

@ -14,6 +14,7 @@ type DB interface {
QueryRow(query string, args ...interface{}) *sql.Row
}
// DBConf defines db config
type DBConf struct {
Driver string `yaml:"Driver"`
Host string `yaml:"Host"`

1
dtmcli/dtmimp/types_test.go

@ -22,4 +22,5 @@ func TestTypes(t *testing.T) {
idGen := BranchIDGen{subBranchID: 99}
idGen.NewSubBranchID()
})
assert.Error(t, err)
}

4
dtmcli/dtmimp/vars.go

@ -19,8 +19,8 @@ var ErrFailure = errors.New("FAILURE")
// ErrOngoing error of ONGOING
var ErrOngoing = errors.New("ONGOING")
// XaSqlTimeoutMs milliseconds for Xa sql to timeout
var XaSqlTimeoutMs = 15000
// XaSQLTimeoutMs milliseconds for Xa sql to timeout
var XaSQLTimeoutMs = 15000
// MapSuccess HTTP result of SUCCESS
var MapSuccess = map[string]interface{}{"dtm_result": ResultSuccess}

5
dtmcli/logger/log.go

@ -10,10 +10,10 @@ import (
//var logger *zap.SugaredLogger = nil
var logger Logger = nil
var logger Logger
func init() {
InitLog("info")
InitLog(os.Getenv("LOG_LEVEL"))
}
// Logger logger interface
@ -24,6 +24,7 @@ type Logger interface {
Errorf(format string, args ...interface{})
}
// WithLogger replaces default logger
func WithLogger(log Logger) {
logger = log
}

5
dtmcli/logger/logger_test.go

@ -1,9 +1,10 @@
package logger
import (
"go.uber.org/zap"
"os"
"testing"
"go.uber.org/zap"
)
func TestInitLog(t *testing.T) {
@ -26,4 +27,4 @@ func TestWithLogger(t *testing.T) {
Errorf("a error msg")
FatalfIf(false, "nothing")
FatalIfError(nil)
}
}

14
dtmcli/types.go

@ -29,6 +29,7 @@ type DB = dtmimp.DB
// TransOptions transaction option
type TransOptions = dtmimp.TransOptions
// DBConf declares db configuration
type DBConf = dtmimp.DBConf
// SetCurrentDBType set currentDBType
@ -41,16 +42,17 @@ func GetCurrentDBType() string {
return dtmimp.GetCurrentDBType()
}
// SetXaSqlTimeoutMs set XaSqlTimeoutMs
func SetXaSqlTimeoutMs(ms int) {
dtmimp.XaSqlTimeoutMs = ms
// SetXaSQLTimeoutMs set XaSQLTimeoutMs
func SetXaSQLTimeoutMs(ms int) {
dtmimp.XaSQLTimeoutMs = ms
}
// GetXaSqlTimeoutMs get XaSqlTimeoutMs
func GetXaSqlTimeoutMs() int {
return dtmimp.XaSqlTimeoutMs
// GetXaSQLTimeoutMs get XaSQLTimeoutMs
func GetXaSQLTimeoutMs() int {
return dtmimp.XaSQLTimeoutMs
}
// SetBarrierTableName sets barrier table name
func SetBarrierTableName(tablename string) {
dtmimp.BarrierTableName = tablename
}

4
dtmcli/types_test.go

@ -26,7 +26,7 @@ func TestTypes(t *testing.T) {
}
func TestXaSqlTimeout(t *testing.T) {
old := GetXaSqlTimeoutMs()
SetXaSqlTimeoutMs(old)
old := GetXaSQLTimeoutMs()
SetXaSQLTimeoutMs(old)
SetBarrierTableName(dtmimp.BarrierTableName) // just cover this func
}

2
dtmcli/xa.go

@ -86,7 +86,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr e
return xc.XaGlobalTransaction2(gid, func(x *Xa) {}, xaFunc)
}
// XaGlobalTransaction start a xa global transaction
// XaGlobalTransaction2 start a xa global transaction
func (xc *XaClient) XaGlobalTransaction2(gid string, custom func(*Xa), xaFunc XaGlobalFunc) (rerr error) {
xa := &Xa{TransBase: *dtmimp.NewTransBase(gid, "xa", xc.XaClientBase.Server, "")}
custom(xa)

6
dtmgrpc/dtmgimp/grpc_clients.go

@ -26,9 +26,8 @@ func (cb rawCodec) Marshal(v interface{}) ([]byte, error) {
func (cb rawCodec) Unmarshal(data []byte, v interface{}) error {
ba, ok := v.(*[]byte)
dtmimp.PanicIf(!ok, fmt.Errorf("please pass in *[]byte"))
for _, byte := range data {
*ba = append(*ba, byte)
}
*ba = append(*ba, data...)
return nil
}
@ -36,6 +35,7 @@ func (cb rawCodec) Name() string { return "dtm_raw" }
var normalClients, rawClients sync.Map
// ClientInterceptors declares grpc.UnaryClientInterceptors slice
var ClientInterceptors = []grpc.UnaryClientInterceptor{}
// MustGetDtmClient 1

16
dtmgrpc/tcc.go

@ -75,13 +75,15 @@ func TccFromGrpc(ctx context.Context) (*TccGrpc, error) {
func (t *TccGrpc) CallBranch(busiMsg proto.Message, tryURL string, confirmURL string, cancelURL string, reply interface{}) error {
branchID := t.NewSubBranchID()
bd, err := proto.Marshal(busiMsg)
_, err = dtmgimp.MustGetDtmClient(t.Dtm).RegisterBranch(context.Background(), &dtmgpb.DtmBranchRequest{
Gid: t.Gid,
TransType: t.TransType,
BranchID: branchID,
BusiPayload: bd,
Data: map[string]string{"confirm": confirmURL, "cancel": cancelURL},
})
if err == nil {
_, err = dtmgimp.MustGetDtmClient(t.Dtm).RegisterBranch(context.Background(), &dtmgpb.DtmBranchRequest{
Gid: t.Gid,
TransType: t.TransType,
BranchID: branchID,
BusiPayload: bd,
Data: map[string]string{"confirm": confirmURL, "cancel": cancelURL},
})
}
if err != nil {
return err
}

1
dtmgrpc/type.go

@ -29,6 +29,7 @@ func UseDriver(driverName string) error {
return dtmdriver.Use(driverName)
}
// AddUnaryInterceptor adds grpc.UnaryClientInterceptor
func AddUnaryInterceptor(interceptor grpc.UnaryClientInterceptor) {
dtmgimp.ClientInterceptors = append(dtmgimp.ClientInterceptors, interceptor)
}

18
dtmsvr/config/config.go

@ -10,11 +10,16 @@ import (
)
const (
// DtmMetricsPort is metric port
DtmMetricsPort = 8889
Mysql = "mysql"
Redis = "redis"
BoltDb = "boltdb"
Postgres = "postgres"
// Mysql is mysql driver
Mysql = "mysql"
// Redis is redis driver
Redis = "redis"
// BoltDb is boltdb driver
BoltDb = "boltdb"
// Postgres is postgres driver
Postgres = "postgres"
)
// MicroService config type for micro service
@ -24,6 +29,7 @@ type MicroService struct {
EndPoint string `yaml:"EndPoint"`
}
// Store defines storage relevant info
type Store struct {
Driver string `yaml:"Driver" default:"boltdb"`
Host string `yaml:"Host"`
@ -39,10 +45,12 @@ type Store struct {
TransBranchOpTable string `yaml:"BranchTransOpTable" default:"dtm.trans_branch_op"`
}
// IsDB checks config driver is mysql or postgres
func (s *Store) IsDB() bool {
return s.Driver == dtmcli.DBTypeMysql || s.Driver == dtmcli.DBTypePostgres
}
// GetDBConf returns db conf info
func (s *Store) GetDBConf() dtmcli.DBConf {
return dtmcli.DBConf{
Driver: s.Driver,
@ -58,7 +66,7 @@ type configType struct {
TransCronInterval int64 `yaml:"TransCronInterval" default:"3"`
TimeoutToFail int64 `yaml:"TimeoutToFail" default:"35"`
RetryInterval int64 `yaml:"RetryInterval" default:"10"`
HttpPort int64 `yaml:"HttpPort" default:"36789"`
HTTPPort int64 `yaml:"HttpPort" default:"36789"`
GrpcPort int64 `yaml:"GrpcPort" default:"36790"`
MicroService MicroService `yaml:"MicroService"`
UpdateBranchSync int64 `yaml:"UpdateBranchSync"`

7
dtmsvr/config/config_utils.go

@ -49,9 +49,12 @@ func loadFromEnvInner(prefix string, conf reflect.Value, defaultValue string) {
func toUnderscoreUpper(key string) string {
key = strings.Trim(key, "_")
matchLastCap := regexp.MustCompile("([A-Z])([A-Z][a-z])")
s2 := matchLastCap.ReplaceAllString(key, "${1}_${2}")
matchFirstCap := regexp.MustCompile("([a-z])([A-Z]+)")
s2 := matchFirstCap.ReplaceAllString(key, "${1}_${2}")
// logger.Debugf("loading from env: %s", strings.ToUpper(s2))
s2 = matchFirstCap.ReplaceAllString(s2, "${1}_${2}")
// logger.Infof("loading from env: %s", strings.ToUpper(s2))
return strings.ToUpper(s2)
}

2
dtmsvr/storage/sql/sql.go

@ -31,7 +31,7 @@ func (s *Store) Ping() error {
// PopulateData populates data to db
func (s *Store) PopulateData(skipDrop bool) {
file := fmt.Sprintf("%s/dtmsvr.storage.%s.sql", dtmutil.GetSqlDir(), conf.Store.Driver)
file := fmt.Sprintf("%s/dtmsvr.storage.%s.sql", dtmutil.GetSQLDir(), conf.Store.Driver)
dtmutil.RunSQLScript(conf.Store.GetDBConf(), file, skipDrop)
}

4
dtmsvr/svr.go

@ -26,9 +26,9 @@ func StartSvr() {
app := dtmutil.GetGinApp()
app = httpMetrics(app)
addRoute(app)
logger.Infof("dtmsvr listen at: %d", conf.HttpPort)
logger.Infof("dtmsvr listen at: %d", conf.HTTPPort)
go func() {
err := app.Run(fmt.Sprintf(":%d", conf.HttpPort))
err := app.Run(fmt.Sprintf(":%d", conf.HTTPPort))
if err != nil {
logger.Errorf("start server err: %v", err)
}

4
dtmutil/consts.go

@ -7,8 +7,8 @@
package dtmutil
const (
// DefaultHttpServer default url for http server. used by test and examples
DefaultHttpServer = "http://localhost:36789/api/dtmsvr"
// DefaultHTTPServer default url for http server. used by test and examples
DefaultHTTPServer = "http://localhost:36789/api/dtmsvr"
// DefaultGrpcServer default url for grpc server. used by test and examples
DefaultGrpcServer = "localhost:36790"
)

5
dtmutil/db.go

@ -28,7 +28,7 @@ func getGormDialetor(driver string, dsn string) gorm.Dialector {
if driver == dtmcli.DBTypePostgres {
return postgres.Open(dsn)
}
dtmimp.PanicIf(driver != dtmcli.DBTypeMysql, fmt.Errorf("unkown driver: %s", driver))
dtmimp.PanicIf(driver != dtmcli.DBTypeMysql, fmt.Errorf("unknown driver: %s", driver))
return mysql.Open(dsn)
}
@ -106,7 +106,8 @@ func DbGet(conf dtmcli.DBConf, ops ...func(*gorm.DB)) *DB {
SkipDefaultTransaction: true,
})
dtmimp.E2P(err)
db1.Use(&tracePlugin{})
err = db1.Use(&tracePlugin{})
dtmimp.E2P(err)
db = &DB{DB: db1}
for _, op := range ops {
op(db1)

8
dtmutil/utils.go

@ -80,8 +80,8 @@ func MustGetwd() string {
return wd
}
// GetSqlDir 获取调用该函数的caller源代码的目录,主要用于测试时,查找相关文件
func GetSqlDir() string {
// GetSQLDir 获取调用该函数的caller源代码的目录,主要用于测试时,查找相关文件
func GetSQLDir() string {
wd := MustGetwd()
if filepath.Base(wd) == "test" {
wd = filepath.Dir(wd)
@ -89,6 +89,7 @@ func GetSqlDir() string {
return wd + "/sqls"
}
// RecoverPanic execs recovery operation
func RecoverPanic(err *error) {
if x := recover(); x != nil {
e := dtmimp.AsError(x)
@ -98,6 +99,7 @@ func RecoverPanic(err *error) {
}
}
// GetNextTime gets next time from second
func GetNextTime(second int64) *time.Time {
next := time.Now().Add(time.Duration(second) * time.Second)
return &next
@ -107,7 +109,7 @@ func GetNextTime(second int64) *time.Time {
func RunSQLScript(conf dtmcli.DBConf, script string, skipDrop bool) {
con, err := dtmimp.StandaloneDB(conf)
logger.FatalIfError(err)
defer func() { con.Close() }()
defer func() { _ = con.Close() }()
content, err := ioutil.ReadFile(script)
logger.FatalIfError(err)
sqls := strings.Split(string(content), ";")

4
dtmutil/utils_test.go

@ -31,7 +31,7 @@ func TestGin(t *testing.T) {
req, _ := http.NewRequest("GET", api, body)
w := httptest.NewRecorder()
app.ServeHTTP(w, req)
return string(w.Body.Bytes())
return w.Body.String()
}
assert.Equal(t, "{\"msg\":\"pong\"}", getResultString("/api/ping", nil))
assert.Equal(t, "1", getResultString("/api/sample", nil))
@ -42,7 +42,7 @@ func TestFuncs(t *testing.T) {
wd := MustGetwd()
assert.NotEqual(t, "", wd)
dir1 := GetSqlDir()
dir1 := GetSQLDir()
assert.Equal(t, true, strings.HasSuffix(dir1, "/sqls"))
}

2
go.mod

@ -3,7 +3,7 @@ module github.com/dtm-labs/dtm
go 1.16
require (
bou.ke/monkey v1.0.2 // indirect
bou.ke/monkey v1.0.2
github.com/dtm-labs/dtmdriver v0.0.1
github.com/dtm-labs/dtmdriver-gozero v0.0.1
github.com/dtm-labs/dtmdriver-polaris v0.0.2

3
main.go

@ -25,6 +25,7 @@ import (
_ "github.com/dtm-labs/dtmdriver-protocol1"
)
// Version declares version info
var Version string
func version() {
@ -63,7 +64,7 @@ func main() {
if *isReset {
dtmsvr.PopulateDB(false)
}
maxprocs.Set(maxprocs.Logger(logger.Infof))
_, _ = maxprocs.Set(maxprocs.Logger(logger.Infof))
registry.WaitStoreUp()
dtmsvr.StartSvr() // 启动dtmsvr的api服务
go dtmsvr.CronExpiredTrans(-1) // 启动dtmsvr的定时过期查询

12
test/api_test.go

@ -20,7 +20,7 @@ func TestAPIQuery(t *testing.T) {
err := genMsg(gid).Submit()
assert.Nil(t, err)
waitTransProcessed(gid)
resp, err := dtmimp.RestyClient.R().SetQueryParam("gid", gid).Get(dtmutil.DefaultHttpServer + "/query")
resp, err := dtmimp.RestyClient.R().SetQueryParam("gid", gid).Get(dtmutil.DefaultHTTPServer + "/query")
assert.Nil(t, err)
m := map[string]interface{}{}
assert.Equal(t, resp.StatusCode(), 200)
@ -28,11 +28,11 @@ func TestAPIQuery(t *testing.T) {
assert.NotEqual(t, nil, m["transaction"])
assert.Equal(t, 2, len(m["branches"].([]interface{})))
resp, err = dtmimp.RestyClient.R().SetQueryParam("gid", "").Get(dtmutil.DefaultHttpServer + "/query")
resp, err = dtmimp.RestyClient.R().SetQueryParam("gid", "").Get(dtmutil.DefaultHTTPServer + "/query")
e2p(err)
assert.Equal(t, resp.StatusCode(), 500)
resp, err = dtmimp.RestyClient.R().SetQueryParam("gid", "1").Get(dtmutil.DefaultHttpServer + "/query")
resp, err = dtmimp.RestyClient.R().SetQueryParam("gid", "1").Get(dtmutil.DefaultHTTPServer + "/query")
e2p(err)
assert.Equal(t, resp.StatusCode(), 200)
dtmimp.MustUnmarshalString(resp.String(), &m)
@ -47,7 +47,7 @@ func TestAPIAll(t *testing.T) {
assert.Nil(t, err)
waitTransProcessed(gid)
}
resp, err := dtmimp.RestyClient.R().SetQueryParam("limit", "1").Get(dtmutil.DefaultHttpServer + "/all")
resp, err := dtmimp.RestyClient.R().SetQueryParam("limit", "1").Get(dtmutil.DefaultHTTPServer + "/all")
assert.Nil(t, err)
m := map[string]interface{}{}
dtmimp.MustUnmarshalString(resp.String(), &m)
@ -57,7 +57,7 @@ func TestAPIAll(t *testing.T) {
resp, err = dtmimp.RestyClient.R().SetQueryParams(map[string]string{
"limit": "1",
"position": nextPos,
}).Get(dtmutil.DefaultHttpServer + "/all")
}).Get(dtmutil.DefaultHTTPServer + "/all")
assert.Nil(t, err)
dtmimp.MustUnmarshalString(resp.String(), &m)
nextPos2 := m["next_position"].(string)
@ -67,7 +67,7 @@ func TestAPIAll(t *testing.T) {
resp, err = dtmimp.RestyClient.R().SetQueryParams(map[string]string{
"limit": "1000",
"position": nextPos,
}).Get(dtmutil.DefaultHttpServer + "/all")
}).Get(dtmutil.DefaultHTTPServer + "/all")
assert.Nil(t, err)
dtmimp.MustUnmarshalString(resp.String(), &m)
nextPos3 := m["next_position"].(string)

2
test/busi/base_http.go

@ -61,7 +61,7 @@ func BaseAppStartup() *gin.Engine {
c.Next()
})
var err error
XaClient, err = dtmcli.NewXaClient(dtmutil.DefaultHttpServer, BusiConf, Busi+"/xa", func(path string, xa *dtmcli.XaClient) {
XaClient, err = dtmcli.NewXaClient(dtmutil.DefaultHTTPServer, BusiConf, Busi+"/xa", func(path string, xa *dtmcli.XaClient) {
app.POST(path, dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
return xa.HandleCallback(c.Query("gid"), c.Query("branch_id"), c.Query("op"))
}))

2
test/busi/quick_start.go

@ -35,7 +35,7 @@ func QsStartSvr() {
func QsFireRequest() string {
req := &gin.H{"amount": 30} // 微服务的载荷
// DtmServer为DTM服务的地址
saga := dtmcli.NewSaga(dtmutil.DefaultHttpServer, dtmcli.MustGenGid(dtmutil.DefaultHttpServer)).
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, dtmcli.MustGenGid(dtmutil.DefaultHTTPServer)).
// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCompensate"
Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req).
// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCompensate"

4
test/busi/startup.go

@ -16,8 +16,8 @@ func Startup() *gin.Engine {
// PopulateDB populate example mysql data
func PopulateDB(skipDrop bool) {
resetXaData()
file := fmt.Sprintf("%s/busi.%s.sql", dtmutil.GetSqlDir(), BusiConf.Driver)
file := fmt.Sprintf("%s/busi.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver)
dtmutil.RunSQLScript(BusiConf, file, skipDrop)
file = fmt.Sprintf("%s/dtmcli.barrier.%s.sql", dtmutil.GetSqlDir(), BusiConf.Driver)
file = fmt.Sprintf("%s/dtmcli.barrier.%s.sql", dtmutil.GetSQLDir(), BusiConf.Driver)
dtmutil.RunSQLScript(BusiConf, file, skipDrop)
}

2
test/common_test.go

@ -44,5 +44,5 @@ func testDbAlone(t *testing.T) {
func TestMustGenGid(t *testing.T) {
dtmgrpc.MustGenGid(dtmutil.DefaultGrpcServer)
dtmcli.MustGenGid(dtmutil.DefaultHttpServer)
dtmcli.MustGenGid(dtmutil.DefaultHTTPServer)
}

2
test/dtmsvr_test.go

@ -17,7 +17,7 @@ import (
"github.com/stretchr/testify/assert"
)
var DtmServer = dtmutil.DefaultHttpServer
var DtmServer = dtmutil.DefaultHTTPServer
var DtmGrpcServer = dtmutil.DefaultGrpcServer
var Busi = busi.Busi

2
test/msg_test.go

@ -71,7 +71,7 @@ func TestMsgAbnormal(t *testing.T) {
func genMsg(gid string) *dtmcli.Msg {
req := busi.GenTransReq(30, false, false)
msg := dtmcli.NewMsg(dtmutil.DefaultHttpServer, gid).
msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid).
Add(busi.Busi+"/TransOut", &req).
Add(busi.Busi+"/TransIn", &req)
msg.QueryPrepared = busi.Busi + "/QueryPrepared"

2
test/saga_compatible_test.go

@ -20,7 +20,7 @@ func TestSagaCompatibleNormal(t *testing.T) { // compatible with old http, which
gid := dtmimp.GetFuncName()
body := fmt.Sprintf(`{"gid":"%s","trans_type":"saga","steps":[{"action":"%s/TransOut","compensate":"%s/TransOutRevert","data":"{\"amount\":30,\"transInResult\":\"SUCCESS\",\"transOutResult\":\"SUCCESS\"}"},{"action":"%s/TransIn","compensate":"%s/TransInRevert","data":"{\"amount\":30,\"transInResult\":\"SUCCESS\",\"transOutResult\":\"SUCCESS\"}"}]}`,
gid, busi.Busi, busi.Busi, busi.Busi, busi.Busi)
dtmimp.RestyClient.R().SetBody(body).Post(fmt.Sprintf("%s/submit", dtmutil.DefaultHttpServer))
dtmimp.RestyClient.R().SetBody(body).Post(fmt.Sprintf("%s/submit", dtmutil.DefaultHTTPServer))
waitTransProcessed(gid)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(gid))
assert.Equal(t, StatusSucceed, getTransStatus(gid))

10
test/saga_options_test.go

@ -94,7 +94,7 @@ func TestSagaOptionsRollbackWait(t *testing.T) {
func TestSagaPassthroughHeadersYes(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidYes)
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gidYes)
sagaYes.WaitResult = true
sagaYes.PassthroughHeaders = []string{"test_header"}
sagaYes.Add(busi.Busi+"/TransOutHeaderYes", "", nil)
@ -105,7 +105,7 @@ func TestSagaPassthroughHeadersYes(t *testing.T) {
func TestSagaCronPassthroughHeadersYes(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidYes)
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gidYes)
sagaYes.PassthroughHeaders = []string{"test_header"}
sagaYes.Add(busi.Busi+"/TransOutHeaderYes", "", nil)
busi.MainSwitch.TransOutResult.SetOnce("ONGOING")
@ -120,7 +120,7 @@ func TestSagaCronPassthroughHeadersYes(t *testing.T) {
func TestSagaPassthroughHeadersNo(t *testing.T) {
gidNo := dtmimp.GetFuncName()
sagaNo := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidNo)
sagaNo := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gidNo)
sagaNo.WaitResult = true
sagaNo.Add(busi.Busi+"/TransOutHeaderNo", "", nil)
err := sagaNo.Submit()
@ -130,7 +130,7 @@ func TestSagaPassthroughHeadersNo(t *testing.T) {
func TestSagaHeaders(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidYes)
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gidYes)
sagaYes.BranchHeaders = map[string]string{
"test_header": "test",
}
@ -143,7 +143,7 @@ func TestSagaHeaders(t *testing.T) {
func TestSagaHeadersYes1(t *testing.T) {
gidYes := dtmimp.GetFuncName()
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gidYes)
sagaYes := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gidYes)
sagaYes.BranchHeaders = map[string]string{
"test_header": "test",
}

6
test/saga_test.go

@ -64,7 +64,7 @@ func TestSagaAbnormal(t *testing.T) {
}
func TestSagaEmptyUrl(t *testing.T) {
saga := dtmcli.NewSaga(dtmutil.DefaultHttpServer, dtmimp.GetFuncName())
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, dtmimp.GetFuncName())
req := busi.GenTransReq(30, false, false)
saga.Add(busi.Busi+"/TransOut", "", &req)
saga.Add("", "", &req)
@ -75,7 +75,7 @@ func TestSagaEmptyUrl(t *testing.T) {
}
func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
saga := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gid)
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid)
req := busi.GenTransReq(30, outFailed, inFailed)
saga.Add(busi.Busi+"/TransOut", busi.Busi+"/TransOutRevert", &req)
saga.Add(busi.Busi+"/TransIn", busi.Busi+"/TransInRevert", &req)
@ -83,7 +83,7 @@ func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
}
func genSaga1(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
saga := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gid)
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid)
req := busi.GenTransReq(30, outFailed, inFailed)
saga.Add(busi.Busi+"/TransOut", busi.Busi+"/TransOutRevert", &req)
return saga

2
test/tcc_cover_test.go

@ -21,7 +21,7 @@ func TestTccCoverNotConnected(t *testing.T) {
func TestTccCoverPanic(t *testing.T) {
gid := dtmimp.GetFuncName()
err := dtmimp.CatchP(func() {
_ = dtmcli.TccGlobalTransaction(dtmutil.DefaultHttpServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_ = dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
panic("user panic")
})
assert.FailNow(t, "not executed")

10
test/tcc_test.go

@ -20,7 +20,7 @@ import (
func TestTccNormal(t *testing.T) {
req := busi.GenTransReq(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHttpServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, err := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
assert.Nil(t, err)
return tcc.CallBranch(req, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
@ -34,7 +34,7 @@ func TestTccNormal(t *testing.T) {
func TestTccRollback(t *testing.T) {
gid := dtmimp.GetFuncName()
req := busi.GenTransReq(30, false, true)
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHttpServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, rerr := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
assert.Nil(t, rerr)
busi.MainSwitch.TransOutRevertResult.SetOnce(dtmcli.ResultOngoing)
@ -54,7 +54,7 @@ func TestTccTimeout(t *testing.T) {
gid := dtmimp.GetFuncName()
timeoutChan := make(chan int, 1)
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHttpServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, err := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
assert.Nil(t, err)
go func() {
@ -74,7 +74,7 @@ func TestTccTimeout(t *testing.T) {
func TestTccCompatible(t *testing.T) {
req := busi.GenTransReq(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHttpServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, err := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
assert.Nil(t, err)
return tcc.CallBranch(req, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
@ -89,7 +89,7 @@ func TestTccCompatible(t *testing.T) {
func TestTccHeaders(t *testing.T) {
req := busi.GenTransReq(30, false, false)
gid := dtmimp.GetFuncName()
err := dtmcli.TccGlobalTransaction2(dtmutil.DefaultHttpServer, gid, func(t *dtmcli.Tcc) {
err := dtmcli.TccGlobalTransaction2(dtmutil.DefaultHTTPServer, gid, func(t *dtmcli.Tcc) {
t.BranchHeaders = map[string]string{
"test_header": "test",
}

Loading…
Cancel
Save