diff --git a/app/main.go b/app/main.go index aa87722..edd4b9a 100644 --- a/app/main.go +++ b/app/main.go @@ -15,7 +15,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmsvr" "github.com/yedf/dtm/dtmsvr/storage/registry" "github.com/yedf/dtm/examples" @@ -50,7 +50,7 @@ func main() { fmt.Printf("version: %s commit: %s built at: %s\n", Version, Commit, Date) return } - dtmimp.Logf("starting dtm....") + logger.Infof("starting dtm....") common.MustLoadConfig() if common.Config.ExamplesDB.Driver != "" { dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver) @@ -75,7 +75,7 @@ func main() { examples.BaseAppStartup() sample := examples.Samples[os.Args[1]] - dtmimp.LogIfFatalf(sample == nil, "no sample name for %s", os.Args[1]) + logger.FatalfIf(sample == nil, "no sample name for %s", os.Args[1]) sample.Action() } select {} diff --git a/bench/http.go b/bench/http.go index 8a6a470..403abd3 100644 --- a/bench/http.go +++ b/bench/http.go @@ -17,6 +17,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmsvr" "github.com/yedf/dtm/examples" ) @@ -32,14 +33,14 @@ var benchBusi = fmt.Sprintf("http://localhost:%d%s", benchPort, benchAPI) func sdbGet() *sql.DB { db, err := dtmimp.PooledDB(common.Config.Store.GetDBConf()) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return db } func txGet() *sql.Tx { db := sdbGet() tx, err := db.Begin() - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return tx } @@ -50,7 +51,7 @@ func reloadData() { tables := []string{"dtm_busi.user_account", "dtm_busi.user_account_log", "dtm.trans_global", "dtm.trans_branch_op", "dtm_barrier.barrier"} for _, t := range tables { _, err := dtmimp.DBExec(db, fmt.Sprintf("truncate %s", t)) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) } s := "insert ignore into dtm_busi.user_account(user_id, balance) values " ss := []string{} @@ -58,8 +59,8 @@ func reloadData() { ss = append(ss, fmt.Sprintf("(%d, 1000000)", i)) } _, err := dtmimp.DBExec(db, s+strings.Join(ss, ",")) - dtmimp.FatalIfError(err) - dtmimp.Logf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds()) + logger.FatalIfError(err) + logger.Debugf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds()) } var uidCounter int32 = 0 @@ -70,11 +71,11 @@ var sqls int = 1 func StartSvr() { app := common.GetGinApp() benchAddRoute(app) - dtmimp.Logf("bench listening at %d", benchPort) + logger.Debugf("bench listening at %d", benchPort) go app.Run(fmt.Sprintf(":%d", benchPort)) db := sdbGet() _, err := dtmimp.DBExec(db, "drop table if exists dtm_busi.user_account_log") - dtmimp.FatalIfError(err) + logger.FatalIfError(err) _, err = dtmimp.DBExec(db, `create table if not exists dtm_busi.user_account_log ( id INT(11) AUTO_INCREMENT PRIMARY KEY, user_id INT(11) NOT NULL, @@ -89,7 +90,7 @@ func StartSvr() { key(create_time) ) `) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) } func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) { @@ -101,21 +102,21 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) { for i := 0; i < sqls; i++ { _, err := dtmimp.DBExec(tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, op, reason) values(?,?,?,?,?,?)", uid, amount, tb.Gid, c.Query("branch_id"), tb.TransType, fmt.Sprintf("inserted by dtm transaction %s %s", tb.Gid, c.Query("branch_id"))) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) _, err = dtmimp.DBExec(tx, "update dtm_busi.user_account set balance = balance + ?, update_time = now() where user_id = ?", amount, uid) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) } return nil } if strings.Contains(mode, "barrier") { barrier, err := dtmcli.BarrierFromQuery(c.Request.URL.Query()) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) barrier.Call(txGet(), f) } else { tx := txGet() f(tx) err := tx.Commit() - dtmimp.FatalIfError(err) + logger.FatalIfError(err) } return dtmcli.MapSuccess, nil @@ -150,7 +151,7 @@ func benchAddRoute(app *gin.Engine) { req := gin.H{} params := fmt.Sprintf("?uid=%s", suid) params2 := fmt.Sprintf("?uid=%s", suid2) - dtmimp.Logf("mode: %s contains dtm: %t", mode, strings.Contains(mode, "dtm")) + logger.Debugf("mode: %s contains dtm: %t", mode, strings.Contains(mode, "dtm")) if strings.Contains(mode, "dtm") { saga := dtmcli.NewSaga(examples.DtmHttpServer, fmt.Sprintf("bench-%d", uid)). Add(benchBusi+"/TransOut"+params, benchBusi+"/TransOutCompensate"+params, req). diff --git a/bench/main.go b/bench/main.go index f757c8c..ebd604d 100644 --- a/bench/main.go +++ b/bench/main.go @@ -6,7 +6,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmsvr" "github.com/yedf/dtm/dtmsvr/storage/registry" "github.com/yedf/dtm/examples" @@ -23,7 +23,7 @@ func main() { fmt.Printf(hint) return } - dtmimp.Logf("starting dtm....") + logger.Debugf("starting dtm....") if os.Args[1] == "http" { fmt.Println("start bench server") common.MustLoadConfig() diff --git a/common/config.go b/common/config.go index afc5852..6044305 100644 --- a/common/config.go +++ b/common/config.go @@ -7,7 +7,7 @@ import ( "path/filepath" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "gopkg.in/yaml.v2" ) @@ -61,6 +61,7 @@ type configType struct { GrpcPort int64 `yaml:"GrpcPort" default:"36790"` MicroService MicroService `yaml:"MicroService"` UpdateBranchSync int64 `yaml:"UpdateBranchSync"` + LogLevel string `yaml:"LogLevel" default:"info"` ExamplesDB dtmcli.DBConf `yaml:"ExamplesDB"` } @@ -82,13 +83,13 @@ func MustLoadConfig() { } if len(cont) != 0 { err := yaml.UnmarshalStrict(cont, &Config) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) } scont, err := json.MarshalIndent(&Config, "", " ") - dtmimp.FatalIfError(err) - dtmimp.Logf("config is: \n%s", scont) + logger.FatalIfError(err) + logger.Debugf("config is: \n%s", scont) err = checkConfig() - dtmimp.LogIfFatalf(err != nil, `config error: '%v'. + logger.FatalfIf(err != nil, `config error: '%v'. check you env, and conf.yml/conf.sample.yml in current and parent path: %s. please visit http://d.dtm.pub to see the config document. loaded config is: diff --git a/common/config_utils.go b/common/config_utils.go index 44c93f6..e54a834 100644 --- a/common/config_utils.go +++ b/common/config_utils.go @@ -50,6 +50,6 @@ func toUnderscoreUpper(key string) string { key = strings.Trim(key, "_") matchFirstCap := regexp.MustCompile("([a-z])([A-Z]+)") s2 := matchFirstCap.ReplaceAllString(key, "${1}_${2}") - // dtmimp.Logf("loading from env: %s", strings.ToUpper(s2)) + // logger.Debugf("loading from env: %s", strings.ToUpper(s2)) return strings.ToUpper(s2) } diff --git a/common/db.go b/common/db.go index a9c315d..f2d1c66 100644 --- a/common/db.go +++ b/common/db.go @@ -11,6 +11,7 @@ import ( _ "github.com/lib/pq" // register postgres driver "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "gorm.io/driver/mysql" "gorm.io/driver/postgres" "gorm.io/gorm" @@ -65,7 +66,7 @@ func (op *tracePlugin) Initialize(db *gorm.DB) (err error) { after := func(db *gorm.DB) { _ts, _ := db.InstanceGet("ivy.startTime") sql := db.Dialector.Explain(db.Statement.SQL.String(), db.Statement.Vars...) - dtmimp.Logf("used: %d ms affected: %d sql is: %s", time.Since(_ts.(time.Time)).Milliseconds(), db.RowsAffected, sql) + logger.Debugf("used: %d ms affected: %d sql is: %s", time.Since(_ts.(time.Time)).Milliseconds(), db.RowsAffected, sql) if v, ok := db.InstanceGet("ivy.must"); ok && v.(bool) { if db.Error != nil && db.Error != gorm.ErrRecordNotFound { panic(db.Error) @@ -76,7 +77,7 @@ func (op *tracePlugin) Initialize(db *gorm.DB) (err error) { beforeName := "cb_before" afterName := "cb_after" - dtmimp.Logf("installing db plugin: %s", op.Name()) + logger.Debugf("installing db plugin: %s", op.Name()) // 开始前 _ = db.Callback().Create().Before("gorm:before_create").Register(beforeName, before) _ = db.Callback().Query().Before("gorm:query").Register(beforeName, before) @@ -108,7 +109,7 @@ func DbGet(conf dtmcli.DBConf) *DB { dsn := dtmimp.GetDsn(conf) db, ok := dbs.Load(dsn) if !ok { - dtmimp.Logf("connecting %s", strings.Replace(dsn, conf.Passwrod, "****", 1)) + logger.Debugf("connecting %s", strings.Replace(dsn, conf.Passwrod, "****", 1)) db1, err := gorm.Open(getGormDialetor(conf.Driver, dsn), &gorm.Config{ SkipDefaultTransaction: true, }) diff --git a/common/types.go b/common/types.go index 358dd80..19bd8ea 100644 --- a/common/types.go +++ b/common/types.go @@ -11,7 +11,7 @@ import ( "sync" "github.com/go-redis/redis/v8" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) var rdb *redis.Client @@ -19,7 +19,7 @@ var once sync.Once func RedisGet() *redis.Client { once.Do(func() { - dtmimp.Logf("connecting to redis: %v", Config.Store) + logger.Debugf("connecting to redis: %v", Config.Store) rdb = redis.NewClient(&redis.Options{ Addr: fmt.Sprintf("%s:%d", Config.Store.Host, Config.Store.Port), Username: Config.Store.User, diff --git a/common/utils.go b/common/utils.go index 9465a6c..a712f6d 100644 --- a/common/utils.go +++ b/common/utils.go @@ -20,12 +20,14 @@ import ( "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) // GetGinApp init and return gin func GetGinApp() *gin.Engine { gin.SetMode(gin.ReleaseMode) - app := gin.Default() + app := gin.New() + app.Use(gin.Recovery()) app.Use(func(c *gin.Context) { body := "" if c.Request.Body != nil { @@ -36,11 +38,8 @@ func GetGinApp() *gin.Engine { c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(rb)) } } - began := time.Now() - dtmimp.Logf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) + logger.Debugf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) c.Next() - dtmimp.Logf("used %d ms %s %s query: %s body: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) - }) app.Any("/api/ping", func(c *gin.Context) { c.JSON(200, map[string]interface{}{"msg": "pong"}) }) return app @@ -49,6 +48,7 @@ func GetGinApp() *gin.Engine { // WrapHandler name is clear func WrapHandler(fn func(*gin.Context) (interface{}, error)) gin.HandlerFunc { return func(c *gin.Context) { + began := time.Now() r, err := func() (r interface{}, rerr error) { defer dtmimp.P2E(&rerr) return fn(c) @@ -59,11 +59,12 @@ func WrapHandler(fn func(*gin.Context) (interface{}, error)) gin.HandlerFunc { } else if err == nil { b, err = json.Marshal(r) } + if err != nil { - dtmimp.Logf("status: 500, code: 500 message: %s", err.Error()) + logger.Errorf("%2dms 500 %s %s %s %s", time.Since(began).Milliseconds(), err.Error(), c.Request.Method, c.Request.RequestURI, string(b)) c.JSON(500, map[string]interface{}{"code": 500, "message": err.Error()}) } else { - dtmimp.Logf("status: 200, content: %s", string(b)) + logger.Infof("%2dms 200 %s %s %s", time.Since(began).Milliseconds(), c.Request.Method, c.Request.RequestURI, string(b)) c.Status(200) c.Writer.Header().Add("Content-Type", "application/json") _, err = c.Writer.Write(b) @@ -105,10 +106,10 @@ func GetNextTime(second int64) *time.Time { // RunSQLScript 1 func RunSQLScript(conf dtmcli.DBConf, script string, skipDrop bool) { con, err := dtmimp.StandaloneDB(conf) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) defer func() { con.Close() }() content, err := ioutil.ReadFile(script) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) sqls := strings.Split(string(content), ";") for _, sql := range sqls { s := strings.TrimSpace(sql) @@ -116,6 +117,7 @@ func RunSQLScript(conf dtmcli.DBConf, script string, skipDrop bool) { continue } _, err = dtmimp.DBExec(con, s) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) + logger.Infof("sql scripts finished: %s", s) } } diff --git a/conf.sample.yml b/conf.sample.yml index 3d698c0..cc7f447 100644 --- a/conf.sample.yml +++ b/conf.sample.yml @@ -39,6 +39,8 @@ Store: # specify which engine to store trans status # TimeoutToFail: 35 # timeout for XA, TCC to fail. saga's timeout default to infinite, which can be overwritten in saga options # RetryInterval: 10 # the subtrans branch will be retried after this interval +# LogLevel: 'info' # default: info. can be debug|info|warn|error + ### dtm can run examples, and examples will use following config to connect db ExamplesDB: Driver: 'mysql' diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 480ae17..ede7080 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -12,6 +12,7 @@ import ( "net/url" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) // BarrierBusiFunc type for busi func @@ -82,7 +83,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) - dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) + logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿 currentAffected == 0 { // 这个是重复请求或者悬挂 return diff --git a/dtmcli/dtmimp/utils.go b/dtmcli/dtmimp/utils.go index 693f08d..eb264bf 100644 --- a/dtmcli/dtmimp/utils.go +++ b/dtmcli/dtmimp/utils.go @@ -11,23 +11,36 @@ import ( "encoding/json" "errors" "fmt" - "log" "os" "runtime" - "runtime/debug" "strconv" "strings" "sync" "time" "github.com/go-resty/resty/v2" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" + "github.com/yedf/dtm/dtmcli/logger" ) +// Logf an alias of Infof +// Deprecated: use logger.Errorf +var Logf = logger.Infof + +// LogRedf an alias of Errorf +// Deprecated: use logger.Errorf +var LogRedf = logger.Errorf + +// FatalIfError fatal if error is not nil +// Deprecated: use logger.FatalIfError +var FatalIfError = logger.FatalIfError + +// LogIfFatalf fatal if cond is true +// Deprecated: use logger.FatalfIf +var LogIfFatalf = logger.FatalfIf + // AsError wrap a panic value as an error func AsError(x interface{}) error { - LogRedf("panic wrapped to error: '%v'", x) + logger.Errorf("panic wrapped to error: '%v'", x) if e, ok := x.(error); ok { return e } @@ -120,59 +133,6 @@ func MustRemarshal(from interface{}, to interface{}) { E2P(err) } -var logger *zap.SugaredLogger = nil - -func init() { - InitLog() -} - -// InitLog is a initialization for a logger -func InitLog() { - config := zap.NewProductionConfig() - config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - if os.Getenv("DTM_DEBUG") != "" { - config.Encoding = "console" - config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder - } - p, err := config.Build(zap.AddCallerSkip(1)) - if err != nil { - log.Fatal("create logger failed: ", err) - } - logger = p.Sugar() -} - -// Logf is log stdout -func Logf(fmt string, args ...interface{}) { - logger.Infof(fmt, args...) -} - -// LogRedf is print error message with red color -func LogRedf(fmt string, args ...interface{}) { - logger.Errorf(fmt, args...) -} - -// FatalExitFunc is a Fatal exit function ,it will be replaced when testing -var FatalExitFunc = func() { os.Exit(1) } - -// LogFatalf is print error message with red color, and execute FatalExitFunc -func LogFatalf(fmt string, args ...interface{}) { - fmt += "\n" + string(debug.Stack()) - LogRedf(fmt, args...) - FatalExitFunc() -} - -// LogIfFatalf is print error message with red color, and execute LogFatalf, when condition is true -func LogIfFatalf(condition bool, fmt string, args ...interface{}) { - if condition { - LogFatalf(fmt, args...) - } -} - -// FatalIfError is print error message with red color, and execute LogIfFatalf. -func FatalIfError(err error) { - LogIfFatalf(err != nil, "Fatal error: %v", err) -} - // GetFuncName get current call func name func GetFuncName() string { pc, _, _, _ := runtime.Caller(1) @@ -223,9 +183,9 @@ func DBExec(db DB, sql string, values ...interface{}) (affected int64, rerr erro used := time.Since(began) / time.Millisecond if rerr == nil { affected, rerr = r.RowsAffected() - Logf("used: %d ms affected: %d for %s %v", used, affected, sql, values) + logger.Debugf("used: %d ms affected: %d for %s %v", used, affected, sql, values) } else { - LogRedf("used: %d ms exec error: %v for %s %v", used, rerr, sql, values) + logger.Errorf("used: %d ms exec error: %v for %s %v", used, rerr, sql, values) } return } @@ -258,7 +218,7 @@ func CheckResponse(resp *resty.Response, err error) error { return err } -// CheckResult is check result. Return err directly if err is not nil. And return corresponding error by calling CheckResponse if resp is the type of *resty.Response. +// CheckResult is check result. Return err directly if err is not nil. And return corresponding error by calling CheckResponse if resp is the type of *resty.Response. // Otherwise, return error by value of str, the string after marshal. func CheckResult(res interface{}, err error) error { if err != nil { diff --git a/dtmcli/dtmimp/utils_test.go b/dtmcli/dtmimp/utils_test.go index 1625ced..167560c 100644 --- a/dtmcli/dtmimp/utils_test.go +++ b/dtmcli/dtmimp/utils_test.go @@ -8,7 +8,6 @@ package dtmimp import ( "errors" - "fmt" "os" "strings" "testing" @@ -80,20 +79,3 @@ func TestSome(t *testing.T) { s2 := MayReplaceLocalhost("http://localhost") assert.Equal(t, "http://localhost", s2) } - -func TestFatal(t *testing.T) { - old := FatalExitFunc - defer func() { - FatalExitFunc = old - }() - FatalExitFunc = func() { panic(fmt.Errorf("fatal")) } - err := CatchP(func() { - LogIfFatalf(true, "") - }) - assert.Error(t, err, fmt.Errorf("fatal")) -} - -func TestInitLog(t *testing.T) { - os.Setenv("DTM_DEBUG", "1") - InitLog() -} diff --git a/dtmcli/dtmimp/vars.go b/dtmcli/dtmimp/vars.go index ee9bad6..c7af1ff 100644 --- a/dtmcli/dtmimp/vars.go +++ b/dtmcli/dtmimp/vars.go @@ -10,6 +10,7 @@ import ( "errors" "github.com/go-resty/resty/v2" + "github.com/yedf/dtm/dtmcli/logger" ) // ErrFailure error of FAILURE @@ -36,12 +37,12 @@ func init() { // RestyClient.SetRetryWaitTime(1 * time.Second) RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { r.URL = MayReplaceLocalhost(r.URL) - Logf("requesting: %s %s %v %v", r.Method, r.URL, r.Body, r.QueryParam) + logger.Debugf("requesting: %s %s %v %v", r.Method, r.URL, r.Body, r.QueryParam) return nil }) RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { r := resp.Request - Logf("requested: %s %s %s", r.Method, r.URL, resp.String()) + logger.Debugf("requested: %s %s %s", r.Method, r.URL, resp.String()) return nil }) } diff --git a/dtmcli/logger/log.go b/dtmcli/logger/log.go new file mode 100644 index 0000000..7d3650a --- /dev/null +++ b/dtmcli/logger/log.go @@ -0,0 +1,64 @@ +package logger + +import ( + "log" + "os" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +var logger *zap.SugaredLogger = nil + +func init() { + InitLog("info") +} + +// InitLog is a initialization for a logger +// level can be: debug info warn error +func InitLog(level string) { + config := zap.NewProductionConfig() + err := config.Level.UnmarshalText([]byte(level)) + FatalIfError(err) + config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + if os.Getenv("DTM_DEBUG") != "" { + config.Encoding = "console" + config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder + } + p, err := config.Build(zap.AddCallerSkip(1)) + FatalIfError(err) + logger = p.Sugar() +} + +// Debugf log to level debug +func Debugf(fmt string, args ...interface{}) { + logger.Debugf(fmt, args...) +} + +// Infof log to level info +func Infof(fmt string, args ...interface{}) { + logger.Infof(fmt, args...) +} + +// Warnf log to level warn +func Warnf(fmt string, args ...interface{}) { + logger.Warnf(fmt, args...) +} + +// Errorf log to level error +func Errorf(fmt string, args ...interface{}) { + logger.Errorf(fmt, args...) +} + +// FatalfIf log to level error +func FatalfIf(cond bool, fmt string, args ...interface{}) { + if !cond { + return + } + log.Fatalf(fmt, args...) +} + +// FatalIfError if err is not nil, then log to level fatal and call os.Exit +func FatalIfError(err error) { + FatalfIf(err != nil, "fatal error: %v", err) +} diff --git a/dtmcli/logger/logger_test.go b/dtmcli/logger/logger_test.go new file mode 100644 index 0000000..669a39f --- /dev/null +++ b/dtmcli/logger/logger_test.go @@ -0,0 +1,17 @@ +package logger + +import ( + "os" + "testing" +) + +func TestInitLog(t *testing.T) { + os.Setenv("DTM_DEBUG", "1") + InitLog("debug") + Debugf("a debug msg") + Infof("a info msg") + Warnf("a warn msg") + Errorf("a error msg") + FatalfIf(false, "nothing") + FatalIfError(nil) +} diff --git a/dtmgrpc/dtmgimp/grpc_clients.go b/dtmgrpc/dtmgimp/grpc_clients.go index 17c7b54..e437054 100644 --- a/dtmgrpc/dtmgimp/grpc_clients.go +++ b/dtmgrpc/dtmgimp/grpc_clients.go @@ -11,6 +11,7 @@ import ( "sync" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc/dtmgpb" grpc "google.golang.org/grpc" ) @@ -57,12 +58,12 @@ func GetGrpcConn(grpcServer string, isRaw bool) (conn *grpc.ClientConn, rerr err if isRaw { opts = grpc.WithDefaultCallOptions(grpc.ForceCodec(rawCodec{})) } - dtmimp.Logf("grpc client connecting %s", grpcServer) + logger.Debugf("grpc client connecting %s", grpcServer) conn, rerr := grpc.Dial(grpcServer, grpc.WithInsecure(), grpc.WithUnaryInterceptor(GrpcClientLog), opts) if rerr == nil { clients.Store(grpcServer, conn) v = conn - dtmimp.Logf("grpc client inited for %s", grpcServer) + logger.Debugf("grpc client inited for %s", grpcServer) } } return v.(*grpc.ClientConn), rerr diff --git a/dtmgrpc/dtmgimp/types.go b/dtmgrpc/dtmgimp/types.go index a188f23..594c6c9 100644 --- a/dtmgrpc/dtmgimp/types.go +++ b/dtmgrpc/dtmgimp/types.go @@ -9,9 +9,11 @@ package dtmgimp import ( "context" "fmt" + "time" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -19,28 +21,30 @@ import ( // GrpcServerLog 打印grpc服务端的日志 func GrpcServerLog(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - dtmimp.Logf("grpc server handling: %s %v", info.FullMethod, req) + began := time.Now() + logger.Debugf("grpc server handling: %s %s", info.FullMethod, dtmimp.MustMarshalString(req)) LogDtmCtx(ctx) m, err := handler(ctx, req) - res := fmt.Sprintf("grpc server handled: %s %v result: %v err: %v", info.FullMethod, req, m, err) + res := fmt.Sprintf("%2dms %v %s %s %s", + time.Since(began).Milliseconds(), err, info.FullMethod, dtmimp.MustMarshalString(m), dtmimp.MustMarshalString(req)) if err != nil { - dtmimp.LogRedf("%s", res) + logger.Errorf("%s", res) } else { - dtmimp.Logf("%s", res) + logger.Infof("%s", res) } return m, err } // GrpcClientLog 打印grpc服务端的日志 func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - dtmimp.Logf("grpc client calling: %s%s %v", cc.Target(), method, req) + logger.Debugf("grpc client calling: %s%s %v", cc.Target(), method, req) LogDtmCtx(ctx) err := invoker(ctx, method, req, reply, cc, opts...) res := fmt.Sprintf("grpc client called: %s%s %v result: %v err: %v", cc.Target(), method, req, reply, err) if err != nil { - dtmimp.LogRedf("%s", res) + logger.Errorf("%s", res) } else { - dtmimp.Logf("%s", res) + logger.Debugf("%s", res) } return err } @@ -49,7 +53,7 @@ func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, c func Result2Error(res interface{}, err error) error { e := dtmimp.CheckResult(res, err) if e == dtmimp.ErrFailure { - dtmimp.LogRedf("failure: res: %v, err: %v", res, e) + logger.Errorf("failure: res: %v, err: %v", res, e) return status.New(codes.Aborted, dtmcli.ResultFailure).Err() } else if e == dtmimp.ErrOngoing { return status.New(codes.Aborted, dtmcli.ResultOngoing).Err() diff --git a/dtmgrpc/dtmgimp/utils.go b/dtmgrpc/dtmgimp/utils.go index 357d0c7..e600166 100644 --- a/dtmgrpc/dtmgimp/utils.go +++ b/dtmgrpc/dtmgimp/utils.go @@ -10,6 +10,7 @@ import ( context "context" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc/dtmgpb" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" @@ -59,7 +60,7 @@ func TransInfo2Ctx(gid, transType, branchID, op, dtm string) context.Context { func LogDtmCtx(ctx context.Context) { tb := TransBaseFromGrpc(ctx) if tb.Gid != "" { - dtmimp.Logf("gid: %s trans_type: %s branch_id: %s op: %s dtm: %s", tb.Gid, tb.TransType, tb.BranchID, tb.Op, tb.Dtm) + logger.Debugf("gid: %s trans_type: %s branch_id: %s op: %s dtm: %s", tb.Gid, tb.TransType, tb.BranchID, tb.Op, tb.Dtm) } } diff --git a/dtmsvr/api.go b/dtmsvr/api.go index eb26ffb..5ca0e9e 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -11,6 +11,7 @@ import ( "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmsvr/storage" ) @@ -71,7 +72,10 @@ func svcRegisterBranch(transType string, branch *TransBranch, data map[string]st }) if err == storage.ErrNotFound { msg := fmt.Sprintf("no trans with gid: %s status: %s found", branch.Gid, dtmcli.StatusPrepared) + logger.Errorf(msg) return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": msg}, nil } + logger.Infof("LockGlobalSaveBranches result: %v: gid: %s old status: %s branches: %s", + err, branch.Gid, dtmcli.StatusPrepared, dtmimp.MustMarshalString(branches)) return dtmimp.If(err != nil, nil, dtmcli.MapSuccess), err } diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index c4ef282..120b74c 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -13,6 +13,7 @@ import ( "time" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) // NowForwardDuration will be set in test, trans may be timeout @@ -49,12 +50,13 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal { if global == nil { return nil } + logger.Infof("cron job return a trans: %s", global.String()) return &TransGlobal{TransGlobalStore: *global} } func handlePanic(perr *error) { if err := recover(); err != nil { - dtmimp.LogRedf("----recovered panic %v\n%s", err, string(debug.Stack())) + logger.Errorf("----recovered panic %v\n%s", err, string(debug.Stack())) if perr != nil { *perr = fmt.Errorf("dtm panic: %v", err) } @@ -64,6 +66,6 @@ func handlePanic(perr *error) { func sleepCronTime() { normal := time.Duration((float64(config.TransCronInterval) - rand.Float64()) * float64(time.Second)) interval := dtmimp.If(CronForwardDuration > 0, 1*time.Millisecond, normal).(time.Duration) - dtmimp.Logf("sleeping for %v milli", interval/time.Microsecond) + logger.Debugf("sleeping for %v milli", interval/time.Microsecond) time.Sleep(interval) } diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index f5b9670..d02ba5c 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -11,6 +11,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmsvr/storage" ) @@ -100,9 +101,9 @@ func cleanupGlobalWithGids(t *bolt.Tx, gids map[string]struct{}) { return } - dtmimp.Logf("Start to cleanup %d gids", len(gids)) + logger.Debugf("Start to cleanup %d gids", len(gids)) for gid := range gids { - dtmimp.Logf("Start to delete gid: %s", gid) + logger.Debugf("Start to delete gid: %s", gid) bucket.Delete([]byte(gid)) } } @@ -129,9 +130,9 @@ func cleanupBranchWithGids(t *bolt.Tx, gids map[string]struct{}) { } } - dtmimp.Logf("Start to cleanup %d branches", len(branchKeys)) + logger.Debugf("Start to cleanup %d branches", len(branchKeys)) for _, key := range branchKeys { - dtmimp.Logf("Start to delete branch: %s", key) + logger.Debugf("Start to delete branch: %s", key) bucket.Delete([]byte(key)) } } @@ -155,9 +156,9 @@ func cleanupIndexWithGids(t *bolt.Tx, gids map[string]struct{}) { } } - dtmimp.Logf("Start to cleanup %d indexes", len(indexKeys)) + logger.Debugf("Start to cleanup %d indexes", len(indexKeys)) for _, key := range indexKeys { - dtmimp.Logf("Start to delete index: %s", key) + logger.Debugf("Start to delete index: %s", key) bucket.Delete([]byte(key)) } } @@ -241,6 +242,7 @@ func (s *BoltdbStore) PopulateData(skipDrop bool) { return nil }) dtmimp.E2P(err) + logger.Infof("Reset all data for boltdb") } } diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 348e572..001b849 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -10,6 +10,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmsvr/storage" ) @@ -26,8 +27,11 @@ func (s *RedisStore) Ping() error { } func (s *RedisStore) PopulateData(skipDrop bool) { - _, err := redisGet().FlushAll(ctx).Result() - dtmimp.PanicIf(err != nil, err) + if !skipDrop { + _, err := redisGet().FlushAll(ctx).Result() + logger.Infof("call redis flushall. result: %v", err) + dtmimp.PanicIf(err != nil, err) + } } func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore { @@ -114,7 +118,7 @@ func (a *argList) AppendBranches(branches []storage.TransBranchStore) *argList { } func handleRedisResult(ret interface{}, err error) (string, error) { - dtmimp.Logf("result is: '%v', err: '%v'", ret, err) + logger.Debugf("result is: '%v', err: '%v'", ret, err) if err != nil && err != redis.Nil { return "", err } @@ -127,7 +131,7 @@ func handleRedisResult(ret interface{}, err error) (string, error) { } func callLua(a *argList, lua string) (string, error) { - dtmimp.Logf("calling lua. args: %v\nlua:%s", a, lua) + logger.Debugf("calling lua. args: %v\nlua:%s", a, lua) ret, err := redisGet().Eval(ctx, lua, a.Keys, a.List...).Result() return handleRedisResult(ret, err) } @@ -201,7 +205,6 @@ if os.status ~= ARGV[4] then return 'NOT_FOUND' end redis.call('SET', KEYS[1], ARGV[3], 'EX', ARGV[2]) -redis.log(redis.LOG_WARNING, 'finished: ', ARGV[5]) if ARGV[5] == '1' then redis.call('ZREM', KEYS[3], gs.gid) end diff --git a/dtmsvr/storage/trans.go b/dtmsvr/storage/trans.go index dbabc01..24abd58 100644 --- a/dtmsvr/storage/trans.go +++ b/dtmsvr/storage/trans.go @@ -5,6 +5,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/dtmcli/dtmimp" ) type TransGlobalStore struct { @@ -29,10 +30,14 @@ type TransGlobalStore struct { } // TableName TableName -func (*TransGlobalStore) TableName() string { +func (g *TransGlobalStore) TableName() string { return "dtm.trans_global" } +func (g *TransGlobalStore) String() string { + return dtmimp.MustMarshalString(g) +} + // TransBranchStore branch transaction type TransBranchStore struct { common.ModelBase @@ -47,6 +52,10 @@ type TransBranchStore struct { } // TableName TableName -func (*TransBranchStore) TableName() string { +func (b *TransBranchStore) TableName() string { return "dtm.trans_branch_op" } + +func (b *TransBranchStore) String() string { + return dtmimp.MustMarshalString(*b) +} diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index ac0dbf2..2aa8950 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -13,7 +13,7 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/yedf/dtm/common" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc/dtmgimp" "github.com/yedf/dtm/dtmgrpc/dtmgpb" "github.com/yedf/dtmdriver" @@ -22,32 +22,32 @@ import ( // StartSvr StartSvr func StartSvr() { - dtmimp.Logf("start dtmsvr") + logger.Infof("start dtmsvr") app := common.GetGinApp() app = httpMetrics(app) addRoute(app) - dtmimp.Logf("dtmsvr listen at: %d", config.HttpPort) + logger.Infof("dtmsvr listen at: %d", config.HttpPort) go app.Run(fmt.Sprintf(":%d", config.HttpPort)) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.GrpcPort)) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) s := grpc.NewServer( grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc.UnaryServerInterceptor(grpcMetrics), grpc.UnaryServerInterceptor(dtmgimp.GrpcServerLog)), )) dtmgpb.RegisterDtmServer(s, &dtmServer{}) - dtmimp.Logf("grpc listening at %v", lis.Addr()) + logger.Infof("grpc listening at %v", lis.Addr()) go func() { err := s.Serve(lis) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) }() go updateBranchAsync() time.Sleep(100 * time.Millisecond) err = dtmdriver.Use(config.MicroService.Driver) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) err = dtmdriver.GetDriver().RegisterGrpcService(config.MicroService.Target, config.MicroService.EndPoint) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) } // PopulateDB setup mysql data @@ -79,11 +79,11 @@ func updateBranchAsync() { for len(updates) > 0 { dbr := GetStore().UpdateBranchesSql(updates, []string{"status", "finish_time", "update_time"}) - dtmimp.Logf("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected) if dbr.Error != nil { - dtmimp.LogRedf("async update branch status error: %v", dbr.Error) + logger.Errorf("async update branch status error: %v", dbr.Error) time.Sleep(1 * time.Second) } else { + logger.Infof("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected) updates = []TransBranch{} } } diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go index d8ea01b..faf3125 100644 --- a/dtmsvr/trans_class.go +++ b/dtmsvr/trans_class.go @@ -12,6 +12,7 @@ import ( "github.com/gin-gonic/gin" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc/dtmgpb" "github.com/yedf/dtm/dtmsvr/storage" ) @@ -57,7 +58,7 @@ func TransFromContext(c *gin.Context) *TransGlobal { e2p(err) m := TransGlobal{} dtmimp.MustUnmarshal(b, &m) - dtmimp.Logf("creating trans in prepare") + logger.Debugf("creating trans in prepare") // Payloads will be store in BinPayloads, Payloads is only used to Unmarshal for _, p := range m.Payloads { m.BinPayloads = append(m.BinPayloads, []byte(p)) diff --git a/dtmsvr/trans_process.go b/dtmsvr/trans_process.go index 38a68b9..721919e 100644 --- a/dtmsvr/trans_process.go +++ b/dtmsvr/trans_process.go @@ -12,6 +12,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) // Process process global transaction once @@ -44,16 +45,16 @@ func (t *TransGlobal) process() map[string]interface{} { func (t *TransGlobal) processInner() (rerr error) { defer handlePanic(&rerr) defer func() { - if rerr != nil { - dtmimp.LogRedf("processInner got error: %s", rerr.Error()) + if rerr != nil && rerr != dtmcli.ErrOngoing { + logger.Errorf("processInner got error: %s", rerr.Error()) } if TransProcessedTestChan != nil { - dtmimp.Logf("processed: %s", t.Gid) + logger.Debugf("processed: %s", t.Gid) TransProcessedTestChan <- t.Gid - dtmimp.Logf("notified: %s", t.Gid) + logger.Debugf("notified: %s", t.Gid) } }() - dtmimp.Logf("processing: %s status: %s", t.Gid, t.Status) + logger.Debugf("processing: %s status: %s", t.Gid, t.Status) branches := GetStore().FindBranches(t.Gid) t.lastTouched = time.Now() rerr = t.getProcessor().ProcessOnce(branches) @@ -71,5 +72,8 @@ func (t *TransGlobal) saveNew() error { now := time.Now() t.CreateTime = &now t.UpdateTime = &now - return GetStore().MaySaveNewTrans(&t.TransGlobalStore, branches) + err := GetStore().MaySaveNewTrans(&t.TransGlobalStore, branches) + logger.Infof("MaySaveNewTrans result: %v, global: %v branches: %v", + err, t.TransGlobalStore.String(), dtmimp.MustMarshalString(branches)) + return err } diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index b7fda72..371bef8 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -13,6 +13,7 @@ import ( "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc/dtmgimp" "github.com/yedf/dtmdriver" "google.golang.org/grpc/codes" @@ -22,6 +23,7 @@ import ( func (t *TransGlobal) touchCronTime(ctype cronType) { t.lastTouched = time.Now() GetStore().TouchCronTime(&t.TransGlobalStore, t.getNextCronInterval(ctype)) + logger.Infof("TouchCronTime for: %s", t.TransGlobalStore.String()) } func (t *TransGlobal) changeStatus(status string) { @@ -36,6 +38,7 @@ func (t *TransGlobal) changeStatus(status string) { } t.UpdateTime = &now GetStore().ChangeGlobalStatus(&t.TransGlobalStore, status, updates, status == dtmcli.StatusSucceed || status == dtmcli.StatusFailed) + logger.Infof("ChangeGlobalStatus to %s ok for %s", status, t.TransGlobalStore.String()) t.Status = status } @@ -46,6 +49,8 @@ func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPo b.UpdateTime = &now if config.Store.Driver != dtmimp.DBTypeMysql && config.Store.Driver != dtmimp.DBTypePostgres || config.UpdateBranchSync > 0 || t.updateBranchSync { GetStore().LockGlobalSaveBranches(t.Gid, t.Status, []TransBranch{*b}, branchPos) + logger.Infof("LockGlobalSaveBranches ok: gid: %s old status: %s branches: %s", + b.Gid, dtmcli.StatusPrepared, b.String()) } else { // 为了性能优化,把branch的status更新异步化 updateBranchAsyncChan <- branchStatus{id: b.ID, status: status, finishTime: &now} } @@ -67,6 +72,9 @@ func (t *TransGlobal) needProcess() bool { } func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayload []byte) (string, error) { + if url == "" { // empty url is success + return dtmcli.ResultSuccess, nil + } if t.Protocol == "grpc" { dtmimp.PanicIf(strings.HasPrefix(url, "http"), fmt.Errorf("bad url for grpc: %s", url)) server, method, err := dtmdriver.GetDriver().ParseServerMethod(url) diff --git a/dtmsvr/trans_type_msg.go b/dtmsvr/trans_type_msg.go index f6bc9b1..63a62b6 100644 --- a/dtmsvr/trans_type_msg.go +++ b/dtmsvr/trans_type_msg.go @@ -11,7 +11,7 @@ import ( "strings" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) type transMsgProcessor struct { @@ -50,7 +50,7 @@ func (t *TransGlobal) mayQueryPrepared() { } else if strings.Contains(body, dtmcli.ResultOngoing) { t.touchCronTime(cronReset) } else { - dtmimp.LogRedf("getting result failed for %s. error: %s", t.QueryPrepared, err.Error()) + logger.Errorf("getting result failed for %s. error: %s", t.QueryPrepared, err.Error()) t.touchCronTime(cronBackoff) } } diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index dad3d6f..e710d46 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -12,6 +12,7 @@ import ( "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) type transSagaProcessor struct { @@ -54,7 +55,7 @@ type branchResult struct { func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { // when saga tasks is fetched, it always need to process - dtmimp.Logf("status: %s timeout: %t", t.Status, t.isTimeout()) + logger.Debugf("status: %s timeout: %t", t.Status, t.isTimeout()) if t.Status == dtmcli.StatusSubmitted && t.isTimeout() { t.changeStatus(dtmcli.StatusAborting) } @@ -103,8 +104,8 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { err = dtmimp.AsError(x) } resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op} - if err != nil { - dtmimp.LogRedf("exec branch error: %v", err) + if err != nil && err != dtmcli.ErrOngoing { + logger.Errorf("exec branch error: %v", err) } }() err = t.execBranch(&branches[i], i) @@ -117,7 +118,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { toRun = append(toRun, current) } } - dtmimp.Logf("toRun picked for action is: %v", toRun) + logger.Debugf("toRun picked for action is: %v", toRun) return toRun } runBranches := func(toRun []int) { @@ -159,9 +160,9 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { rsCSucceed++ } } - dtmimp.Logf("branch done: %v", r) + logger.Debugf("branch done: %v", r) case <-time.After(time.Duration(time.Second * 3)): - dtmimp.Logf("wait once for done") + logger.Debugf("wait once for done") } } diff --git a/dtmsvr/trans_type_tcc.go b/dtmsvr/trans_type_tcc.go index 88145d1..2c32ab6 100644 --- a/dtmsvr/trans_type_tcc.go +++ b/dtmsvr/trans_type_tcc.go @@ -9,6 +9,7 @@ package dtmsvr import ( "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) type transTccProcessor struct { @@ -33,7 +34,7 @@ func (t *transTccProcessor) ProcessOnce(branches []TransBranch) error { op := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchConfirm, dtmcli.BranchCancel).(string) for current := len(branches) - 1; current >= 0; current-- { if branches[current].Op == op && branches[current].Status == dtmcli.StatusPrepared { - dtmimp.Logf("branch info: current: %d ID: %d", current, branches[current].ID) + logger.Debugf("branch info: current: %d ID: %d", current, branches[current].ID) err := t.execBranch(&branches[current], current) if err != nil { return err diff --git a/examples/base_grpc.go b/examples/base_grpc.go index 3bff6a5..07eb749 100644 --- a/examples/base_grpc.go +++ b/examples/base_grpc.go @@ -16,6 +16,7 @@ import ( "github.com/gin-gonic/gin" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc" "github.com/yedf/dtm/dtmgrpc/dtmgimp" @@ -44,25 +45,25 @@ func init() { // GrpcStartup for grpc func GrpcStartup() { conn, err := grpc.Dial(DtmGrpcServer, grpc.WithInsecure(), grpc.WithUnaryInterceptor(dtmgimp.GrpcClientLog)) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) DtmClient = dtmgpb.NewDtmClient(conn) - dtmimp.Logf("dtm client inited") + logger.Debugf("dtm client inited") lis, err := net.Listen("tcp", fmt.Sprintf(":%d", BusiGrpcPort)) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) s := grpc.NewServer(grpc.UnaryInterceptor(dtmgimp.GrpcServerLog)) RegisterBusiServer(s, &busiServer{}) go func() { - dtmimp.Logf("busi grpc listening at %v", lis.Addr()) + logger.Debugf("busi grpc listening at %v", lis.Addr()) err := s.Serve(lis) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) }() time.Sleep(100 * time.Millisecond) } func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string) error { res := dtmimp.OrString(result1, result2, dtmcli.ResultSuccess) - dtmimp.Logf("grpc busi %s %v %s %s result: %s", busi, in, result1, result2, res) + logger.Debugf("grpc busi %s %v %s %s result: %s", busi, in, result1, result2, res) if res == dtmcli.ResultSuccess { return nil } else if res == dtmcli.ResultFailure { @@ -137,9 +138,9 @@ func (s *busiServer) TransOutXa(ctx context.Context, in *BusiReq) (*emptypb.Empt func (s *busiServer) TransInTccNested(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { tcc, err := dtmgrpc.TccFromGrpc(ctx) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) r := &emptypb.Empty{} err = tcc.CallBranch(in, BusiGrpc+"/examples.Busi/TransIn", BusiGrpc+"/examples.Busi/TransInConfirm", BusiGrpc+"/examples.Busi/TransInRevert", r) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return r, handleGrpcBusiness(in, MainSwitch.TransInResult.Fetch(), in.TransInResult, dtmimp.GetFuncName()) } diff --git a/examples/base_http.go b/examples/base_http.go index 8aa7f37..401d1f3 100644 --- a/examples/base_http.go +++ b/examples/base_http.go @@ -17,6 +17,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "gorm.io/driver/mysql" "gorm.io/driver/postgres" "gorm.io/gorm" @@ -40,7 +41,7 @@ var Busi string = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiAPI) // BaseAppStartup base app startup func BaseAppStartup() *gin.Engine { - dtmimp.Logf("examples starting") + logger.Debugf("examples starting") app := common.GetGinApp() app.Use(func(c *gin.Context) { v := MainSwitch.NextResult.Fetch() @@ -54,10 +55,10 @@ func BaseAppStartup() *gin.Engine { BaseAddRoute(app) for k, v := range setupFuncs { - dtmimp.Logf("initing %s", k) + logger.Debugf("initing %s", k) v(app) } - dtmimp.Logf("Starting busi at: %d", BusiPort) + logger.Debugf("Starting busi at: %d", BusiPort) go app.Run(fmt.Sprintf(":%d", BusiPort)) time.Sleep(100 * time.Millisecond) @@ -98,7 +99,7 @@ var MainSwitch mainSwitchType func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi string) (interface{}, error) { info := infoFromContext(c) res := dtmimp.OrString(result1, result2, dtmcli.ResultSuccess) - dtmimp.Logf("%s %s result: %s", busi, info.String(), res) + logger.Debugf("%s %s result: %s", busi, info.String(), res) if res == "ERROR" { return nil, errors.New("ERROR from user") } @@ -136,7 +137,7 @@ func BaseAddRoute(app *gin.Engine) { return handleGeneralBusiness(c, MainSwitch.TransOutRevertResult.Fetch(), "", "TransOutRevert") })) app.GET(BusiAPI+"/CanSubmit", common.WrapHandler(func(c *gin.Context) (interface{}, error) { - dtmimp.Logf("%s CanSubmit", c.Query("gid")) + logger.Debugf("%s CanSubmit", c.Query("gid")) return dtmimp.OrString(MainSwitch.CanSubmitResult.Fetch(), dtmcli.ResultSuccess), nil })) app.POST(BusiAPI+"/TransInXa", common.WrapHandler(func(c *gin.Context) (interface{}, error) { diff --git a/examples/base_types.go b/examples/base_types.go index a40f491..f906b4a 100644 --- a/examples/base_types.go +++ b/examples/base_types.go @@ -15,6 +15,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc" ) @@ -58,7 +59,7 @@ func reqFrom(c *gin.Context) *TransReq { if !ok { req := TransReq{} err := c.BindJSON(&req) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) c.Set("trans_req", &req) v = &req } @@ -81,27 +82,27 @@ func dbGet() *common.DB { func sdbGet() *sql.DB { db, err := dtmimp.PooledDB(config.ExamplesDB) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return db } func txGet() *sql.Tx { db := sdbGet() tx, err := db.Begin() - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return tx } // MustBarrierFromGin 1 func MustBarrierFromGin(c *gin.Context) *dtmcli.BranchBarrier { ti, err := dtmcli.BarrierFromQuery(c.Request.URL.Query()) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return ti } // MustBarrierFromGrpc 1 func MustBarrierFromGrpc(ctx context.Context) *dtmcli.BranchBarrier { ti, err := dtmgrpc.BarrierFromGrpc(ctx) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return ti } diff --git a/examples/data.go b/examples/data.go index b0459c4..0a4f52c 100644 --- a/examples/data.go +++ b/examples/data.go @@ -10,7 +10,7 @@ import ( "fmt" "github.com/yedf/dtm/common" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) var config = &common.Config @@ -50,6 +50,6 @@ type sampleInfo struct { var Samples = map[string]*sampleInfo{} func addSample(name string, fn func() string) { - dtmimp.LogIfFatalf(Samples[name] != nil, "%s already exists", name) + logger.FatalfIf(Samples[name] != nil, "%s already exists", name) Samples[name] = &sampleInfo{Arg: name, Action: fn} } diff --git a/examples/grpc_msg.go b/examples/grpc_msg.go index 670e8e6..2bbe650 100644 --- a/examples/grpc_msg.go +++ b/examples/grpc_msg.go @@ -7,7 +7,7 @@ package examples import ( - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" dtmgrpc "github.com/yedf/dtm/dtmgrpc" ) @@ -19,7 +19,7 @@ func init() { Add(BusiGrpc+"/examples.Busi/TransOut", req). Add(BusiGrpc+"/examples.Busi/TransIn", req) err := msg.Submit() - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return msg.Gid }) } diff --git a/examples/grpc_saga.go b/examples/grpc_saga.go index fad3d8f..3a29de4 100644 --- a/examples/grpc_saga.go +++ b/examples/grpc_saga.go @@ -7,7 +7,7 @@ package examples import ( - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" dtmgrpc "github.com/yedf/dtm/dtmgrpc" ) @@ -19,7 +19,7 @@ func init() { Add(BusiGrpc+"/examples.Busi/TransOut", BusiGrpc+"/examples.Busi/TransOutRevert", req). Add(BusiGrpc+"/examples.Busi/TransIn", BusiGrpc+"/examples.Busi/TransOutRevert", req) err := saga.Submit() - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return saga.Gid }) addSample("grpc_saga_wait", func() string { @@ -30,7 +30,7 @@ func init() { Add(BusiGrpc+"/examples.Busi/TransIn", BusiGrpc+"/examples.Busi/TransOutRevert", req) saga.WaitResult = true err := saga.Submit() - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return saga.Gid }) } diff --git a/examples/grpc_saga_barrier.go b/examples/grpc_saga_barrier.go index 777b236..892e140 100644 --- a/examples/grpc_saga_barrier.go +++ b/examples/grpc_saga_barrier.go @@ -12,6 +12,7 @@ import ( "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -26,7 +27,7 @@ func init() { Add(BusiGrpc+"/examples.Busi/TransOutBSaga", BusiGrpc+"/examples.Busi/TransOutRevertBSaga", req). Add(BusiGrpc+"/examples.Busi/TransInBSaga", BusiGrpc+"/examples.Busi/TransInRevertBSaga", req) err := saga.Submit() - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return saga.Gid }) } diff --git a/examples/grpc_tcc.go b/examples/grpc_tcc.go index a806983..5a6c4fd 100644 --- a/examples/grpc_tcc.go +++ b/examples/grpc_tcc.go @@ -7,14 +7,14 @@ package examples import ( - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" dtmgrpc "github.com/yedf/dtm/dtmgrpc" emptypb "google.golang.org/protobuf/types/known/emptypb" ) func init() { addSample("grpc_tcc", func() string { - dtmimp.Logf("tcc simple transaction begin") + logger.Debugf("tcc simple transaction begin") gid := dtmgrpc.MustGenGid(DtmGrpcServer) err := dtmgrpc.TccGlobalTransaction(DtmGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error { data := &BusiReq{Amount: 30} @@ -26,7 +26,7 @@ func init() { err = tcc.CallBranch(data, BusiGrpc+"/examples.Busi/TransInTcc", BusiGrpc+"/examples.Busi/TransInConfirm", BusiGrpc+"/examples.Busi/TransInRevert", r) return err }) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return gid }) } diff --git a/examples/grpc_xa.go b/examples/grpc_xa.go index 41aba41..c4f9d3c 100644 --- a/examples/grpc_xa.go +++ b/examples/grpc_xa.go @@ -9,7 +9,7 @@ package examples import ( context "context" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc" "google.golang.org/protobuf/types/known/emptypb" ) @@ -27,7 +27,7 @@ func init() { err = xa.CallBranch(req, BusiGrpc+"/examples.Busi/TransInXa", r) return err }) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return gid }) } diff --git a/examples/http_gorm_xa.go b/examples/http_gorm_xa.go index 6034469..caacb79 100644 --- a/examples/http_gorm_xa.go +++ b/examples/http_gorm_xa.go @@ -9,7 +9,7 @@ package examples import ( "github.com/go-resty/resty/v2" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) func init() { @@ -22,7 +22,7 @@ func init() { } return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa") }) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return gid }) diff --git a/examples/http_msg.go b/examples/http_msg.go index be0e9ac..3f40805 100644 --- a/examples/http_msg.go +++ b/examples/http_msg.go @@ -8,21 +8,21 @@ package examples import ( "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) func init() { addSample("msg", func() string { - dtmimp.Logf("a busi transaction begin") + logger.Debugf("a busi transaction begin") req := &TransReq{Amount: 30} msg := dtmcli.NewMsg(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)). Add(Busi+"/TransOut", req). Add(Busi+"/TransIn", req) err := msg.Prepare(Busi + "/query") - dtmimp.FatalIfError(err) - dtmimp.Logf("busi trans submit") + logger.FatalIfError(err) + logger.Debugf("busi trans submit") err = msg.Submit() - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return msg.Gid }) } diff --git a/examples/http_saga.go b/examples/http_saga.go index 54e2885..d7dd59b 100644 --- a/examples/http_saga.go +++ b/examples/http_saga.go @@ -8,36 +8,36 @@ package examples import ( "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) func init() { addSample("saga", func() string { - dtmimp.Logf("a saga busi transaction begin") + logger.Debugf("a saga busi transaction begin") req := &TransReq{Amount: 30} saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). Add(Busi+"/TransIn", Busi+"/TransInRevert", req) - dtmimp.Logf("saga busi trans submit") + logger.Debugf("saga busi trans submit") err := saga.Submit() - dtmimp.Logf("result gid is: %s", saga.Gid) - dtmimp.FatalIfError(err) + logger.Debugf("result gid is: %s", saga.Gid) + logger.FatalIfError(err) return saga.Gid }) addSample("saga_wait", func() string { - dtmimp.Logf("a saga busi transaction begin") + logger.Debugf("a saga busi transaction begin") req := &TransReq{Amount: 30} saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). Add(Busi+"/TransIn", Busi+"/TransInRevert", req) saga.SetOptions(&dtmcli.TransOptions{WaitResult: true}) err := saga.Submit() - dtmimp.Logf("result gid is: %s", saga.Gid) - dtmimp.FatalIfError(err) + logger.Debugf("result gid is: %s", saga.Gid) + logger.FatalIfError(err) return saga.Gid }) addSample("concurrent_saga", func() string { - dtmimp.Logf("a concurrent saga busi transaction begin") + logger.Debugf("a concurrent saga busi transaction begin") req := &TransReq{Amount: 30} csaga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). @@ -47,10 +47,10 @@ func init() { EnableConcurrent(). AddBranchOrder(2, []int{0, 1}). AddBranchOrder(3, []int{0, 1}) - dtmimp.Logf("concurrent saga busi trans submit") + logger.Debugf("concurrent saga busi trans submit") err := csaga.Submit() - dtmimp.Logf("result gid is: %s", csaga.Gid) - dtmimp.FatalIfError(err) + logger.Debugf("result gid is: %s", csaga.Gid) + logger.FatalIfError(err) return csaga.Gid }) } diff --git a/examples/http_saga_barrier.go b/examples/http_saga_barrier.go index c68aeb8..c633c58 100644 --- a/examples/http_saga_barrier.go +++ b/examples/http_saga_barrier.go @@ -13,6 +13,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) func init() { @@ -23,14 +24,14 @@ func init() { app.POST(BusiAPI+"/SagaBTransOutCompensate", common.WrapHandler(sagaBarrierTransOutCompensate)) } addSample("saga_barrier", func() string { - dtmimp.Logf("a busi transaction begin") + logger.Debugf("a busi transaction begin") req := &TransReq{Amount: 30} saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)). Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req). Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) - dtmimp.Logf("busi trans submit") + logger.Debugf("busi trans submit") err := saga.Submit() - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return saga.Gid }) } diff --git a/examples/http_saga_gorm_barrier.go b/examples/http_saga_gorm_barrier.go index 1f81c44..9499506 100644 --- a/examples/http_saga_gorm_barrier.go +++ b/examples/http_saga_gorm_barrier.go @@ -12,7 +12,7 @@ import ( "github.com/gin-gonic/gin" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) func init() { @@ -20,14 +20,14 @@ func init() { app.POST(BusiAPI+"/SagaBTransOutGorm", common.WrapHandler(sagaGormBarrierTransOut)) } addSample("saga_gorm_barrier", func() string { - dtmimp.Logf("a busi transaction begin") + logger.Debugf("a busi transaction begin") req := &TransReq{Amount: 30} saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)). Add(Busi+"/SagaBTransOutGorm", Busi+"/SagaBTransOutCompensate", req). Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) - dtmimp.Logf("busi trans submit") + logger.Debugf("busi trans submit") err := saga.Submit() - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return saga.Gid }) diff --git a/examples/http_tcc.go b/examples/http_tcc.go index 49c949f..76dff2f 100644 --- a/examples/http_tcc.go +++ b/examples/http_tcc.go @@ -11,15 +11,15 @@ import ( "github.com/go-resty/resty/v2" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) func init() { setupFuncs["TccSetupSetup"] = func(app *gin.Engine) { app.POST(BusiAPI+"/TransInTccParent", common.WrapHandler(func(c *gin.Context) (interface{}, error) { tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query()) - dtmimp.FatalIfError(err) - dtmimp.Logf("TransInTccParent ") + logger.FatalIfError(err) + logger.Debugf("TransInTccParent ") return tcc.CallBranch(&TransReq{Amount: reqFrom(c).Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") })) } @@ -32,11 +32,11 @@ func init() { } return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTccParent", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return gid }) addSample("tcc", func() string { - dtmimp.Logf("tcc simple transaction begin") + logger.Debugf("tcc simple transaction begin") gid := dtmcli.MustGenGid(DtmHttpServer) err := dtmcli.TccGlobalTransaction(DtmHttpServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") @@ -45,7 +45,7 @@ func init() { } return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return gid }) } diff --git a/examples/http_tcc_barrier.go b/examples/http_tcc_barrier.go index 85e65da..56a0467 100644 --- a/examples/http_tcc_barrier.go +++ b/examples/http_tcc_barrier.go @@ -15,6 +15,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) func init() { @@ -27,7 +28,7 @@ func init() { app.POST(BusiAPI+"/TccBTransOutCancel", common.WrapHandler(TccBarrierTransOutCancel)) } addSample("tcc_barrier", func() string { - dtmimp.Logf("tcc transaction begin") + logger.Debugf("tcc transaction begin") gid := dtmcli.MustGenGid(DtmHttpServer) err := dtmcli.TccGlobalTransaction(DtmHttpServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", @@ -37,7 +38,7 @@ func init() { } return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") }) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return gid }) } diff --git a/examples/http_xa.go b/examples/http_xa.go index 5ed0159..a288b45 100644 --- a/examples/http_xa.go +++ b/examples/http_xa.go @@ -11,7 +11,7 @@ import ( "github.com/go-resty/resty/v2" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) // XaClient XA client connection @@ -25,7 +25,7 @@ func init() { return xa.HandleCallback(c.Query("gid"), c.Query("branch_id"), c.Query("op")) })) }) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) } addSample("xa", func() string { gid := dtmcli.MustGenGid(DtmHttpServer) @@ -36,7 +36,7 @@ func init() { } return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa") }) - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return gid }) } diff --git a/examples/quick_start.go b/examples/quick_start.go index e71d816..47329ed 100644 --- a/examples/quick_start.go +++ b/examples/quick_start.go @@ -14,6 +14,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) // 启动命令:go run app/main.go qs @@ -28,7 +29,7 @@ var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiAPI) func QsStartSvr() { app := common.GetGinApp() qsAddRoute(app) - dtmimp.Logf("quick qs examples listening at %d", qsBusiPort) + logger.Debugf("quick qs examples listening at %d", qsBusiPort) go app.Run(fmt.Sprintf(":%d", qsBusiPort)) time.Sleep(100 * time.Millisecond) } @@ -44,7 +45,7 @@ func QsFireRequest() string { Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req) // 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务 err := saga.Submit() - dtmimp.FatalIfError(err) + logger.FatalIfError(err) return saga.Gid } diff --git a/helper/test-cover.sh b/helper/test-cover.sh index 0bf9fbe..b94a361 100644 --- a/helper/test-cover.sh +++ b/helper/test-cover.sh @@ -2,7 +2,7 @@ set -x echo "" > coverage.txt for store in redis mysql boltdb; do for d in $(go list ./... | grep -v vendor); do - TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/yedf/dtm/common,github.com/yedf/dtm/dtmcli,github.com/yedf/dtm/dtmcli/dtmimp,github.com/yedf/dtm/dtmgrpc,github.com/yedf/dtm/dtmgrpc/dtmgimp,github.com/yedf/dtm/dtmsvr,github.com/yedf/dtm/dtmsvr/storage,github.com/yedf/dtm/dtmsvr/storage/boltdb,github.com/yedf/dtm/dtmsvr/storage/registry -gcflags=-l $d + TEST_STORE=$store go test -covermode count -coverprofile=profile.out -coverpkg=github.com/yedf/dtm/common,github.com/yedf/dtm/dtmcli,github.com/yedf/dtm/dtmcli/dtmimp,github.com/yedf/dtm/dtmgrpc,github.com/yedf/dtm/dtmgrpc/dtmgimp,github.com/yedf/dtm/dtmsvr,github.com/yedf/dtm/dtmsvr/storage,github.com/yedf/dtm/dtmsvr/storage/boltdb,github.com/yedf/dtm/dtmsvr/storage/redis,github.com/yedf/dtm/dtmsvr/storage/registry,github.com/yedf/dtm/dtmsvr/storage/sql,github.com/yedf/dtm/dtmsvr/storage/boltdb,github.com/yedf/dtm/dtmsvr/storage/registry -gcflags=-l $d if [ -f profile.out ]; then cat profile.out >> coverage.txt echo > profile.out diff --git a/test/base_test.go b/test/base_test.go index 7dbc192..deffa8d 100644 --- a/test/base_test.go +++ b/test/base_test.go @@ -15,6 +15,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/examples" ) @@ -40,7 +41,7 @@ func TestBaseSqlDB(t *testing.T) { tx, err := db.ToSQLDB().Begin() asserts.Nil(err) err = barrier.Call(tx, func(tx *sql.Tx) error { - dtmimp.Logf("rollback gid2") + logger.Debugf("rollback gid2") return fmt.Errorf("gid2 error") }) asserts.Error(err, fmt.Errorf("gid2 error")) @@ -50,7 +51,7 @@ func TestBaseSqlDB(t *testing.T) { asserts.Equal(dbr.RowsAffected, int64(0)) barrier.BarrierID = 0 err = barrier.CallWithDB(db.ToSQLDB(), func(tx *sql.Tx) error { - dtmimp.Logf("submit gid2") + logger.Debugf("submit gid2") return nil }) asserts.Nil(err) diff --git a/test/saga_grpc_test.go b/test/saga_grpc_test.go index ca06f07..d611241 100644 --- a/test/saga_grpc_test.go +++ b/test/saga_grpc_test.go @@ -75,6 +75,17 @@ func TestSagaGrpcNormalWait(t *testing.T) { waitTransProcessed(saga.Gid) } +func TestSagaGrpcEmptyUrl(t *testing.T) { + saga := dtmgrpc.NewSagaGrpc(examples.DtmGrpcServer, dtmimp.GetFuncName()) + req := examples.GenBusiReq(30, false, false) + saga.Add(examples.BusiGrpc+"/examples.Busi/TransOut", examples.BusiGrpc+"/examples.Busi/TransOutRevert", req) + saga.Add("", examples.BusiGrpc+"/examples.Busi/TransInRevert", req) + saga.Submit() + waitTransProcessed(saga.Gid) + assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) +} + func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc { saga := dtmgrpc.NewSagaGrpc(examples.DtmGrpcServer, gid) req := examples.GenBusiReq(30, outFailed, inFailed) diff --git a/test/saga_test.go b/test/saga_test.go index 0e42b05..5fbc166 100644 --- a/test/saga_test.go +++ b/test/saga_test.go @@ -58,6 +58,17 @@ func TestSagaAbnormal(t *testing.T) { assert.Error(t, err) // a succeed trans can't accept submit } +func TestSagaEmptyUrl(t *testing.T) { + saga := dtmcli.NewSaga(examples.DtmHttpServer, dtmimp.GetFuncName()) + req := examples.GenTransReq(30, false, false) + saga.Add(examples.Busi+"/TransOut", "", &req) + saga.Add("", "", &req) + saga.Submit() + waitTransProcessed(saga.Gid) + assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) +} + func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { saga := dtmcli.NewSaga(examples.DtmHttpServer, gid) req := examples.GenTransReq(30, outFailed, inFailed) diff --git a/test/tcc_barrier_test.go b/test/tcc_barrier_test.go index a1b3c88..238a70d 100644 --- a/test/tcc_barrier_test.go +++ b/test/tcc_barrier_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/examples" ) @@ -68,7 +69,7 @@ func TestTccBarrierDisorder(t *testing.T) { res, err := examples.TccBarrierTransOutCancel(c) if !sleeped { sleeped = true - dtmimp.Logf("sleep before cancel return") + logger.Debugf("sleep before cancel return") <-timeoutChan finishedChan <- "1" } @@ -89,7 +90,7 @@ func TestTccBarrierDisorder(t *testing.T) { assert.Contains(t, resp.String(), dtmcli.ResultSuccess) go func() { - dtmimp.Logf("sleeping to wait for tcc try timeout") + logger.Debugf("sleeping to wait for tcc try timeout") <-timeoutChan r, _ := dtmimp.RestyClient.R(). SetBody(body). @@ -104,10 +105,10 @@ func TestTccBarrierDisorder(t *testing.T) { assert.True(t, strings.Contains(r.String(), dtmcli.ResultSuccess)) // 这个是悬挂操作,为了简单起见,依旧让他返回成功 finishedChan <- "1" }() - dtmimp.Logf("cron to timeout and then call cancel") + logger.Debugf("cron to timeout and then call cancel") go cronTransOnceForwardNow(300) time.Sleep(100 * time.Millisecond) - dtmimp.Logf("cron to timeout and then call cancelled twice") + logger.Debugf("cron to timeout and then call cancelled twice") cronTransOnceForwardNow(300) timeoutChan <- "wake" timeoutChan <- "wake" diff --git a/test/tcc_grpc_test.go b/test/tcc_grpc_test.go index ff4b2df..16ba193 100644 --- a/test/tcc_grpc_test.go +++ b/test/tcc_grpc_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc" "github.com/yedf/dtm/examples" "google.golang.org/protobuf/types/known/emptypb" @@ -70,7 +71,7 @@ func TestTccGrpcNested(t *testing.T) { func TestTccGrpcType(t *testing.T) { _, err := dtmgrpc.TccFromGrpc(context.Background()) assert.Error(t, err) - dtmimp.Logf("expecting dtmgrpcserver error") + logger.Debugf("expecting dtmgrpcserver error") err = dtmgrpc.TccGlobalTransaction("-", "", func(tcc *dtmgrpc.TccGrpc) error { return nil }) assert.Error(t, err) } diff --git a/test/types.go b/test/types.go index 7c54b86..bef96cc 100644 --- a/test/types.go +++ b/test/types.go @@ -12,6 +12,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmsvr" ) @@ -23,16 +24,16 @@ func dbGet() *common.DB { // waitTransProcessed only for test usage. wait for transaction processed once func waitTransProcessed(gid string) { - dtmimp.Logf("waiting for gid %s", gid) + logger.Debugf("waiting for gid %s", gid) select { case id := <-dtmsvr.TransProcessedTestChan: for id != gid { - dtmimp.LogRedf("-------id %s not match gid %s", id, gid) + logger.Errorf("-------id %s not match gid %s", id, gid) id = <-dtmsvr.TransProcessedTestChan } - dtmimp.Logf("finish for gid %s", gid) + logger.Debugf("finish for gid %s", gid) case <-time.After(time.Duration(time.Second * 3)): - dtmimp.LogFatalf("Wait Trans timeout") + logger.FatalfIf(true, "Wait Trans timeout") } }