Browse Source

Merge pull request #123 from yedf/alpha

Log refactored
pull/125/head
yedf2 4 years ago
committed by GitHub
parent
commit
5e6f24b5e1
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      app/main.go
  2. 27
      bench/http.go
  3. 4
      bench/main.go
  4. 11
      common/config.go
  5. 2
      common/config_utils.go
  6. 7
      common/db.go
  7. 4
      common/types.go
  8. 22
      common/utils.go
  9. 2
      conf.sample.yml
  10. 3
      dtmcli/barrier.go
  11. 82
      dtmcli/dtmimp/utils.go
  12. 18
      dtmcli/dtmimp/utils_test.go
  13. 5
      dtmcli/dtmimp/vars.go
  14. 64
      dtmcli/logger/log.go
  15. 17
      dtmcli/logger/logger_test.go
  16. 5
      dtmgrpc/dtmgimp/grpc_clients.go
  17. 20
      dtmgrpc/dtmgimp/types.go
  18. 3
      dtmgrpc/dtmgimp/utils.go
  19. 4
      dtmsvr/api.go
  20. 6
      dtmsvr/cron.go
  21. 14
      dtmsvr/storage/boltdb/boltdb.go
  22. 13
      dtmsvr/storage/redis/redis.go
  23. 13
      dtmsvr/storage/trans.go
  24. 20
      dtmsvr/svr.go
  25. 3
      dtmsvr/trans_class.go
  26. 16
      dtmsvr/trans_process.go
  27. 8
      dtmsvr/trans_status.go
  28. 4
      dtmsvr/trans_type_msg.go
  29. 13
      dtmsvr/trans_type_saga.go
  30. 3
      dtmsvr/trans_type_tcc.go
  31. 17
      examples/base_grpc.go
  32. 11
      examples/base_http.go
  33. 11
      examples/base_types.go
  34. 4
      examples/data.go
  35. 4
      examples/grpc_msg.go
  36. 6
      examples/grpc_saga.go
  37. 3
      examples/grpc_saga_barrier.go
  38. 6
      examples/grpc_tcc.go
  39. 4
      examples/grpc_xa.go
  40. 4
      examples/http_gorm_xa.go
  41. 10
      examples/http_msg.go
  42. 24
      examples/http_saga.go
  43. 7
      examples/http_saga_barrier.go
  44. 8
      examples/http_saga_gorm_barrier.go
  45. 12
      examples/http_tcc.go
  46. 5
      examples/http_tcc_barrier.go
  47. 6
      examples/http_xa.go
  48. 5
      examples/quick_start.go
  49. 2
      helper/test-cover.sh
  50. 5
      test/base_test.go
  51. 11
      test/saga_grpc_test.go
  52. 11
      test/saga_test.go
  53. 9
      test/tcc_barrier_test.go
  54. 3
      test/tcc_grpc_test.go
  55. 9
      test/types.go

6
app/main.go

@ -15,7 +15,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmsvr"
"github.com/yedf/dtm/dtmsvr/storage/registry"
"github.com/yedf/dtm/examples"
@ -50,7 +50,7 @@ func main() {
fmt.Printf("version: %s commit: %s built at: %s\n", Version, Commit, Date)
return
}
dtmimp.Logf("starting dtm....")
logger.Infof("starting dtm....")
common.MustLoadConfig()
if common.Config.ExamplesDB.Driver != "" {
dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver)
@ -75,7 +75,7 @@ func main() {
examples.BaseAppStartup()
sample := examples.Samples[os.Args[1]]
dtmimp.LogIfFatalf(sample == nil, "no sample name for %s", os.Args[1])
logger.FatalfIf(sample == nil, "no sample name for %s", os.Args[1])
sample.Action()
}
select {}

27
bench/http.go

