From 6fed8dae6b506a7654be39271110cf5fb6a874e9 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 24 Dec 2021 17:05:04 +0800 Subject: [PATCH] Logf changed to Debugf --- app/main.go | 3 +-- bench/http.go | 6 +++--- bench/main.go | 4 ++-- common/config.go | 3 +-- common/config_utils.go | 2 +- common/db.go | 7 ++++--- common/types.go | 4 ++-- common/utils.go | 8 ++++---- dtmcli/barrier.go | 3 ++- dtmgrpc/dtmgimp/grpc_clients.go | 5 +++-- dtmgrpc/dtmgimp/types.go | 15 ++++++++------- dtmgrpc/dtmgimp/utils.go | 3 ++- dtmsvr/cron.go | 5 +++-- dtmsvr/storage/boltdb/boltdb.go | 13 +++++++------ dtmsvr/storage/redis/redis.go | 5 +++-- dtmsvr/svr.go | 11 +++++------ dtmsvr/trans_class.go | 3 ++- dtmsvr/trans_process.go | 9 +++++---- dtmsvr/trans_type_msg.go | 4 ++-- dtmsvr/trans_type_saga.go | 11 ++++++----- dtmsvr/trans_type_tcc.go | 3 ++- examples/base_grpc.go | 6 +++--- examples/base_http.go | 11 ++++++----- examples/grpc_tcc.go | 3 +-- examples/http_msg.go | 5 ++--- examples/http_saga.go | 17 ++++++++--------- examples/http_saga_barrier.go | 4 ++-- examples/http_saga_gorm_barrier.go | 5 ++--- examples/http_tcc.go | 5 ++--- examples/http_tcc_barrier.go | 2 +- examples/quick_start.go | 2 +- test/base_test.go | 5 +++-- test/tcc_barrier_test.go | 9 +++++---- test/tcc_grpc_test.go | 3 ++- test/types.go | 6 +++--- 35 files changed, 109 insertions(+), 101 deletions(-) diff --git a/app/main.go b/app/main.go index 3382911..68a8d57 100644 --- a/app/main.go +++ b/app/main.go @@ -15,7 +15,6 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmsvr" "github.com/yedf/dtm/dtmsvr/storage/registry" @@ -51,7 +50,7 @@ func main() { fmt.Printf("version: %s commit: %s built at: %s\n", Version, Commit, Date) return } - dtmimp.Logf("starting dtm....") + logger.Debugf("starting dtm....") common.MustLoadConfig() if common.Config.ExamplesDB.Driver != "" { dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver) diff --git a/bench/http.go b/bench/http.go index 2e8389c..403abd3 100644 --- a/bench/http.go +++ b/bench/http.go @@ -60,7 +60,7 @@ func reloadData() { } _, err := dtmimp.DBExec(db, s+strings.Join(ss, ",")) logger.FatalIfError(err) - dtmimp.Logf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds()) + logger.Debugf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds()) } var uidCounter int32 = 0 @@ -71,7 +71,7 @@ var sqls int = 1 func StartSvr() { app := common.GetGinApp() benchAddRoute(app) - dtmimp.Logf("bench listening at %d", benchPort) + logger.Debugf("bench listening at %d", benchPort) go app.Run(fmt.Sprintf(":%d", benchPort)) db := sdbGet() _, err := dtmimp.DBExec(db, "drop table if exists dtm_busi.user_account_log") @@ -151,7 +151,7 @@ func benchAddRoute(app *gin.Engine) { req := gin.H{} params := fmt.Sprintf("?uid=%s", suid) params2 := fmt.Sprintf("?uid=%s", suid2) - dtmimp.Logf("mode: %s contains dtm: %t", mode, strings.Contains(mode, "dtm")) + logger.Debugf("mode: %s contains dtm: %t", mode, strings.Contains(mode, "dtm")) if strings.Contains(mode, "dtm") { saga := dtmcli.NewSaga(examples.DtmHttpServer, fmt.Sprintf("bench-%d", uid)). Add(benchBusi+"/TransOut"+params, benchBusi+"/TransOutCompensate"+params, req). diff --git a/bench/main.go b/bench/main.go index f757c8c..ebd604d 100644 --- a/bench/main.go +++ b/bench/main.go @@ -6,7 +6,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmsvr" "github.com/yedf/dtm/dtmsvr/storage/registry" "github.com/yedf/dtm/examples" @@ -23,7 +23,7 @@ func main() { fmt.Printf(hint) return } - dtmimp.Logf("starting dtm....") + logger.Debugf("starting dtm....") if os.Args[1] == "http" { fmt.Println("start bench server") common.MustLoadConfig() diff --git a/common/config.go b/common/config.go index 6a0838a..940dd1a 100644 --- a/common/config.go +++ b/common/config.go @@ -7,7 +7,6 @@ import ( "path/filepath" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/logger" "gopkg.in/yaml.v2" ) @@ -87,7 +86,7 @@ func MustLoadConfig() { } scont, err := json.MarshalIndent(&Config, "", " ") logger.FatalIfError(err) - dtmimp.Logf("config is: \n%s", scont) + logger.Debugf("config is: \n%s", scont) err = checkConfig() logger.FatalfIf(err != nil, `config error: '%v'. check you env, and conf.yml/conf.sample.yml in current and parent path: %s. diff --git a/common/config_utils.go b/common/config_utils.go index 44c93f6..e54a834 100644 --- a/common/config_utils.go +++ b/common/config_utils.go @@ -50,6 +50,6 @@ func toUnderscoreUpper(key string) string { key = strings.Trim(key, "_") matchFirstCap := regexp.MustCompile("([a-z])([A-Z]+)") s2 := matchFirstCap.ReplaceAllString(key, "${1}_${2}") - // dtmimp.Logf("loading from env: %s", strings.ToUpper(s2)) + // logger.Debugf("loading from env: %s", strings.ToUpper(s2)) return strings.ToUpper(s2) } diff --git a/common/db.go b/common/db.go index a9c315d..f2d1c66 100644 --- a/common/db.go +++ b/common/db.go @@ -11,6 +11,7 @@ import ( _ "github.com/lib/pq" // register postgres driver "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "gorm.io/driver/mysql" "gorm.io/driver/postgres" "gorm.io/gorm" @@ -65,7 +66,7 @@ func (op *tracePlugin) Initialize(db *gorm.DB) (err error) { after := func(db *gorm.DB) { _ts, _ := db.InstanceGet("ivy.startTime") sql := db.Dialector.Explain(db.Statement.SQL.String(), db.Statement.Vars...) - dtmimp.Logf("used: %d ms affected: %d sql is: %s", time.Since(_ts.(time.Time)).Milliseconds(), db.RowsAffected, sql) + logger.Debugf("used: %d ms affected: %d sql is: %s", time.Since(_ts.(time.Time)).Milliseconds(), db.RowsAffected, sql) if v, ok := db.InstanceGet("ivy.must"); ok && v.(bool) { if db.Error != nil && db.Error != gorm.ErrRecordNotFound { panic(db.Error) @@ -76,7 +77,7 @@ func (op *tracePlugin) Initialize(db *gorm.DB) (err error) { beforeName := "cb_before" afterName := "cb_after" - dtmimp.Logf("installing db plugin: %s", op.Name()) + logger.Debugf("installing db plugin: %s", op.Name()) // 开始前 _ = db.Callback().Create().Before("gorm:before_create").Register(beforeName, before) _ = db.Callback().Query().Before("gorm:query").Register(beforeName, before) @@ -108,7 +109,7 @@ func DbGet(conf dtmcli.DBConf) *DB { dsn := dtmimp.GetDsn(conf) db, ok := dbs.Load(dsn) if !ok { - dtmimp.Logf("connecting %s", strings.Replace(dsn, conf.Passwrod, "****", 1)) + logger.Debugf("connecting %s", strings.Replace(dsn, conf.Passwrod, "****", 1)) db1, err := gorm.Open(getGormDialetor(conf.Driver, dsn), &gorm.Config{ SkipDefaultTransaction: true, }) diff --git a/common/types.go b/common/types.go index 358dd80..19bd8ea 100644 --- a/common/types.go +++ b/common/types.go @@ -11,7 +11,7 @@ import ( "sync" "github.com/go-redis/redis/v8" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) var rdb *redis.Client @@ -19,7 +19,7 @@ var once sync.Once func RedisGet() *redis.Client { once.Do(func() { - dtmimp.Logf("connecting to redis: %v", Config.Store) + logger.Debugf("connecting to redis: %v", Config.Store) rdb = redis.NewClient(&redis.Options{ Addr: fmt.Sprintf("%s:%d", Config.Store.Host, Config.Store.Port), Username: Config.Store.User, diff --git a/common/utils.go b/common/utils.go index 5e690d9..f9db2b1 100644 --- a/common/utils.go +++ b/common/utils.go @@ -38,9 +38,9 @@ func GetGinApp() *gin.Engine { } } began := time.Now() - dtmimp.Logf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) + logger.Debugf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) c.Next() - dtmimp.Logf("used %d ms %s %s query: %s body: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) + logger.Debugf("used %d ms %s %s query: %s body: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) }) app.Any("/api/ping", func(c *gin.Context) { c.JSON(200, map[string]interface{}{"msg": "pong"}) }) @@ -61,10 +61,10 @@ func WrapHandler(fn func(*gin.Context) (interface{}, error)) gin.HandlerFunc { b, err = json.Marshal(r) } if err != nil { - dtmimp.Logf("status: 500, code: 500 message: %s", err.Error()) + logger.Debugf("status: 500, code: 500 message: %s", err.Error()) c.JSON(500, map[string]interface{}{"code": 500, "message": err.Error()}) } else { - dtmimp.Logf("status: 200, content: %s", string(b)) + logger.Debugf("status: 200, content: %s", string(b)) c.Status(200) c.Writer.Header().Add("Content-Type", "application/json") _, err = c.Writer.Write(b) diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 480ae17..ede7080 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -12,6 +12,7 @@ import ( "net/url" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) // BarrierBusiFunc type for busi func @@ -82,7 +83,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) - dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) + logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿 currentAffected == 0 { // 这个是重复请求或者悬挂 return diff --git a/dtmgrpc/dtmgimp/grpc_clients.go b/dtmgrpc/dtmgimp/grpc_clients.go index 17c7b54..e437054 100644 --- a/dtmgrpc/dtmgimp/grpc_clients.go +++ b/dtmgrpc/dtmgimp/grpc_clients.go @@ -11,6 +11,7 @@ import ( "sync" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc/dtmgpb" grpc "google.golang.org/grpc" ) @@ -57,12 +58,12 @@ func GetGrpcConn(grpcServer string, isRaw bool) (conn *grpc.ClientConn, rerr err if isRaw { opts = grpc.WithDefaultCallOptions(grpc.ForceCodec(rawCodec{})) } - dtmimp.Logf("grpc client connecting %s", grpcServer) + logger.Debugf("grpc client connecting %s", grpcServer) conn, rerr := grpc.Dial(grpcServer, grpc.WithInsecure(), grpc.WithUnaryInterceptor(GrpcClientLog), opts) if rerr == nil { clients.Store(grpcServer, conn) v = conn - dtmimp.Logf("grpc client inited for %s", grpcServer) + logger.Debugf("grpc client inited for %s", grpcServer) } } return v.(*grpc.ClientConn), rerr diff --git a/dtmgrpc/dtmgimp/types.go b/dtmgrpc/dtmgimp/types.go index a188f23..31f0bf9 100644 --- a/dtmgrpc/dtmgimp/types.go +++ b/dtmgrpc/dtmgimp/types.go @@ -12,6 +12,7 @@ import ( "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -19,28 +20,28 @@ import ( // GrpcServerLog 打印grpc服务端的日志 func GrpcServerLog(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - dtmimp.Logf("grpc server handling: %s %v", info.FullMethod, req) + logger.Debugf("grpc server handling: %s %v", info.FullMethod, req) LogDtmCtx(ctx) m, err := handler(ctx, req) res := fmt.Sprintf("grpc server handled: %s %v result: %v err: %v", info.FullMethod, req, m, err) if err != nil { - dtmimp.LogRedf("%s", res) + logger.Errorf("%s", res) } else { - dtmimp.Logf("%s", res) + logger.Debugf("%s", res) } return m, err } // GrpcClientLog 打印grpc服务端的日志 func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - dtmimp.Logf("grpc client calling: %s%s %v", cc.Target(), method, req) + logger.Debugf("grpc client calling: %s%s %v", cc.Target(), method, req) LogDtmCtx(ctx) err := invoker(ctx, method, req, reply, cc, opts...) res := fmt.Sprintf("grpc client called: %s%s %v result: %v err: %v", cc.Target(), method, req, reply, err) if err != nil { - dtmimp.LogRedf("%s", res) + logger.Errorf("%s", res) } else { - dtmimp.Logf("%s", res) + logger.Debugf("%s", res) } return err } @@ -49,7 +50,7 @@ func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, c func Result2Error(res interface{}, err error) error { e := dtmimp.CheckResult(res, err) if e == dtmimp.ErrFailure { - dtmimp.LogRedf("failure: res: %v, err: %v", res, e) + logger.Errorf("failure: res: %v, err: %v", res, e) return status.New(codes.Aborted, dtmcli.ResultFailure).Err() } else if e == dtmimp.ErrOngoing { return status.New(codes.Aborted, dtmcli.ResultOngoing).Err() diff --git a/dtmgrpc/dtmgimp/utils.go b/dtmgrpc/dtmgimp/utils.go index 357d0c7..e600166 100644 --- a/dtmgrpc/dtmgimp/utils.go +++ b/dtmgrpc/dtmgimp/utils.go @@ -10,6 +10,7 @@ import ( context "context" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc/dtmgpb" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" @@ -59,7 +60,7 @@ func TransInfo2Ctx(gid, transType, branchID, op, dtm string) context.Context { func LogDtmCtx(ctx context.Context) { tb := TransBaseFromGrpc(ctx) if tb.Gid != "" { - dtmimp.Logf("gid: %s trans_type: %s branch_id: %s op: %s dtm: %s", tb.Gid, tb.TransType, tb.BranchID, tb.Op, tb.Dtm) + logger.Debugf("gid: %s trans_type: %s branch_id: %s op: %s dtm: %s", tb.Gid, tb.TransType, tb.BranchID, tb.Op, tb.Dtm) } } diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index c4ef282..bd31d29 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -13,6 +13,7 @@ import ( "time" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) // NowForwardDuration will be set in test, trans may be timeout @@ -54,7 +55,7 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal { func handlePanic(perr *error) { if err := recover(); err != nil { - dtmimp.LogRedf("----recovered panic %v\n%s", err, string(debug.Stack())) + logger.Errorf("----recovered panic %v\n%s", err, string(debug.Stack())) if perr != nil { *perr = fmt.Errorf("dtm panic: %v", err) } @@ -64,6 +65,6 @@ func handlePanic(perr *error) { func sleepCronTime() { normal := time.Duration((float64(config.TransCronInterval) - rand.Float64()) * float64(time.Second)) interval := dtmimp.If(CronForwardDuration > 0, 1*time.Millisecond, normal).(time.Duration) - dtmimp.Logf("sleeping for %v milli", interval/time.Microsecond) + logger.Debugf("sleeping for %v milli", interval/time.Microsecond) time.Sleep(interval) } diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index f5b9670..31a0804 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -11,6 +11,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmsvr/storage" ) @@ -100,9 +101,9 @@ func cleanupGlobalWithGids(t *bolt.Tx, gids map[string]struct{}) { return } - dtmimp.Logf("Start to cleanup %d gids", len(gids)) + logger.Debugf("Start to cleanup %d gids", len(gids)) for gid := range gids { - dtmimp.Logf("Start to delete gid: %s", gid) + logger.Debugf("Start to delete gid: %s", gid) bucket.Delete([]byte(gid)) } } @@ -129,9 +130,9 @@ func cleanupBranchWithGids(t *bolt.Tx, gids map[string]struct{}) { } } - dtmimp.Logf("Start to cleanup %d branches", len(branchKeys)) + logger.Debugf("Start to cleanup %d branches", len(branchKeys)) for _, key := range branchKeys { - dtmimp.Logf("Start to delete branch: %s", key) + logger.Debugf("Start to delete branch: %s", key) bucket.Delete([]byte(key)) } } @@ -155,9 +156,9 @@ func cleanupIndexWithGids(t *bolt.Tx, gids map[string]struct{}) { } } - dtmimp.Logf("Start to cleanup %d indexes", len(indexKeys)) + logger.Debugf("Start to cleanup %d indexes", len(indexKeys)) for _, key := range indexKeys { - dtmimp.Logf("Start to delete index: %s", key) + logger.Debugf("Start to delete index: %s", key) bucket.Delete([]byte(key)) } } diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 348e572..00cecd4 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -10,6 +10,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmsvr/storage" ) @@ -114,7 +115,7 @@ func (a *argList) AppendBranches(branches []storage.TransBranchStore) *argList { } func handleRedisResult(ret interface{}, err error) (string, error) { - dtmimp.Logf("result is: '%v', err: '%v'", ret, err) + logger.Debugf("result is: '%v', err: '%v'", ret, err) if err != nil && err != redis.Nil { return "", err } @@ -127,7 +128,7 @@ func handleRedisResult(ret interface{}, err error) (string, error) { } func callLua(a *argList, lua string) (string, error) { - dtmimp.Logf("calling lua. args: %v\nlua:%s", a, lua) + logger.Debugf("calling lua. args: %v\nlua:%s", a, lua) ret, err := redisGet().Eval(ctx, lua, a.Keys, a.List...).Result() return handleRedisResult(ret, err) } diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index de67eeb..4f6c866 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -13,7 +13,6 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/yedf/dtm/common" - "github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc/dtmgimp" "github.com/yedf/dtm/dtmgrpc/dtmgpb" @@ -23,11 +22,11 @@ import ( // StartSvr StartSvr func StartSvr() { - dtmimp.Logf("start dtmsvr") + logger.Debugf("start dtmsvr") app := common.GetGinApp() app = httpMetrics(app) addRoute(app) - dtmimp.Logf("dtmsvr listen at: %d", config.HttpPort) + logger.Debugf("dtmsvr listen at: %d", config.HttpPort) go app.Run(fmt.Sprintf(":%d", config.HttpPort)) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.GrpcPort)) @@ -37,7 +36,7 @@ func StartSvr() { grpc.UnaryServerInterceptor(grpcMetrics), grpc.UnaryServerInterceptor(dtmgimp.GrpcServerLog)), )) dtmgpb.RegisterDtmServer(s, &dtmServer{}) - dtmimp.Logf("grpc listening at %v", lis.Addr()) + logger.Debugf("grpc listening at %v", lis.Addr()) go func() { err := s.Serve(lis) logger.FatalIfError(err) @@ -80,9 +79,9 @@ func updateBranchAsync() { for len(updates) > 0 { dbr := GetStore().UpdateBranchesSql(updates, []string{"status", "finish_time", "update_time"}) - dtmimp.Logf("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected) + logger.Debugf("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected) if dbr.Error != nil { - dtmimp.LogRedf("async update branch status error: %v", dbr.Error) + logger.Errorf("async update branch status error: %v", dbr.Error) time.Sleep(1 * time.Second) } else { updates = []TransBranch{} diff --git a/dtmsvr/trans_class.go b/dtmsvr/trans_class.go index d8ea01b..faf3125 100644 --- a/dtmsvr/trans_class.go +++ b/dtmsvr/trans_class.go @@ -12,6 +12,7 @@ import ( "github.com/gin-gonic/gin" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc/dtmgpb" "github.com/yedf/dtm/dtmsvr/storage" ) @@ -57,7 +58,7 @@ func TransFromContext(c *gin.Context) *TransGlobal { e2p(err) m := TransGlobal{} dtmimp.MustUnmarshal(b, &m) - dtmimp.Logf("creating trans in prepare") + logger.Debugf("creating trans in prepare") // Payloads will be store in BinPayloads, Payloads is only used to Unmarshal for _, p := range m.Payloads { m.BinPayloads = append(m.BinPayloads, []byte(p)) diff --git a/dtmsvr/trans_process.go b/dtmsvr/trans_process.go index 38a68b9..a9ef2f9 100644 --- a/dtmsvr/trans_process.go +++ b/dtmsvr/trans_process.go @@ -12,6 +12,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) // Process process global transaction once @@ -45,15 +46,15 @@ func (t *TransGlobal) processInner() (rerr error) { defer handlePanic(&rerr) defer func() { if rerr != nil { - dtmimp.LogRedf("processInner got error: %s", rerr.Error()) + logger.Errorf("processInner got error: %s", rerr.Error()) } if TransProcessedTestChan != nil { - dtmimp.Logf("processed: %s", t.Gid) + logger.Debugf("processed: %s", t.Gid) TransProcessedTestChan <- t.Gid - dtmimp.Logf("notified: %s", t.Gid) + logger.Debugf("notified: %s", t.Gid) } }() - dtmimp.Logf("processing: %s status: %s", t.Gid, t.Status) + logger.Debugf("processing: %s status: %s", t.Gid, t.Status) branches := GetStore().FindBranches(t.Gid) t.lastTouched = time.Now() rerr = t.getProcessor().ProcessOnce(branches) diff --git a/dtmsvr/trans_type_msg.go b/dtmsvr/trans_type_msg.go index f6bc9b1..63a62b6 100644 --- a/dtmsvr/trans_type_msg.go +++ b/dtmsvr/trans_type_msg.go @@ -11,7 +11,7 @@ import ( "strings" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) type transMsgProcessor struct { @@ -50,7 +50,7 @@ func (t *TransGlobal) mayQueryPrepared() { } else if strings.Contains(body, dtmcli.ResultOngoing) { t.touchCronTime(cronReset) } else { - dtmimp.LogRedf("getting result failed for %s. error: %s", t.QueryPrepared, err.Error()) + logger.Errorf("getting result failed for %s. error: %s", t.QueryPrepared, err.Error()) t.touchCronTime(cronBackoff) } } diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index dad3d6f..bdeea9b 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -12,6 +12,7 @@ import ( "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) type transSagaProcessor struct { @@ -54,7 +55,7 @@ type branchResult struct { func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { // when saga tasks is fetched, it always need to process - dtmimp.Logf("status: %s timeout: %t", t.Status, t.isTimeout()) + logger.Debugf("status: %s timeout: %t", t.Status, t.isTimeout()) if t.Status == dtmcli.StatusSubmitted && t.isTimeout() { t.changeStatus(dtmcli.StatusAborting) } @@ -104,7 +105,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op} if err != nil { - dtmimp.LogRedf("exec branch error: %v", err) + logger.Errorf("exec branch error: %v", err) } }() err = t.execBranch(&branches[i], i) @@ -117,7 +118,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { toRun = append(toRun, current) } } - dtmimp.Logf("toRun picked for action is: %v", toRun) + logger.Debugf("toRun picked for action is: %v", toRun) return toRun } runBranches := func(toRun []int) { @@ -159,9 +160,9 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { rsCSucceed++ } } - dtmimp.Logf("branch done: %v", r) + logger.Debugf("branch done: %v", r) case <-time.After(time.Duration(time.Second * 3)): - dtmimp.Logf("wait once for done") + logger.Debugf("wait once for done") } } diff --git a/dtmsvr/trans_type_tcc.go b/dtmsvr/trans_type_tcc.go index 88145d1..2c32ab6 100644 --- a/dtmsvr/trans_type_tcc.go +++ b/dtmsvr/trans_type_tcc.go @@ -9,6 +9,7 @@ package dtmsvr import ( "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" ) type transTccProcessor struct { @@ -33,7 +34,7 @@ func (t *transTccProcessor) ProcessOnce(branches []TransBranch) error { op := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchConfirm, dtmcli.BranchCancel).(string) for current := len(branches) - 1; current >= 0; current-- { if branches[current].Op == op && branches[current].Status == dtmcli.StatusPrepared { - dtmimp.Logf("branch info: current: %d ID: %d", current, branches[current].ID) + logger.Debugf("branch info: current: %d ID: %d", current, branches[current].ID) err := t.execBranch(&branches[current], current) if err != nil { return err diff --git a/examples/base_grpc.go b/examples/base_grpc.go index 6be8aef..07eb749 100644 --- a/examples/base_grpc.go +++ b/examples/base_grpc.go @@ -47,14 +47,14 @@ func GrpcStartup() { conn, err := grpc.Dial(DtmGrpcServer, grpc.WithInsecure(), grpc.WithUnaryInterceptor(dtmgimp.GrpcClientLog)) logger.FatalIfError(err) DtmClient = dtmgpb.NewDtmClient(conn) - dtmimp.Logf("dtm client inited") + logger.Debugf("dtm client inited") lis, err := net.Listen("tcp", fmt.Sprintf(":%d", BusiGrpcPort)) logger.FatalIfError(err) s := grpc.NewServer(grpc.UnaryInterceptor(dtmgimp.GrpcServerLog)) RegisterBusiServer(s, &busiServer{}) go func() { - dtmimp.Logf("busi grpc listening at %v", lis.Addr()) + logger.Debugf("busi grpc listening at %v", lis.Addr()) err := s.Serve(lis) logger.FatalIfError(err) }() @@ -63,7 +63,7 @@ func GrpcStartup() { func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string) error { res := dtmimp.OrString(result1, result2, dtmcli.ResultSuccess) - dtmimp.Logf("grpc busi %s %v %s %s result: %s", busi, in, result1, result2, res) + logger.Debugf("grpc busi %s %v %s %s result: %s", busi, in, result1, result2, res) if res == dtmcli.ResultSuccess { return nil } else if res == dtmcli.ResultFailure { diff --git a/examples/base_http.go b/examples/base_http.go index 8aa7f37..401d1f3 100644 --- a/examples/base_http.go +++ b/examples/base_http.go @@ -17,6 +17,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "gorm.io/driver/mysql" "gorm.io/driver/postgres" "gorm.io/gorm" @@ -40,7 +41,7 @@ var Busi string = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiAPI) // BaseAppStartup base app startup func BaseAppStartup() *gin.Engine { - dtmimp.Logf("examples starting") + logger.Debugf("examples starting") app := common.GetGinApp() app.Use(func(c *gin.Context) { v := MainSwitch.NextResult.Fetch() @@ -54,10 +55,10 @@ func BaseAppStartup() *gin.Engine { BaseAddRoute(app) for k, v := range setupFuncs { - dtmimp.Logf("initing %s", k) + logger.Debugf("initing %s", k) v(app) } - dtmimp.Logf("Starting busi at: %d", BusiPort) + logger.Debugf("Starting busi at: %d", BusiPort) go app.Run(fmt.Sprintf(":%d", BusiPort)) time.Sleep(100 * time.Millisecond) @@ -98,7 +99,7 @@ var MainSwitch mainSwitchType func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi string) (interface{}, error) { info := infoFromContext(c) res := dtmimp.OrString(result1, result2, dtmcli.ResultSuccess) - dtmimp.Logf("%s %s result: %s", busi, info.String(), res) + logger.Debugf("%s %s result: %s", busi, info.String(), res) if res == "ERROR" { return nil, errors.New("ERROR from user") } @@ -136,7 +137,7 @@ func BaseAddRoute(app *gin.Engine) { return handleGeneralBusiness(c, MainSwitch.TransOutRevertResult.Fetch(), "", "TransOutRevert") })) app.GET(BusiAPI+"/CanSubmit", common.WrapHandler(func(c *gin.Context) (interface{}, error) { - dtmimp.Logf("%s CanSubmit", c.Query("gid")) + logger.Debugf("%s CanSubmit", c.Query("gid")) return dtmimp.OrString(MainSwitch.CanSubmitResult.Fetch(), dtmcli.ResultSuccess), nil })) app.POST(BusiAPI+"/TransInXa", common.WrapHandler(func(c *gin.Context) (interface{}, error) { diff --git a/examples/grpc_tcc.go b/examples/grpc_tcc.go index 6bf8cf1..5a6c4fd 100644 --- a/examples/grpc_tcc.go +++ b/examples/grpc_tcc.go @@ -7,7 +7,6 @@ package examples import ( - "github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/logger" dtmgrpc "github.com/yedf/dtm/dtmgrpc" emptypb "google.golang.org/protobuf/types/known/emptypb" @@ -15,7 +14,7 @@ import ( func init() { addSample("grpc_tcc", func() string { - dtmimp.Logf("tcc simple transaction begin") + logger.Debugf("tcc simple transaction begin") gid := dtmgrpc.MustGenGid(DtmGrpcServer) err := dtmgrpc.TccGlobalTransaction(DtmGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error { data := &BusiReq{Amount: 30} diff --git a/examples/http_msg.go b/examples/http_msg.go index 59760df..3f40805 100644 --- a/examples/http_msg.go +++ b/examples/http_msg.go @@ -8,20 +8,19 @@ package examples import ( "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/logger" ) func init() { addSample("msg", func() string { - dtmimp.Logf("a busi transaction begin") + logger.Debugf("a busi transaction begin") req := &TransReq{Amount: 30} msg := dtmcli.NewMsg(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)). Add(Busi+"/TransOut", req). Add(Busi+"/TransIn", req) err := msg.Prepare(Busi + "/query") logger.FatalIfError(err) - dtmimp.Logf("busi trans submit") + logger.Debugf("busi trans submit") err = msg.Submit() logger.FatalIfError(err) return msg.Gid diff --git a/examples/http_saga.go b/examples/http_saga.go index 208d06d..d7dd59b 100644 --- a/examples/http_saga.go +++ b/examples/http_saga.go @@ -8,37 +8,36 @@ package examples import ( "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/logger" ) func init() { addSample("saga", func() string { - dtmimp.Logf("a saga busi transaction begin") + logger.Debugf("a saga busi transaction begin") req := &TransReq{Amount: 30} saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). Add(Busi+"/TransIn", Busi+"/TransInRevert", req) - dtmimp.Logf("saga busi trans submit") + logger.Debugf("saga busi trans submit") err := saga.Submit() - dtmimp.Logf("result gid is: %s", saga.Gid) + logger.Debugf("result gid is: %s", saga.Gid) logger.FatalIfError(err) return saga.Gid }) addSample("saga_wait", func() string { - dtmimp.Logf("a saga busi transaction begin") + logger.Debugf("a saga busi transaction begin") req := &TransReq{Amount: 30} saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). Add(Busi+"/TransIn", Busi+"/TransInRevert", req) saga.SetOptions(&dtmcli.TransOptions{WaitResult: true}) err := saga.Submit() - dtmimp.Logf("result gid is: %s", saga.Gid) + logger.Debugf("result gid is: %s", saga.Gid) logger.FatalIfError(err) return saga.Gid }) addSample("concurrent_saga", func() string { - dtmimp.Logf("a concurrent saga busi transaction begin") + logger.Debugf("a concurrent saga busi transaction begin") req := &TransReq{Amount: 30} csaga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). @@ -48,9 +47,9 @@ func init() { EnableConcurrent(). AddBranchOrder(2, []int{0, 1}). AddBranchOrder(3, []int{0, 1}) - dtmimp.Logf("concurrent saga busi trans submit") + logger.Debugf("concurrent saga busi trans submit") err := csaga.Submit() - dtmimp.Logf("result gid is: %s", csaga.Gid) + logger.Debugf("result gid is: %s", csaga.Gid) logger.FatalIfError(err) return csaga.Gid }) diff --git a/examples/http_saga_barrier.go b/examples/http_saga_barrier.go index 1b53bbe..c633c58 100644 --- a/examples/http_saga_barrier.go +++ b/examples/http_saga_barrier.go @@ -24,12 +24,12 @@ func init() { app.POST(BusiAPI+"/SagaBTransOutCompensate", common.WrapHandler(sagaBarrierTransOutCompensate)) } addSample("saga_barrier", func() string { - dtmimp.Logf("a busi transaction begin") + logger.Debugf("a busi transaction begin") req := &TransReq{Amount: 30} saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)). Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req). Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) - dtmimp.Logf("busi trans submit") + logger.Debugf("busi trans submit") err := saga.Submit() logger.FatalIfError(err) return saga.Gid diff --git a/examples/http_saga_gorm_barrier.go b/examples/http_saga_gorm_barrier.go index 7b1e65f..9499506 100644 --- a/examples/http_saga_gorm_barrier.go +++ b/examples/http_saga_gorm_barrier.go @@ -12,7 +12,6 @@ import ( "github.com/gin-gonic/gin" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/logger" ) @@ -21,12 +20,12 @@ func init() { app.POST(BusiAPI+"/SagaBTransOutGorm", common.WrapHandler(sagaGormBarrierTransOut)) } addSample("saga_gorm_barrier", func() string { - dtmimp.Logf("a busi transaction begin") + logger.Debugf("a busi transaction begin") req := &TransReq{Amount: 30} saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)). Add(Busi+"/SagaBTransOutGorm", Busi+"/SagaBTransOutCompensate", req). Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) - dtmimp.Logf("busi trans submit") + logger.Debugf("busi trans submit") err := saga.Submit() logger.FatalIfError(err) return saga.Gid diff --git a/examples/http_tcc.go b/examples/http_tcc.go index 1692d3c..76dff2f 100644 --- a/examples/http_tcc.go +++ b/examples/http_tcc.go @@ -11,7 +11,6 @@ import ( "github.com/go-resty/resty/v2" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/logger" ) @@ -20,7 +19,7 @@ func init() { app.POST(BusiAPI+"/TransInTccParent", common.WrapHandler(func(c *gin.Context) (interface{}, error) { tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query()) logger.FatalIfError(err) - dtmimp.Logf("TransInTccParent ") + logger.Debugf("TransInTccParent ") return tcc.CallBranch(&TransReq{Amount: reqFrom(c).Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") })) } @@ -37,7 +36,7 @@ func init() { return gid }) addSample("tcc", func() string { - dtmimp.Logf("tcc simple transaction begin") + logger.Debugf("tcc simple transaction begin") gid := dtmcli.MustGenGid(DtmHttpServer) err := dtmcli.TccGlobalTransaction(DtmHttpServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") diff --git a/examples/http_tcc_barrier.go b/examples/http_tcc_barrier.go index ef52111..56a0467 100644 --- a/examples/http_tcc_barrier.go +++ b/examples/http_tcc_barrier.go @@ -28,7 +28,7 @@ func init() { app.POST(BusiAPI+"/TccBTransOutCancel", common.WrapHandler(TccBarrierTransOutCancel)) } addSample("tcc_barrier", func() string { - dtmimp.Logf("tcc transaction begin") + logger.Debugf("tcc transaction begin") gid := dtmcli.MustGenGid(DtmHttpServer) err := dtmcli.TccGlobalTransaction(DtmHttpServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", diff --git a/examples/quick_start.go b/examples/quick_start.go index 147d1dd..47329ed 100644 --- a/examples/quick_start.go +++ b/examples/quick_start.go @@ -29,7 +29,7 @@ var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiAPI) func QsStartSvr() { app := common.GetGinApp() qsAddRoute(app) - dtmimp.Logf("quick qs examples listening at %d", qsBusiPort) + logger.Debugf("quick qs examples listening at %d", qsBusiPort) go app.Run(fmt.Sprintf(":%d", qsBusiPort)) time.Sleep(100 * time.Millisecond) } diff --git a/test/base_test.go b/test/base_test.go index 7dbc192..deffa8d 100644 --- a/test/base_test.go +++ b/test/base_test.go @@ -15,6 +15,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/examples" ) @@ -40,7 +41,7 @@ func TestBaseSqlDB(t *testing.T) { tx, err := db.ToSQLDB().Begin() asserts.Nil(err) err = barrier.Call(tx, func(tx *sql.Tx) error { - dtmimp.Logf("rollback gid2") + logger.Debugf("rollback gid2") return fmt.Errorf("gid2 error") }) asserts.Error(err, fmt.Errorf("gid2 error")) @@ -50,7 +51,7 @@ func TestBaseSqlDB(t *testing.T) { asserts.Equal(dbr.RowsAffected, int64(0)) barrier.BarrierID = 0 err = barrier.CallWithDB(db.ToSQLDB(), func(tx *sql.Tx) error { - dtmimp.Logf("submit gid2") + logger.Debugf("submit gid2") return nil }) asserts.Nil(err) diff --git a/test/tcc_barrier_test.go b/test/tcc_barrier_test.go index a1b3c88..238a70d 100644 --- a/test/tcc_barrier_test.go +++ b/test/tcc_barrier_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/examples" ) @@ -68,7 +69,7 @@ func TestTccBarrierDisorder(t *testing.T) { res, err := examples.TccBarrierTransOutCancel(c) if !sleeped { sleeped = true - dtmimp.Logf("sleep before cancel return") + logger.Debugf("sleep before cancel return") <-timeoutChan finishedChan <- "1" } @@ -89,7 +90,7 @@ func TestTccBarrierDisorder(t *testing.T) { assert.Contains(t, resp.String(), dtmcli.ResultSuccess) go func() { - dtmimp.Logf("sleeping to wait for tcc try timeout") + logger.Debugf("sleeping to wait for tcc try timeout") <-timeoutChan r, _ := dtmimp.RestyClient.R(). SetBody(body). @@ -104,10 +105,10 @@ func TestTccBarrierDisorder(t *testing.T) { assert.True(t, strings.Contains(r.String(), dtmcli.ResultSuccess)) // 这个是悬挂操作,为了简单起见,依旧让他返回成功 finishedChan <- "1" }() - dtmimp.Logf("cron to timeout and then call cancel") + logger.Debugf("cron to timeout and then call cancel") go cronTransOnceForwardNow(300) time.Sleep(100 * time.Millisecond) - dtmimp.Logf("cron to timeout and then call cancelled twice") + logger.Debugf("cron to timeout and then call cancelled twice") cronTransOnceForwardNow(300) timeoutChan <- "wake" timeoutChan <- "wake" diff --git a/test/tcc_grpc_test.go b/test/tcc_grpc_test.go index ff4b2df..16ba193 100644 --- a/test/tcc_grpc_test.go +++ b/test/tcc_grpc_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc" "github.com/yedf/dtm/examples" "google.golang.org/protobuf/types/known/emptypb" @@ -70,7 +71,7 @@ func TestTccGrpcNested(t *testing.T) { func TestTccGrpcType(t *testing.T) { _, err := dtmgrpc.TccFromGrpc(context.Background()) assert.Error(t, err) - dtmimp.Logf("expecting dtmgrpcserver error") + logger.Debugf("expecting dtmgrpcserver error") err = dtmgrpc.TccGlobalTransaction("-", "", func(tcc *dtmgrpc.TccGrpc) error { return nil }) assert.Error(t, err) } diff --git a/test/types.go b/test/types.go index ef1fd40..bef96cc 100644 --- a/test/types.go +++ b/test/types.go @@ -24,14 +24,14 @@ func dbGet() *common.DB { // waitTransProcessed only for test usage. wait for transaction processed once func waitTransProcessed(gid string) { - dtmimp.Logf("waiting for gid %s", gid) + logger.Debugf("waiting for gid %s", gid) select { case id := <-dtmsvr.TransProcessedTestChan: for id != gid { - dtmimp.LogRedf("-------id %s not match gid %s", id, gid) + logger.Errorf("-------id %s not match gid %s", id, gid) id = <-dtmsvr.TransProcessedTestChan } - dtmimp.Logf("finish for gid %s", gid) + logger.Debugf("finish for gid %s", gid) case <-time.After(time.Duration(time.Second * 3)): logger.FatalfIf(true, "Wait Trans timeout") }