@ -17,6 +17,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmsvr"
"github.com/yedf/dtm/examples"
)
@ -32,14 +33,14 @@ var benchBusi = fmt.Sprintf("http://localhost:%d%s", benchPort, benchAPI)
func sdbGet() *sql.DB {
db, err := dtmimp.PooledDB(common.Config.Store.GetDBConf())
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return db
}
func txGet() *sql.Tx {
db := sdbGet()
tx, err := db.Begin()
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return tx
}
@ -50,7 +51,7 @@ func reloadData() {
tables := []string{"dtm_busi.user_account", "dtm_busi.user_account_log", "dtm.trans_global", "dtm.trans_branch_op", "dtm_barrier.barrier"}
for _, t := range tables {
_, err := dtmimp.DBExec(db, fmt.Sprintf("truncate %s", t))
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
}
s := "insert ignore into dtm_busi.user_account(user_id, balance) values "
ss := []string{}
@ -58,8 +59,8 @@ func reloadData() {
ss = append(ss, fmt.Sprintf("(%d, 1000000)", i))
}
_, err := dtmimp.DBExec(db, s+strings.Join(ss, ","))
dtmimp.FatalIfError(err)
dtmimp.Logf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds())
logger.FatalIfError(err)
logger.Debugf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds())
}
var uidCounter int32 = 0
@ -70,11 +71,11 @@ var sqls int = 1
func StartSvr() {
app := common.GetGinApp()
benchAddRoute(app)
dtmimp.Logf("bench listening at %d", benchPort)
logger.Debugf("bench listening at %d", benchPort)
go app.Run(fmt.Sprintf(":%d", benchPort))
db := sdbGet()
_, err := dtmimp.DBExec(db, "drop table if exists dtm_busi.user_account_log")
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
_, err = dtmimp.DBExec(db, `create table if not exists dtm_busi.user_account_log (
id INT(11) AUTO_INCREMENT PRIMARY KEY,
user_id INT(11) NOT NULL,
@ -89,7 +90,7 @@ func StartSvr() {
key(create_time)
)
`)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
}
func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
@ -101,21 +102,21 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
for i := 0; i < sqls; i++ {
_, err := dtmimp.DBExec(tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, op, reason) values(?,?,?,?,?,?)",
uid, amount, tb.Gid, c.Query("branch_id"), tb.TransType, fmt.Sprintf("inserted by dtm transaction %s %s", tb.Gid, c.Query("branch_id")))
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
_, err = dtmimp.DBExec(tx, "update dtm_busi.user_account set balance = balance + ?, update_time = now() where user_id = ?", amount, uid)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
}
return nil
}
if strings.Contains(mode, "barrier") {
barrier, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
barrier.Call(txGet(), f)
} else {
tx := txGet()
f(tx)
err := tx.Commit()
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
}
return dtmcli.MapSuccess, nil
@ -150,7 +151,7 @@ func benchAddRoute(app *gin.Engine) {
req := gin.H{}
params := fmt.Sprintf("?uid=%s", suid)
params2 := fmt.Sprintf("?uid=%s", suid2)
dtmimp.Logf("mode: %s contains dtm: %t", mode, strings.Contains(mode, "dtm"))
logger.Debugf("mode: %s contains dtm: %t", mode, strings.Contains(mode, "dtm"))
if strings.Contains(mode, "dtm") {
saga := dtmcli.NewSaga(examples.DtmHttpServer, fmt.Sprintf("bench-%d", uid)).
Add(benchBusi+"/TransOut"+params, benchBusi+"/TransOutCompensate"+params, req).

4
bench/main.go

@ -6,7 +6,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmsvr"
"github.com/yedf/dtm/dtmsvr/storage/registry"
"github.com/yedf/dtm/examples"
@ -23,7 +23,7 @@ func main() {
fmt.Printf(hint)
return
}
dtmimp.Logf("starting dtm....")
logger.Debugf("starting dtm....")
if os.Args[1] == "http" {
fmt.Println("start bench server")
common.MustLoadConfig()

11
common/config.go

@ -7,7 +7,7 @@ import (
"path/filepath"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"gopkg.in/yaml.v2"
)
@ -61,6 +61,7 @@ type configType struct {
GrpcPort int64 `yaml:"GrpcPort" default:"36790"`
MicroService MicroService `yaml:"MicroService"`
UpdateBranchSync int64 `yaml:"UpdateBranchSync"`
LogLevel string `yaml:"LogLevel" default:"info"`
ExamplesDB dtmcli.DBConf `yaml:"ExamplesDB"`
}
@ -82,13 +83,13 @@ func MustLoadConfig() {
}
if len(cont) != 0 {
err := yaml.UnmarshalStrict(cont, &Config)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
}
scont, err := json.MarshalIndent(&Config, "", " ")
dtmimp.FatalIfError(err)
dtmimp.Logf("config is: \n%s", scont)
logger.FatalIfError(err)
logger.Debugf("config is: \n%s", scont)
err = checkConfig()
dtmimp.LogIfFatalf(err != nil, `config error: '%v'.
logger.FatalfIf(err != nil, `config error: '%v'.
check you env, and conf.yml/conf.sample.yml in current and parent path: %s.
please visit http://d.dtm.pub to see the config document.
loaded config is:

2
common/config_utils.go

@ -50,6 +50,6 @@ func toUnderscoreUpper(key string) string {
key = strings.Trim(key, "_")
matchFirstCap := regexp.MustCompile("([a-z])([A-Z]+)")
s2 := matchFirstCap.ReplaceAllString(key, "${1}_${2}")
// dtmimp.Logf("loading from env: %s", strings.ToUpper(s2))
// logger.Debugf("loading from env: %s", strings.ToUpper(s2))
return strings.ToUpper(s2)
}

7
common/db.go

@ -11,6 +11,7 @@ import (
_ "github.com/lib/pq" // register postgres driver
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
@ -65,7 +66,7 @@ func (op *tracePlugin) Initialize(db *gorm.DB) (err error) {
after := func(db *gorm.DB) {
_ts, _ := db.InstanceGet("ivy.startTime")
sql := db.Dialector.Explain(db.Statement.SQL.String(), db.Statement.Vars...)
dtmimp.Logf("used: %d ms affected: %d sql is: %s", time.Since(_ts.(time.Time)).Milliseconds(), db.RowsAffected, sql)
logger.Debugf("used: %d ms affected: %d sql is: %s", time.Since(_ts.(time.Time)).Milliseconds(), db.RowsAffected, sql)
if v, ok := db.InstanceGet("ivy.must"); ok && v.(bool) {
if db.Error != nil && db.Error != gorm.ErrRecordNotFound {
panic(db.Error)
@ -76,7 +77,7 @@ func (op *tracePlugin) Initialize(db *gorm.DB) (err error) {
beforeName := "cb_before"
afterName := "cb_after"
dtmimp.Logf("installing db plugin: %s", op.Name())
logger.Debugf("installing db plugin: %s", op.Name())
// 开始前
_ = db.Callback().Create().Before("gorm:before_create").Register(beforeName, before)
_ = db.Callback().Query().Before("gorm:query").Register(beforeName, before)
@ -108,7 +109,7 @@ func DbGet(conf dtmcli.DBConf) *DB {
dsn := dtmimp.GetDsn(conf)
db, ok := dbs.Load(dsn)
if !ok {
dtmimp.Logf("connecting %s", strings.Replace(dsn, conf.Passwrod, "****", 1))
logger.Debugf("connecting %s", strings.Replace(dsn, conf.Passwrod, "****", 1))
db1, err := gorm.Open(getGormDialetor(conf.Driver, dsn), &gorm.Config{
SkipDefaultTransaction: true,
})

4
common/types.go

@ -11,7 +11,7 @@ import (
"sync"
"github.com/go-redis/redis/v8"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
var rdb *redis.Client
@ -19,7 +19,7 @@ var once sync.Once
func RedisGet() *redis.Client {
once.Do(func() {
dtmimp.Logf("connecting to redis: %v", Config.Store)
logger.Debugf("connecting to redis: %v", Config.Store)
rdb = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", Config.Store.Host, Config.Store.Port),
Username: Config.Store.User,

22
common/utils.go

@ -20,12 +20,14 @@ import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
// GetGinApp init and return gin
func GetGinApp() *gin.Engine {
gin.SetMode(gin.ReleaseMode)
app := gin.Default()
app := gin.New()
app.Use(gin.Recovery())
app.Use(func(c *gin.Context) {
body := ""
if c.Request.Body != nil {
@ -36,11 +38,8 @@ func GetGinApp() *gin.Engine {
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(rb))
}
}
began := time.Now()
dtmimp.Logf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body)
logger.Debugf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body)
c.Next()
dtmimp.Logf("used %d ms %s %s query: %s body: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body)
})
app.Any("/api/ping", func(c *gin.Context) { c.JSON(200, map[string]interface{}{"msg": "pong"}) })
return app
@ -49,6 +48,7 @@ func GetGinApp() *gin.Engine {
// WrapHandler name is clear
func WrapHandler(fn func(*gin.Context) (interface{}, error)) gin.HandlerFunc {
return func(c *gin.Context) {
began := time.Now()
r, err := func() (r interface{}, rerr error) {
defer dtmimp.P2E(&rerr)
return fn(c)
@ -59,11 +59,12 @@ func WrapHandler(fn func(*gin.Context) (interface{}, error)) gin.HandlerFunc {
} else if err == nil {
b, err = json.Marshal(r)
}
if err != nil {
dtmimp.Logf("status: 500, code: 500 message: %s", err.Error())
logger.Errorf("%2dms 500 %s %s %s %s", time.Since(began).Milliseconds(), err.Error(), c.Request.Method, c.Request.RequestURI, string(b))
c.JSON(500, map[string]interface{}{"code": 500, "message": err.Error()})
} else {
dtmimp.Logf("status: 200, content: %s", string(b))
logger.Infof("%2dms 200 %s %s %s", time.Since(began).Milliseconds(), c.Request.Method, c.Request.RequestURI, string(b))
c.Status(200)
c.Writer.Header().Add("Content-Type", "application/json")
_, err = c.Writer.Write(b)
@ -105,10 +106,10 @@ func GetNextTime(second int64) *time.Time {
// RunSQLScript 1
func RunSQLScript(conf dtmcli.DBConf, script string, skipDrop bool) {
con, err := dtmimp.StandaloneDB(conf)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
defer func() { con.Close() }()
content, err := ioutil.ReadFile(script)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
sqls := strings.Split(string(content), ";")
for _, sql := range sqls {
s := strings.TrimSpace(sql)
@ -116,6 +117,7 @@ func RunSQLScript(conf dtmcli.DBConf, script string, skipDrop bool) {
continue
}
_, err = dtmimp.DBExec(con, s)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
logger.Infof("sql scripts finished: %s", s)
}
}

2
conf.sample.yml

@ -39,6 +39,8 @@ Store: # specify which engine to store trans status
# TimeoutToFail: 35 # timeout for XA, TCC to fail. saga's timeout default to infinite, which can be overwritten in saga options
# RetryInterval: 10 # the subtrans branch will be retried after this interval
# LogLevel: 'info' # default: info. can be debug|info|warn|error
### dtm can run examples, and examples will use following config to connect db
ExamplesDB:
Driver: 'mysql'

3
dtmcli/barrier.go

@ -12,6 +12,7 @@ import (
"net/url"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
// BarrierBusiFunc type for busi func
@ -82,7 +83,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error)
originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿
currentAffected == 0 { // 这个是重复请求或者悬挂
return

82
dtmcli/dtmimp/utils.go

@ -11,23 +11,36 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"os"
"runtime"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"
"github.com/go-resty/resty/v2"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/yedf/dtm/dtmcli/logger"
)
// Logf an alias of Infof
// Deprecated: use logger.Errorf
var Logf = logger.Infof
// LogRedf an alias of Errorf
// Deprecated: use logger.Errorf
var LogRedf = logger.Errorf
// FatalIfError fatal if error is not nil
// Deprecated: use logger.FatalIfError
var FatalIfError = logger.FatalIfError
// LogIfFatalf fatal if cond is true
// Deprecated: use logger.FatalfIf
var LogIfFatalf = logger.FatalfIf
// AsError wrap a panic value as an error
func AsError(x interface{}) error {
LogRedf("panic wrapped to error: '%v'", x)
logger.Errorf("panic wrapped to error: '%v'", x)
if e, ok := x.(error); ok {
return e
}
@ -120,59 +133,6 @@ func MustRemarshal(from interface{}, to interface{}) {
E2P(err)
}
var logger *zap.SugaredLogger = nil
func init() {
InitLog()
}
// InitLog is a initialization for a logger
func InitLog() {
config := zap.NewProductionConfig()
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
if os.Getenv("DTM_DEBUG") != "" {
config.Encoding = "console"
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
}
p, err := config.Build(zap.AddCallerSkip(1))
if err != nil {
log.Fatal("create logger failed: ", err)
}
logger = p.Sugar()
}
// Logf is log stdout
func Logf(fmt string, args ...interface{}) {
logger.Infof(fmt, args...)
}
// LogRedf is print error message with red color
func LogRedf(fmt string, args ...interface{}) {
logger.Errorf(fmt, args...)
}
// FatalExitFunc is a Fatal exit function ,it will be replaced when testing
var FatalExitFunc = func() { os.Exit(1) }
// LogFatalf is print error message with red color, and execute FatalExitFunc
func LogFatalf(fmt string, args ...interface{}) {
fmt += "\n" + string(debug.Stack())
LogRedf(fmt, args...)
FatalExitFunc()
}
// LogIfFatalf is print error message with red color, and execute LogFatalf, when condition is true
func LogIfFatalf(condition bool, fmt string, args ...interface{}) {
if condition {
LogFatalf(fmt, args...)
}
}
// FatalIfError is print error message with red color, and execute LogIfFatalf.
func FatalIfError(err error) {
LogIfFatalf(err != nil, "Fatal error: %v", err)
}
// GetFuncName get current call func name
func GetFuncName() string {
pc, _, _, _ := runtime.Caller(1)
@ -223,9 +183,9 @@ func DBExec(db DB, sql string, values ...interface{}) (affected int64, rerr erro
used := time.Since(began) / time.Millisecond
if rerr == nil {
affected, rerr = r.RowsAffected()
Logf("used: %d ms affected: %d for %s %v", used, affected, sql, values)
logger.Debugf("used: %d ms affected: %d for %s %v", used, affected, sql, values)
} else {
LogRedf("used: %d ms exec error: %v for %s %v", used, rerr, sql, values)
logger.Errorf("used: %d ms exec error: %v for %s %v", used, rerr, sql, values)
}
return
}
@ -258,7 +218,7 @@ func CheckResponse(resp *resty.Response, err error) error {
return err
}
// CheckResult is check result. Return err directly if err is not nil. And return corresponding error by calling CheckResponse if resp is the type of *resty.Response.
// CheckResult is check result. Return err directly if err is not nil. And return corresponding error by calling CheckResponse if resp is the type of *resty.Response.
// Otherwise, return error by value of str, the string after marshal.
func CheckResult(res interface{}, err error) error {
if err != nil {

18
dtmcli/dtmimp/utils_test.go

@ -8,7 +8,6 @@ package dtmimp
import (
"errors"
"fmt"
"os"
"strings"
"testing"
@ -80,20 +79,3 @@ func TestSome(t *testing.T) {
s2 := MayReplaceLocalhost("http://localhost")
assert.Equal(t, "http://localhost", s2)
}
func TestFatal(t *testing.T) {
old := FatalExitFunc
defer func() {
FatalExitFunc = old
}()
FatalExitFunc = func() { panic(fmt.Errorf("fatal")) }
err := CatchP(func() {
LogIfFatalf(true, "")
})
assert.Error(t, err, fmt.Errorf("fatal"))
}
func TestInitLog(t *testing.T) {
os.Setenv("DTM_DEBUG", "1")
InitLog()
}

5
dtmcli/dtmimp/vars.go

@ -10,6 +10,7 @@ import (
"errors"
"github.com/go-resty/resty/v2"
"github.com/yedf/dtm/dtmcli/logger"
)
// ErrFailure error of FAILURE
@ -36,12 +37,12 @@ func init() {
// RestyClient.SetRetryWaitTime(1 * time.Second)
RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error {
r.URL = MayReplaceLocalhost(r.URL)
Logf("requesting: %s %s %v %v", r.Method, r.URL, r.Body, r.QueryParam)
logger.Debugf("requesting: %s %s %v %v", r.Method, r.URL, r.Body, r.QueryParam)
return nil
})
RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error {
r := resp.Request
Logf("requested: %s %s %s", r.Method, r.URL, resp.String())
logger.Debugf("requested: %s %s %s", r.Method, r.URL, resp.String())
return nil
})
}

64
dtmcli/logger/log.go

@ -0,0 +1,64 @@
package logger
import (
"log"
"os"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var logger *zap.SugaredLogger = nil
func init() {
InitLog("info")
}
// InitLog is a initialization for a logger
// level can be: debug info warn error
func InitLog(level string) {
config := zap.NewProductionConfig()
err := config.Level.UnmarshalText([]byte(level))
FatalIfError(err)
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
if os.Getenv("DTM_DEBUG") != "" {
config.Encoding = "console"
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
}
p, err := config.Build(zap.AddCallerSkip(1))
FatalIfError(err)
logger = p.Sugar()
}
// Debugf log to level debug
func Debugf(fmt string, args ...interface{}) {
logger.Debugf(fmt, args...)
}
// Infof log to level info
func Infof(fmt string, args ...interface{}) {
logger.Infof(fmt, args...)
}
// Warnf log to level warn
func Warnf(fmt string, args ...interface{}) {
logger.Warnf(fmt, args...)
}
// Errorf log to level error
func Errorf(fmt string, args ...interface{}) {
logger.Errorf(fmt, args...)
}
// FatalfIf log to level error
func FatalfIf(cond bool, fmt string, args ...interface{}) {
if !cond {
return
}
log.Fatalf(fmt, args...)
}
// FatalIfError if err is not nil, then log to level fatal and call os.Exit
func FatalIfError(err error) {
FatalfIf(err != nil, "fatal error: %v", err)
}

17
dtmcli/logger/logger_test.go

@ -0,0 +1,17 @@
package logger
import (
"os"
"testing"
)
func TestInitLog(t *testing.T) {
os.Setenv("DTM_DEBUG", "1")
InitLog("debug")
Debugf("a debug msg")
Infof("a info msg")
Warnf("a warn msg")
Errorf("a error msg")
FatalfIf(false, "nothing")
FatalIfError(nil)
}

5
dtmgrpc/dtmgimp/grpc_clients.go

@ -11,6 +11,7 @@ import (
"sync"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc/dtmgpb"
grpc "google.golang.org/grpc"
)
@ -57,12 +58,12 @@ func GetGrpcConn(grpcServer string, isRaw bool) (conn *grpc.ClientConn, rerr err
if isRaw {
opts = grpc.WithDefaultCallOptions(grpc.ForceCodec(rawCodec{}))
}
dtmimp.Logf("grpc client connecting %s", grpcServer)
logger.Debugf("grpc client connecting %s", grpcServer)
conn, rerr := grpc.Dial(grpcServer, grpc.WithInsecure(), grpc.WithUnaryInterceptor(GrpcClientLog), opts)
if rerr == nil {
clients.Store(grpcServer, conn)
v = conn
dtmimp.Logf("grpc client inited for %s", grpcServer)
logger.Debugf("grpc client inited for %s", grpcServer)
}
}
return v.(*grpc.ClientConn), rerr

20
dtmgrpc/dtmgimp/types.go

@ -9,9 +9,11 @@ package dtmgimp
import (
"context"
"fmt"
"time"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -19,28 +21,30 @@ import (
// GrpcServerLog 打印grpc服务端的日志
func GrpcServerLog(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
dtmimp.Logf("grpc server handling: %s %v", info.FullMethod, req)
began := time.Now()
logger.Debugf("grpc server handling: %s %s", info.FullMethod, dtmimp.MustMarshalString(req))
LogDtmCtx(ctx)
m, err := handler(ctx, req)
res := fmt.Sprintf("grpc server handled: %s %v result: %v err: %v", info.FullMethod, req, m, err)
res := fmt.Sprintf("%2dms %v %s %s %s",
time.Since(began).Milliseconds(), err, info.FullMethod, dtmimp.MustMarshalString(m), dtmimp.MustMarshalString(req))
if err != nil {
dtmimp.LogRedf("%s", res)
logger.Errorf("%s", res)
} else {
dtmimp.Logf("%s", res)
logger.Infof("%s", res)
}
return m, err
}
// GrpcClientLog 打印grpc服务端的日志
func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
dtmimp.Logf("grpc client calling: %s%s %v", cc.Target(), method, req)
logger.Debugf("grpc client calling: %s%s %v", cc.Target(), method, req)
LogDtmCtx(ctx)
err := invoker(ctx, method, req, reply, cc, opts...)
res := fmt.Sprintf("grpc client called: %s%s %v result: %v err: %v", cc.Target(), method, req, reply, err)
if err != nil {
dtmimp.LogRedf("%s", res)
logger.Errorf("%s", res)
} else {
dtmimp.Logf("%s", res)
logger.Debugf("%s", res)
}
return err
}
@ -49,7 +53,7 @@ func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, c
func Result2Error(res interface{}, err error) error {
e := dtmimp.CheckResult(res, err)
if e == dtmimp.ErrFailure {
dtmimp.LogRedf("failure: res: %v, err: %v", res, e)
logger.Errorf("failure: res: %v, err: %v", res, e)
return status.New(codes.Aborted, dtmcli.ResultFailure).Err()
} else if e == dtmimp.ErrOngoing {
return status.New(codes.Aborted, dtmcli.ResultOngoing).Err()

3
dtmgrpc/dtmgimp/utils.go

@ -10,6 +10,7 @@ import (
context "context"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc/dtmgpb"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
@ -59,7 +60,7 @@ func TransInfo2Ctx(gid, transType, branchID, op, dtm string) context.Context {
func LogDtmCtx(ctx context.Context) {
tb := TransBaseFromGrpc(ctx)
if tb.Gid != "" {
dtmimp.Logf("gid: %s trans_type: %s branch_id: %s op: %s dtm: %s", tb.Gid, tb.TransType, tb.BranchID, tb.Op, tb.Dtm)
logger.Debugf("gid: %s trans_type: %s branch_id: %s op: %s dtm: %s", tb.Gid, tb.TransType, tb.BranchID, tb.Op, tb.Dtm)
}
}

4
dtmsvr/api.go

@ -11,6 +11,7 @@ import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmsvr/storage"
)
@ -71,7 +72,10 @@ func svcRegisterBranch(transType string, branch *TransBranch, data map[string]st
})
if err == storage.ErrNotFound {
msg := fmt.Sprintf("no trans with gid: %s status: %s found", branch.Gid, dtmcli.StatusPrepared)
logger.Errorf(msg)
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": msg}, nil
}
logger.Infof("LockGlobalSaveBranches result: %v: gid: %s old status: %s branches: %s",
err, branch.Gid, dtmcli.StatusPrepared, dtmimp.MustMarshalString(branches))
return dtmimp.If(err != nil, nil, dtmcli.MapSuccess), err
}

6
dtmsvr/cron.go

@ -13,6 +13,7 @@ import (
"time"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
// NowForwardDuration will be set in test, trans may be timeout
@ -49,12 +50,13 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal {
if global == nil {
return nil
}
logger.Infof("cron job return a trans: %s", global.String())
return &TransGlobal{TransGlobalStore: *global}
}
func handlePanic(perr *error) {
if err := recover(); err != nil {
dtmimp.LogRedf("----recovered panic %v\n%s", err, string(debug.Stack()))
logger.Errorf("----recovered panic %v\n%s", err, string(debug.Stack()))
if perr != nil {
*perr = fmt.Errorf("dtm panic: %v", err)
}
@ -64,6 +66,6 @@ func handlePanic(perr *error) {
func sleepCronTime() {
normal := time.Duration((float64(config.TransCronInterval) - rand.Float64()) * float64(time.Second))
interval := dtmimp.If(CronForwardDuration > 0, 1*time.Millisecond, normal).(time.Duration)
dtmimp.Logf("sleeping for %v milli", interval/time.Microsecond)
logger.Debugf("sleeping for %v milli", interval/time.Microsecond)
time.Sleep(interval)
}

14
dtmsvr/storage/boltdb/boltdb.go

@ -11,6 +11,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmsvr/storage"
)
@ -100,9 +101,9 @@ func cleanupGlobalWithGids(t *bolt.Tx, gids map[string]struct{}) {
return
}
dtmimp.Logf("Start to cleanup %d gids", len(gids))
logger.Debugf("Start to cleanup %d gids", len(gids))
for gid := range gids {
dtmimp.Logf("Start to delete gid: %s", gid)
logger.Debugf("Start to delete gid: %s", gid)
bucket.Delete([]byte(gid))
}
}
@ -129,9 +130,9 @@ func cleanupBranchWithGids(t *bolt.Tx, gids map[string]struct{}) {
}
}
dtmimp.Logf("Start to cleanup %d branches", len(branchKeys))
logger.Debugf("Start to cleanup %d branches", len(branchKeys))
for _, key := range branchKeys {
dtmimp.Logf("Start to delete branch: %s", key)
logger.Debugf("Start to delete branch: %s", key)
bucket.Delete([]byte(key))
}
}
@ -155,9 +156,9 @@ func cleanupIndexWithGids(t *bolt.Tx, gids map[string]struct{}) {
}
}
dtmimp.Logf("Start to cleanup %d indexes", len(indexKeys))
logger.Debugf("Start to cleanup %d indexes", len(indexKeys))
for _, key := range indexKeys {
dtmimp.Logf("Start to delete index: %s", key)
logger.Debugf("Start to delete index: %s", key)
bucket.Delete([]byte(key))
}
}
@ -241,6 +242,7 @@ func (s *BoltdbStore) PopulateData(skipDrop bool) {
return nil
})
dtmimp.E2P(err)
logger.Infof("Reset all data for boltdb")
}
}

13
dtmsvr/storage/redis/redis.go

@ -10,6 +10,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmsvr/storage"
)
@ -26,8 +27,11 @@ func (s *RedisStore) Ping() error {
}
func (s *RedisStore) PopulateData(skipDrop bool) {
_, err := redisGet().FlushAll(ctx).Result()
dtmimp.PanicIf(err != nil, err)
if !skipDrop {
_, err := redisGet().FlushAll(ctx).Result()
logger.Infof("call redis flushall. result: %v", err)
dtmimp.PanicIf(err != nil, err)
}
}
func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
@ -114,7 +118,7 @@ func (a *argList) AppendBranches(branches []storage.TransBranchStore) *argList {
}
func handleRedisResult(ret interface{}, err error) (string, error) {
dtmimp.Logf("result is: '%v', err: '%v'", ret, err)
logger.Debugf("result is: '%v', err: '%v'", ret, err)
if err != nil && err != redis.Nil {
return "", err
}
@ -127,7 +131,7 @@ func handleRedisResult(ret interface{}, err error) (string, error) {
}
func callLua(a *argList, lua string) (string, error) {
dtmimp.Logf("calling lua. args: %v\nlua:%s", a, lua)
logger.Debugf("calling lua. args: %v\nlua:%s", a, lua)
ret, err := redisGet().Eval(ctx, lua, a.Keys, a.List...).Result()
return handleRedisResult(ret, err)
}
@ -201,7 +205,6 @@ if os.status ~= ARGV[4] then
return 'NOT_FOUND'
end
redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2])
redis.log(redis.LOG_WARNING, 'finished: ', ARGV[5])
if ARGV[5] == '1' then
redis.call('ZREM', KEYS[3], gs.gid)
end

13
dtmsvr/storage/trans.go

@ -5,6 +5,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
)
type TransGlobalStore struct {
@ -29,10 +30,14 @@ type TransGlobalStore struct {
}
// TableName TableName
func (*TransGlobalStore) TableName() string {
func (g *TransGlobalStore) TableName() string {
return "dtm.trans_global"
}
func (g *TransGlobalStore) String() string {
return dtmimp.MustMarshalString(g)
}
// TransBranchStore branch transaction
type TransBranchStore struct {
common.ModelBase
@ -47,6 +52,10 @@ type TransBranchStore struct {
}
// TableName TableName
func (*TransBranchStore) TableName() string {
func (b *TransBranchStore) TableName() string {
return "dtm.trans_branch_op"
}
func (b *TransBranchStore) String() string {
return dtmimp.MustMarshalString(*b)
}

20
dtmsvr/svr.go

@ -13,7 +13,7 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc/dtmgimp"
"github.com/yedf/dtm/dtmgrpc/dtmgpb"
"github.com/yedf/dtmdriver"
@ -22,32 +22,32 @@ import (
// StartSvr StartSvr
func StartSvr() {
dtmimp.Logf("start dtmsvr")
logger.Infof("start dtmsvr")
app := common.GetGinApp()
app = httpMetrics(app)
addRoute(app)
dtmimp.Logf("dtmsvr listen at: %d", config.HttpPort)
logger.Infof("dtmsvr listen at: %d", config.HttpPort)
go app.Run(fmt.Sprintf(":%d", config.HttpPort))
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.GrpcPort))
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
s := grpc.NewServer(
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc.UnaryServerInterceptor(grpcMetrics), grpc.UnaryServerInterceptor(dtmgimp.GrpcServerLog)),
))
dtmgpb.RegisterDtmServer(s, &dtmServer{})
dtmimp.Logf("grpc listening at %v", lis.Addr())
logger.Infof("grpc listening at %v", lis.Addr())
go func() {
err := s.Serve(lis)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
}()
go updateBranchAsync()
time.Sleep(100 * time.Millisecond)
err = dtmdriver.Use(config.MicroService.Driver)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
err = dtmdriver.GetDriver().RegisterGrpcService(config.MicroService.Target, config.MicroService.EndPoint)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
}
// PopulateDB setup mysql data
@ -79,11 +79,11 @@ func updateBranchAsync() {
for len(updates) > 0 {
dbr := GetStore().UpdateBranchesSql(updates, []string{"status", "finish_time", "update_time"})
dtmimp.Logf("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected)
if dbr.Error != nil {
dtmimp.LogRedf("async update branch status error: %v", dbr.Error)
logger.Errorf("async update branch status error: %v", dbr.Error)
time.Sleep(1 * time.Second)
} else {
logger.Infof("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected)
updates = []TransBranch{}
}
}

3
dtmsvr/trans_class.go

@ -12,6 +12,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc/dtmgpb"
"github.com/yedf/dtm/dtmsvr/storage"
)
@ -57,7 +58,7 @@ func TransFromContext(c *gin.Context) *TransGlobal {
e2p(err)
m := TransGlobal{}
dtmimp.MustUnmarshal(b, &m)
dtmimp.Logf("creating trans in prepare")
logger.Debugf("creating trans in prepare")
// Payloads will be store in BinPayloads, Payloads is only used to Unmarshal
for _, p := range m.Payloads {
m.BinPayloads = append(m.BinPayloads, []byte(p))

16
dtmsvr/trans_process.go

@ -12,6 +12,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
// Process process global transaction once
@ -44,16 +45,16 @@ func (t *TransGlobal) process() map[string]interface{} {
func (t *TransGlobal) processInner() (rerr error) {
defer handlePanic(&rerr)
defer func() {
if rerr != nil {
dtmimp.LogRedf("processInner got error: %s", rerr.Error())
if rerr != nil && rerr != dtmcli.ErrOngoing {
logger.Errorf("processInner got error: %s", rerr.Error())
}
if TransProcessedTestChan != nil {
dtmimp.Logf("processed: %s", t.Gid)
logger.Debugf("processed: %s", t.Gid)
TransProcessedTestChan <- t.Gid
dtmimp.Logf("notified: %s", t.Gid)
logger.Debugf("notified: %s", t.Gid)
}
}()
dtmimp.Logf("processing: %s status: %s", t.Gid, t.Status)
logger.Debugf("processing: %s status: %s", t.Gid, t.Status)
branches := GetStore().FindBranches(t.Gid)
t.lastTouched = time.Now()
rerr = t.getProcessor().ProcessOnce(branches)
@ -71,5 +72,8 @@ func (t *TransGlobal) saveNew() error {
now := time.Now()
t.CreateTime = &now
t.UpdateTime = &now
return GetStore().MaySaveNewTrans(&t.TransGlobalStore, branches)
err := GetStore().MaySaveNewTrans(&t.TransGlobalStore, branches)
logger.Infof("MaySaveNewTrans result: %v, global: %v branches: %v",
err, t.TransGlobalStore.String(), dtmimp.MustMarshalString(branches))
return err
}

8
dtmsvr/trans_status.go

@ -13,6 +13,7 @@ import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc/dtmgimp"
"github.com/yedf/dtmdriver"
"google.golang.org/grpc/codes"
@ -22,6 +23,7 @@ import (
func (t *TransGlobal) touchCronTime(ctype cronType) {
t.lastTouched = time.Now()
GetStore().TouchCronTime(&t.TransGlobalStore, t.getNextCronInterval(ctype))
logger.Infof("TouchCronTime for: %s", t.TransGlobalStore.String())
}
func (t *TransGlobal) changeStatus(status string) {
@ -36,6 +38,7 @@ func (t *TransGlobal) changeStatus(status string) {
}
t.UpdateTime = &now
GetStore().ChangeGlobalStatus(&t.TransGlobalStore, status, updates, status == dtmcli.StatusSucceed || status == dtmcli.StatusFailed)
logger.Infof("ChangeGlobalStatus to %s ok for %s", status, t.TransGlobalStore.String())
t.Status = status
}
@ -46,6 +49,8 @@ func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPo
b.UpdateTime = &now
if config.Store.Driver != dtmimp.DBTypeMysql && config.Store.Driver != dtmimp.DBTypePostgres || config.UpdateBranchSync > 0 || t.updateBranchSync {
GetStore().LockGlobalSaveBranches(t.Gid, t.Status, []TransBranch{*b}, branchPos)
logger.Infof("LockGlobalSaveBranches ok: gid: %s old status: %s branches: %s",
b.Gid, dtmcli.StatusPrepared, b.String())
} else { // 为了性能优化,把branch的status更新异步化
updateBranchAsyncChan <- branchStatus{id: b.ID, status: status, finishTime: &now}
}
@ -67,6 +72,9 @@ func (t *TransGlobal) needProcess() bool {
}
func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayload []byte) (string, error) {
if url == "" { // empty url is success
return dtmcli.ResultSuccess, nil
}
if t.Protocol == "grpc" {
dtmimp.PanicIf(strings.HasPrefix(url, "http"), fmt.Errorf("bad url for grpc: %s", url))
server, method, err := dtmdriver.GetDriver().ParseServerMethod(url)

4
dtmsvr/trans_type_msg.go

@ -11,7 +11,7 @@ import (
"strings"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
type transMsgProcessor struct {
@ -50,7 +50,7 @@ func (t *TransGlobal) mayQueryPrepared() {
} else if strings.Contains(body, dtmcli.ResultOngoing) {
t.touchCronTime(cronReset)
} else {
dtmimp.LogRedf("getting result failed for %s. error: %s", t.QueryPrepared, err.Error())
logger.Errorf("getting result failed for %s. error: %s", t.QueryPrepared, err.Error())
t.touchCronTime(cronBackoff)
}
}

13
dtmsvr/trans_type_saga.go

@ -12,6 +12,7 @@ import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
type transSagaProcessor struct {
@ -54,7 +55,7 @@ type branchResult struct {
func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
// when saga tasks is fetched, it always need to process
dtmimp.Logf("status: %s timeout: %t", t.Status, t.isTimeout())
logger.Debugf("status: %s timeout: %t", t.Status, t.isTimeout())
if t.Status == dtmcli.StatusSubmitted && t.isTimeout() {
t.changeStatus(dtmcli.StatusAborting)
}
@ -103,8 +104,8 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
err = dtmimp.AsError(x)
}
resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op}
if err != nil {
dtmimp.LogRedf("exec branch error: %v", err)
if err != nil && err != dtmcli.ErrOngoing {
logger.Errorf("exec branch error: %v", err)
}
}()
err = t.execBranch(&branches[i], i)
@ -117,7 +118,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
toRun = append(toRun, current)
}
}
dtmimp.Logf("toRun picked for action is: %v", toRun)
logger.Debugf("toRun picked for action is: %v", toRun)
return toRun
}
runBranches := func(toRun []int) {
@ -159,9 +160,9 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
rsCSucceed++
}
}
dtmimp.Logf("branch done: %v", r)
logger.Debugf("branch done: %v", r)
case <-time.After(time.Duration(time.Second * 3)):
dtmimp.Logf("wait once for done")
logger.Debugf("wait once for done")
}
}

3
dtmsvr/trans_type_tcc.go

@ -9,6 +9,7 @@ package dtmsvr
import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
type transTccProcessor struct {
@ -33,7 +34,7 @@ func (t *transTccProcessor) ProcessOnce(branches []TransBranch) error {
op := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchConfirm, dtmcli.BranchCancel).(string)
for current := len(branches) - 1; current >= 0; current-- {
if branches[current].Op == op && branches[current].Status == dtmcli.StatusPrepared {
dtmimp.Logf("branch info: current: %d ID: %d", current, branches[current].ID)
logger.Debugf("branch info: current: %d ID: %d", current, branches[current].ID)
err := t.execBranch(&branches[current], current)
if err != nil {
return err

17
examples/base_grpc.go

@ -16,6 +16,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc"
"github.com/yedf/dtm/dtmgrpc/dtmgimp"
@ -44,25 +45,25 @@ func init() {
// GrpcStartup for grpc
func GrpcStartup() {
conn, err := grpc.Dial(DtmGrpcServer, grpc.WithInsecure(), grpc.WithUnaryInterceptor(dtmgimp.GrpcClientLog))
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
DtmClient = dtmgpb.NewDtmClient(conn)
dtmimp.Logf("dtm client inited")
logger.Debugf("dtm client inited")
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", BusiGrpcPort))
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
s := grpc.NewServer(grpc.UnaryInterceptor(dtmgimp.GrpcServerLog))
RegisterBusiServer(s, &busiServer{})
go func() {
dtmimp.Logf("busi grpc listening at %v", lis.Addr())
logger.Debugf("busi grpc listening at %v", lis.Addr())
err := s.Serve(lis)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
}()
time.Sleep(100 * time.Millisecond)
}
func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string) error {
res := dtmimp.OrString(result1, result2, dtmcli.ResultSuccess)
dtmimp.Logf("grpc busi %s %v %s %s result: %s", busi, in, result1, result2, res)
logger.Debugf("grpc busi %s %v %s %s result: %s", busi, in, result1, result2, res)
if res == dtmcli.ResultSuccess {
return nil
} else if res == dtmcli.ResultFailure {
@ -137,9 +138,9 @@ func (s *busiServer) TransOutXa(ctx context.Context, in *BusiReq) (*emptypb.Empt
func (s *busiServer) TransInTccNested(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) {
tcc, err := dtmgrpc.TccFromGrpc(ctx)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
r := &emptypb.Empty{}
err = tcc.CallBranch(in, BusiGrpc+"/examples.Busi/TransIn", BusiGrpc+"/examples.Busi/TransInConfirm", BusiGrpc+"/examples.Busi/TransInRevert", r)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return r, handleGrpcBusiness(in, MainSwitch.TransInResult.Fetch(), in.TransInResult, dtmimp.GetFuncName())
}

11
examples/base_http.go

@ -17,6 +17,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
@ -40,7 +41,7 @@ var Busi string = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiAPI)
// BaseAppStartup base app startup
func BaseAppStartup() *gin.Engine {
dtmimp.Logf("examples starting")
logger.Debugf("examples starting")
app := common.GetGinApp()
app.Use(func(c *gin.Context) {
v := MainSwitch.NextResult.Fetch()
@ -54,10 +55,10 @@ func BaseAppStartup() *gin.Engine {
BaseAddRoute(app)
for k, v := range setupFuncs {
dtmimp.Logf("initing %s", k)
logger.Debugf("initing %s", k)
v(app)
}
dtmimp.Logf("Starting busi at: %d", BusiPort)
logger.Debugf("Starting busi at: %d", BusiPort)
go app.Run(fmt.Sprintf(":%d", BusiPort))
time.Sleep(100 * time.Millisecond)
@ -98,7 +99,7 @@ var MainSwitch mainSwitchType
func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi string) (interface{}, error) {
info := infoFromContext(c)
res := dtmimp.OrString(result1, result2, dtmcli.ResultSuccess)
dtmimp.Logf("%s %s result: %s", busi, info.String(), res)
logger.Debugf("%s %s result: %s", busi, info.String(), res)
if res == "ERROR" {
return nil, errors.New("ERROR from user")
}
@ -136,7 +137,7 @@ func BaseAddRoute(app *gin.Engine) {
return handleGeneralBusiness(c, MainSwitch.TransOutRevertResult.Fetch(), "", "TransOutRevert")
}))
app.GET(BusiAPI+"/CanSubmit", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
dtmimp.Logf("%s CanSubmit", c.Query("gid"))
logger.Debugf("%s CanSubmit", c.Query("gid"))
return dtmimp.OrString(MainSwitch.CanSubmitResult.Fetch(), dtmcli.ResultSuccess), nil
}))
app.POST(BusiAPI+"/TransInXa", common.WrapHandler(func(c *gin.Context) (interface{}, error) {

11
examples/base_types.go

@ -15,6 +15,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc"
)
@ -58,7 +59,7 @@ func reqFrom(c *gin.Context) *TransReq {
if !ok {
req := TransReq{}
err := c.BindJSON(&req)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
c.Set("trans_req", &req)
v = &req
}
@ -81,27 +82,27 @@ func dbGet() *common.DB {
func sdbGet() *sql.DB {
db, err := dtmimp.PooledDB(config.ExamplesDB)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return db
}
func txGet() *sql.Tx {
db := sdbGet()
tx, err := db.Begin()
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return tx
}
// MustBarrierFromGin 1
func MustBarrierFromGin(c *gin.Context) *dtmcli.BranchBarrier {
ti, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return ti
}
// MustBarrierFromGrpc 1
func MustBarrierFromGrpc(ctx context.Context) *dtmcli.BranchBarrier {
ti, err := dtmgrpc.BarrierFromGrpc(ctx)
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return ti
}

4
examples/data.go

@ -10,7 +10,7 @@ import (
"fmt"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
var config = &common.Config
@ -50,6 +50,6 @@ type sampleInfo struct {
var Samples = map[string]*sampleInfo{}
func addSample(name string, fn func() string) {
dtmimp.LogIfFatalf(Samples[name] != nil, "%s already exists", name)
logger.FatalfIf(Samples[name] != nil, "%s already exists", name)
Samples[name] = &sampleInfo{Arg: name, Action: fn}
}

4
examples/grpc_msg.go

@ -7,7 +7,7 @@
package examples
import (
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
dtmgrpc "github.com/yedf/dtm/dtmgrpc"
)
@ -19,7 +19,7 @@ func init() {
Add(BusiGrpc+"/examples.Busi/TransOut", req).
Add(BusiGrpc+"/examples.Busi/TransIn", req)
err := msg.Submit()
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return msg.Gid
})
}

6
examples/grpc_saga.go

@ -7,7 +7,7 @@
package examples
import (
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
dtmgrpc "github.com/yedf/dtm/dtmgrpc"
)
@ -19,7 +19,7 @@ func init() {
Add(BusiGrpc+"/examples.Busi/TransOut", BusiGrpc+"/examples.Busi/TransOutRevert", req).
Add(BusiGrpc+"/examples.Busi/TransIn", BusiGrpc+"/examples.Busi/TransOutRevert", req)
err := saga.Submit()
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return saga.Gid
})
addSample("grpc_saga_wait", func() string {
@ -30,7 +30,7 @@ func init() {
Add(BusiGrpc+"/examples.Busi/TransIn", BusiGrpc+"/examples.Busi/TransOutRevert", req)
saga.WaitResult = true
err := saga.Submit()
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return saga.Gid
})
}

3
examples/grpc_saga_barrier.go

@ -12,6 +12,7 @@ import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -26,7 +27,7 @@ func init() {
Add(BusiGrpc+"/examples.Busi/TransOutBSaga", BusiGrpc+"/examples.Busi/TransOutRevertBSaga", req).
Add(BusiGrpc+"/examples.Busi/TransInBSaga", BusiGrpc+"/examples.Busi/TransInRevertBSaga", req)
err := saga.Submit()
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return saga.Gid
})
}

6
examples/grpc_tcc.go

@ -7,14 +7,14 @@
package examples
import (
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
dtmgrpc "github.com/yedf/dtm/dtmgrpc"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)
func init() {
addSample("grpc_tcc", func() string {
dtmimp.Logf("tcc simple transaction begin")
logger.Debugf("tcc simple transaction begin")
gid := dtmgrpc.MustGenGid(DtmGrpcServer)
err := dtmgrpc.TccGlobalTransaction(DtmGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error {
data := &BusiReq{Amount: 30}
@ -26,7 +26,7 @@ func init() {
err = tcc.CallBranch(data, BusiGrpc+"/examples.Busi/TransInTcc", BusiGrpc+"/examples.Busi/TransInConfirm", BusiGrpc+"/examples.Busi/TransInRevert", r)
return err
})
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return gid
})
}

4
examples/grpc_xa.go

@ -9,7 +9,7 @@ package examples
import (
context "context"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc"
"google.golang.org/protobuf/types/known/emptypb"
)
@ -27,7 +27,7 @@ func init() {
err = xa.CallBranch(req, BusiGrpc+"/examples.Busi/TransInXa", r)
return err
})
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return gid
})
}

4
examples/http_gorm_xa.go

@ -9,7 +9,7 @@ package examples
import (
"github.com/go-resty/resty/v2"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
func init() {
@ -22,7 +22,7 @@ func init() {
}
return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa")
})
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return gid
})

10
examples/http_msg.go

@ -8,21 +8,21 @@ package examples
import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
func init() {
addSample("msg", func() string {
dtmimp.Logf("a busi transaction begin")
logger.Debugf("a busi transaction begin")
req := &TransReq{Amount: 30}
msg := dtmcli.NewMsg(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)).
Add(Busi+"/TransOut", req).
Add(Busi+"/TransIn", req)
err := msg.Prepare(Busi + "/query")
dtmimp.FatalIfError(err)
dtmimp.Logf("busi trans submit")
logger.FatalIfError(err)
logger.Debugf("busi trans submit")
err = msg.Submit()
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return msg.Gid
})
}

24
examples/http_saga.go

@ -8,36 +8,36 @@ package examples
import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
func init() {
addSample("saga", func() string {
dtmimp.Logf("a saga busi transaction begin")
logger.Debugf("a saga busi transaction begin")
req := &TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInRevert", req)
dtmimp.Logf("saga busi trans submit")
logger.Debugf("saga busi trans submit")
err := saga.Submit()
dtmimp.Logf("result gid is: %s", saga.Gid)
dtmimp.FatalIfError(err)
logger.Debugf("result gid is: %s", saga.Gid)
logger.FatalIfError(err)
return saga.Gid
})
addSample("saga_wait", func() string {
dtmimp.Logf("a saga busi transaction begin")
logger.Debugf("a saga busi transaction begin")
req := &TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInRevert", req)
saga.SetOptions(&dtmcli.TransOptions{WaitResult: true})
err := saga.Submit()
dtmimp.Logf("result gid is: %s", saga.Gid)
dtmimp.FatalIfError(err)
logger.Debugf("result gid is: %s", saga.Gid)
logger.FatalIfError(err)
return saga.Gid
})
addSample("concurrent_saga", func() string {
dtmimp.Logf("a concurrent saga busi transaction begin")
logger.Debugf("a concurrent saga busi transaction begin")
req := &TransReq{Amount: 30}
csaga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
@ -47,10 +47,10 @@ func init() {
EnableConcurrent().
AddBranchOrder(2, []int{0, 1}).
AddBranchOrder(3, []int{0, 1})
dtmimp.Logf("concurrent saga busi trans submit")
logger.Debugf("concurrent saga busi trans submit")
err := csaga.Submit()
dtmimp.Logf("result gid is: %s", csaga.Gid)
dtmimp.FatalIfError(err)
logger.Debugf("result gid is: %s", csaga.Gid)
logger.FatalIfError(err)
return csaga.Gid
})
}

7
examples/http_saga_barrier.go

@ -13,6 +13,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
func init() {
@ -23,14 +24,14 @@ func init() {
app.POST(BusiAPI+"/SagaBTransOutCompensate", common.WrapHandler(sagaBarrierTransOutCompensate))
}
addSample("saga_barrier", func() string {
dtmimp.Logf("a busi transaction begin")
logger.Debugf("a busi transaction begin")
req := &TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)).
Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req).
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req)
dtmimp.Logf("busi trans submit")
logger.Debugf("busi trans submit")
err := saga.Submit()
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return saga.Gid
})
}

8
examples/http_saga_gorm_barrier.go

@ -12,7 +12,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
func init() {
@ -20,14 +20,14 @@ func init() {
app.POST(BusiAPI+"/SagaBTransOutGorm", common.WrapHandler(sagaGormBarrierTransOut))
}
addSample("saga_gorm_barrier", func() string {
dtmimp.Logf("a busi transaction begin")
logger.Debugf("a busi transaction begin")
req := &TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)).
Add(Busi+"/SagaBTransOutGorm", Busi+"/SagaBTransOutCompensate", req).
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req)
dtmimp.Logf("busi trans submit")
logger.Debugf("busi trans submit")
err := saga.Submit()
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return saga.Gid
})

12
examples/http_tcc.go

@ -11,15 +11,15 @@ import (
"github.com/go-resty/resty/v2"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
func init() {
setupFuncs["TccSetupSetup"] = func(app *gin.Engine) {
app.POST(BusiAPI+"/TransInTccParent", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query())
dtmimp.FatalIfError(err)
dtmimp.Logf("TransInTccParent ")
logger.FatalIfError(err)
logger.Debugf("TransInTccParent ")
return tcc.CallBranch(&TransReq{Amount: reqFrom(c).Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
}))
}
@ -32,11 +32,11 @@ func init() {
}
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTccParent", Busi+"/TransInConfirm", Busi+"/TransInRevert")
})
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return gid
})
addSample("tcc", func() string {
dtmimp.Logf("tcc simple transaction begin")
logger.Debugf("tcc simple transaction begin")
gid := dtmcli.MustGenGid(DtmHttpServer)
err := dtmcli.TccGlobalTransaction(DtmHttpServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
@ -45,7 +45,7 @@ func init() {
}
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
})
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return gid
})
}

5
examples/http_tcc_barrier.go

@ -15,6 +15,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
func init() {
@ -27,7 +28,7 @@ func init() {
app.POST(BusiAPI+"/TccBTransOutCancel", common.WrapHandler(TccBarrierTransOutCancel))
}
addSample("tcc_barrier", func() string {
dtmimp.Logf("tcc transaction begin")
logger.Debugf("tcc transaction begin")
gid := dtmcli.MustGenGid(DtmHttpServer)
err := dtmcli.TccGlobalTransaction(DtmHttpServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry",
@ -37,7 +38,7 @@ func init() {
}
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
})
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return gid
})
}

6
examples/http_xa.go

@ -11,7 +11,7 @@ import (
"github.com/go-resty/resty/v2"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
// XaClient XA client connection
@ -25,7 +25,7 @@ func init() {
return xa.HandleCallback(c.Query("gid"), c.Query("branch_id"), c.Query("op"))
}))
})
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
}
addSample("xa", func() string {
gid := dtmcli.MustGenGid(DtmHttpServer)
@ -36,7 +36,7 @@ func init() {
}
return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa")
})
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return gid
})
}

5
examples/quick_start.go

@ -14,6 +14,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
// 启动命令:go run app/main.go qs
@ -28,7 +29,7 @@ var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiAPI)
func QsStartSvr() {
app := common.GetGinApp()
qsAddRoute(app)
dtmimp.Logf("quick qs examples listening at %d", qsBusiPort)
logger.Debugf("quick qs examples listening at %d", qsBusiPort)
go app.Run(fmt.Sprintf(":%d", qsBusiPort))
time.Sleep(100 * time.Millisecond)
}
@ -44,7 +45,7 @@ func QsFireRequest() string {
Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req)
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
err := saga.Submit()
dtmimp.FatalIfError(err)
logger.FatalIfError(err)
return saga.Gid
}

2
helper/test-cover.sh

@ -2,7 +2,7 @@ set -x
echo "" > coverage.txt
for store in redis mysql boltdb; do
for d in $(go list ./... | grep -v vendor); do
TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/yedf/dtm/common,github.com/yedf/dtm/dtmcli,github.com/yedf/dtm/dtmcli/dtmimp,github.com/yedf/dtm/dtmgrpc,github.com/yedf/dtm/dtmgrpc/dtmgimp,github.com/yedf/dtm/dtmsvr,github.com/yedf/dtm/dtmsvr/storage,github.com/yedf/dtm/dtmsvr/storage/boltdb,github.com/yedf/dtm/dtmsvr/storage/registry -gcflags=-l $d
TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/yedf/dtm/common,github.com/yedf/dtm/dtmcli,github.com/yedf/dtm/dtmcli/dtmimp,github.com/yedf/dtm/dtmgrpc,github.com/yedf/dtm/dtmgrpc/dtmgimp,github.com/yedf/dtm/dtmsvr,github.com/yedf/dtm/dtmsvr/storage,github.com/yedf/dtm/dtmsvr/storage/boltdb,github.com/yedf/dtm/dtmsvr/storage/redis,github.com/yedf/dtm/dtmsvr/storage/registry,github.com/yedf/dtm/dtmsvr/storage/sql,github.com/yedf/dtm/dtmsvr/storage/boltdb,github.com/yedf/dtm/dtmsvr/storage/registry -gcflags=-l $d
if [ -f profile.out ]; then
cat profile.out >> coverage.txt
echo > profile.out

5
test/base_test.go

@ -15,6 +15,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/examples"
)
@ -40,7 +41,7 @@ func TestBaseSqlDB(t *testing.T) {
tx, err := db.ToSQLDB().Begin()
asserts.Nil(err)
err = barrier.Call(tx, func(tx *sql.Tx) error {
dtmimp.Logf("rollback gid2")
logger.Debugf("rollback gid2")
return fmt.Errorf("gid2 error")
})
asserts.Error(err, fmt.Errorf("gid2 error"))
@ -50,7 +51,7 @@ func TestBaseSqlDB(t *testing.T) {
asserts.Equal(dbr.RowsAffected, int64(0))
barrier.BarrierID = 0
err = barrier.CallWithDB(db.ToSQLDB(), func(tx *sql.Tx) error {
dtmimp.Logf("submit gid2")
logger.Debugf("submit gid2")
return nil
})
asserts.Nil(err)

11
test/saga_grpc_test.go

@ -75,6 +75,17 @@ func TestSagaGrpcNormalWait(t *testing.T) {
waitTransProcessed(saga.Gid)
}
func TestSagaGrpcEmptyUrl(t *testing.T) {
saga := dtmgrpc.NewSagaGrpc(examples.DtmGrpcServer, dtmimp.GetFuncName())
req := examples.GenBusiReq(30, false, false)
saga.Add(examples.BusiGrpc+"/examples.Busi/TransOut", examples.BusiGrpc+"/examples.Busi/TransOutRevert", req)
saga.Add("", examples.BusiGrpc+"/examples.Busi/TransInRevert", req)
saga.Submit()
waitTransProcessed(saga.Gid)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
}
func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc {
saga := dtmgrpc.NewSagaGrpc(examples.DtmGrpcServer, gid)
req := examples.GenBusiReq(30, outFailed, inFailed)

11
test/saga_test.go

@ -58,6 +58,17 @@ func TestSagaAbnormal(t *testing.T) {
assert.Error(t, err) // a succeed trans can't accept submit
}
func TestSagaEmptyUrl(t *testing.T) {
saga := dtmcli.NewSaga(examples.DtmHttpServer, dtmimp.GetFuncName())
req := examples.GenTransReq(30, false, false)
saga.Add(examples.Busi+"/TransOut", "", &req)
saga.Add("", "", &req)
saga.Submit()
waitTransProcessed(saga.Gid)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
}
func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
saga := dtmcli.NewSaga(examples.DtmHttpServer, gid)
req := examples.GenTransReq(30, outFailed, inFailed)

9
test/tcc_barrier_test.go

@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/examples"
)
@ -68,7 +69,7 @@ func TestTccBarrierDisorder(t *testing.T) {
res, err := examples.TccBarrierTransOutCancel(c)
if !sleeped {
sleeped = true
dtmimp.Logf("sleep before cancel return")
logger.Debugf("sleep before cancel return")
<-timeoutChan
finishedChan <- "1"
}
@ -89,7 +90,7 @@ func TestTccBarrierDisorder(t *testing.T) {
assert.Contains(t, resp.String(), dtmcli.ResultSuccess)
go func() {
dtmimp.Logf("sleeping to wait for tcc try timeout")
logger.Debugf("sleeping to wait for tcc try timeout")
<-timeoutChan
r, _ := dtmimp.RestyClient.R().
SetBody(body).
@ -104,10 +105,10 @@ func TestTccBarrierDisorder(t *testing.T) {
assert.True(t, strings.Contains(r.String(), dtmcli.ResultSuccess)) // 这个是悬挂操作,为了简单起见,依旧让他返回成功
finishedChan <- "1"
}()
dtmimp.Logf("cron to timeout and then call cancel")
logger.Debugf("cron to timeout and then call cancel")
go cronTransOnceForwardNow(300)
time.Sleep(100 * time.Millisecond)
dtmimp.Logf("cron to timeout and then call cancelled twice")
logger.Debugf("cron to timeout and then call cancelled twice")
cronTransOnceForwardNow(300)
timeoutChan <- "wake"
timeoutChan <- "wake"

3
test/tcc_grpc_test.go

@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc"
"github.com/yedf/dtm/examples"
"google.golang.org/protobuf/types/known/emptypb"
@ -70,7 +71,7 @@ func TestTccGrpcNested(t *testing.T) {
func TestTccGrpcType(t *testing.T) {
_, err := dtmgrpc.TccFromGrpc(context.Background())
assert.Error(t, err)
dtmimp.Logf("expecting dtmgrpcserver error")
logger.Debugf("expecting dtmgrpcserver error")
err = dtmgrpc.TccGlobalTransaction("-", "", func(tcc *dtmgrpc.TccGrpc) error { return nil })
assert.Error(t, err)
}

9
test/types.go

@ -12,6 +12,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmsvr"
)
@ -23,16 +24,16 @@ func dbGet() *common.DB {
// waitTransProcessed only for test usage. wait for transaction processed once
func waitTransProcessed(gid string) {
dtmimp.Logf("waiting for gid %s", gid)
logger.Debugf("waiting for gid %s", gid)
select {
case id := <-dtmsvr.TransProcessedTestChan:
for id != gid {
dtmimp.LogRedf("-------id %s not match gid %s", id, gid)
logger.Errorf("-------id %s not match gid %s", id, gid)
id = <-dtmsvr.TransProcessedTestChan
}
dtmimp.Logf("finish for gid %s", gid)
logger.Debugf("finish for gid %s", gid)
case <-time.After(time.Duration(time.Second * 3)):
dtmimp.LogFatalf("Wait Trans timeout")
logger.FatalfIf(true, "Wait Trans timeout")
}
}

Loading…
Cancel
Save