Browse Source

Logf changed to Debugf

pull/123/head
yedf2 4 years ago
parent
commit
6fed8dae6b
  1. 3
      app/main.go
  2. 6
      bench/http.go
  3. 4
      bench/main.go
  4. 3
      common/config.go
  5. 2
      common/config_utils.go
  6. 7
      common/db.go
  7. 4
      common/types.go
  8. 8
      common/utils.go
  9. 3
      dtmcli/barrier.go
  10. 5
      dtmgrpc/dtmgimp/grpc_clients.go
  11. 15
      dtmgrpc/dtmgimp/types.go
  12. 3
      dtmgrpc/dtmgimp/utils.go
  13. 5
      dtmsvr/cron.go
  14. 13
      dtmsvr/storage/boltdb/boltdb.go
  15. 5
      dtmsvr/storage/redis/redis.go
  16. 11
      dtmsvr/svr.go
  17. 3
      dtmsvr/trans_class.go
  18. 9
      dtmsvr/trans_process.go
  19. 4
      dtmsvr/trans_type_msg.go
  20. 11
      dtmsvr/trans_type_saga.go
  21. 3
      dtmsvr/trans_type_tcc.go
  22. 6
      examples/base_grpc.go
  23. 11
      examples/base_http.go
  24. 3
      examples/grpc_tcc.go
  25. 5
      examples/http_msg.go
  26. 17
      examples/http_saga.go
  27. 4
      examples/http_saga_barrier.go
  28. 5
      examples/http_saga_gorm_barrier.go
  29. 5
      examples/http_tcc.go
  30. 2
      examples/http_tcc_barrier.go
  31. 2
      examples/quick_start.go
  32. 5
      test/base_test.go
  33. 9
      test/tcc_barrier_test.go
  34. 3
      test/tcc_grpc_test.go
  35. 6
      test/types.go

3
app/main.go

@ -15,7 +15,6 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmsvr"
"github.com/yedf/dtm/dtmsvr/storage/registry"
@ -51,7 +50,7 @@ func main() {
fmt.Printf("version: %s commit: %s built at: %s\n", Version, Commit, Date)
return
}
dtmimp.Logf("starting dtm....")
logger.Debugf("starting dtm....")
common.MustLoadConfig()
if common.Config.ExamplesDB.Driver != "" {
dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver)

6
bench/http.go

@ -60,7 +60,7 @@ func reloadData() {
}
_, err := dtmimp.DBExec(db, s+strings.Join(ss, ","))
logger.FatalIfError(err)
dtmimp.Logf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds())
logger.Debugf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds())
}
var uidCounter int32 = 0
@ -71,7 +71,7 @@ var sqls int = 1
func StartSvr() {
app := common.GetGinApp()
benchAddRoute(app)
dtmimp.Logf("bench listening at %d", benchPort)
logger.Debugf("bench listening at %d", benchPort)
go app.Run(fmt.Sprintf(":%d", benchPort))
db := sdbGet()
_, err := dtmimp.DBExec(db, "drop table if exists dtm_busi.user_account_log")
@ -151,7 +151,7 @@ func benchAddRoute(app *gin.Engine) {
req := gin.H{}
params := fmt.Sprintf("?uid=%s", suid)
params2 := fmt.Sprintf("?uid=%s", suid2)
dtmimp.Logf("mode: %s contains dtm: %t", mode, strings.Contains(mode, "dtm"))
logger.Debugf("mode: %s contains dtm: %t", mode, strings.Contains(mode, "dtm"))
if strings.Contains(mode, "dtm") {
saga := dtmcli.NewSaga(examples.DtmHttpServer, fmt.Sprintf("bench-%d", uid)).
Add(benchBusi+"/TransOut"+params, benchBusi+"/TransOutCompensate"+params, req).

4
bench/main.go

@ -6,7 +6,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmsvr"
"github.com/yedf/dtm/dtmsvr/storage/registry"
"github.com/yedf/dtm/examples"
@ -23,7 +23,7 @@ func main() {
fmt.Printf(hint)
return
}
dtmimp.Logf("starting dtm....")
logger.Debugf("starting dtm....")
if os.Args[1] == "http" {
fmt.Println("start bench server")
common.MustLoadConfig()

3
common/config.go

@ -7,7 +7,6 @@ import (
"path/filepath"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"gopkg.in/yaml.v2"
)
@ -87,7 +86,7 @@ func MustLoadConfig() {
}
scont, err := json.MarshalIndent(&Config, "", " ")
logger.FatalIfError(err)
dtmimp.Logf("config is: \n%s", scont)
logger.Debugf("config is: \n%s", scont)
err = checkConfig()
logger.FatalfIf(err != nil, `config error: '%v'.
check you env, and conf.yml/conf.sample.yml in current and parent path: %s.

2
common/config_utils.go

@ -50,6 +50,6 @@ func toUnderscoreUpper(key string) string {
key = strings.Trim(key, "_")
matchFirstCap := regexp.MustCompile("([a-z])([A-Z]+)")
s2 := matchFirstCap.ReplaceAllString(key, "${1}_${2}")
// dtmimp.Logf("loading from env: %s", strings.ToUpper(s2))
// logger.Debugf("loading from env: %s", strings.ToUpper(s2))
return strings.ToUpper(s2)
}

7
common/db.go

@ -11,6 +11,7 @@ import (
_ "github.com/lib/pq" // register postgres driver
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
@ -65,7 +66,7 @@ func (op *tracePlugin) Initialize(db *gorm.DB) (err error) {
after := func(db *gorm.DB) {
_ts, _ := db.InstanceGet("ivy.startTime")
sql := db.Dialector.Explain(db.Statement.SQL.String(), db.Statement.Vars...)
dtmimp.Logf("used: %d ms affected: %d sql is: %s", time.Since(_ts.(time.Time)).Milliseconds(), db.RowsAffected, sql)
logger.Debugf("used: %d ms affected: %d sql is: %s", time.Since(_ts.(time.Time)).Milliseconds(), db.RowsAffected, sql)
if v, ok := db.InstanceGet("ivy.must"); ok && v.(bool) {
if db.Error != nil && db.Error != gorm.ErrRecordNotFound {
panic(db.Error)
@ -76,7 +77,7 @@ func (op *tracePlugin) Initialize(db *gorm.DB) (err error) {
beforeName := "cb_before"
afterName := "cb_after"
dtmimp.Logf("installing db plugin: %s", op.Name())
logger.Debugf("installing db plugin: %s", op.Name())
// 开始前
_ = db.Callback().Create().Before("gorm:before_create").Register(beforeName, before)
_ = db.Callback().Query().Before("gorm:query").Register(beforeName, before)
@ -108,7 +109,7 @@ func DbGet(conf dtmcli.DBConf) *DB {
dsn := dtmimp.GetDsn(conf)
db, ok := dbs.Load(dsn)
if !ok {
dtmimp.Logf("connecting %s", strings.Replace(dsn, conf.Passwrod, "****", 1))
logger.Debugf("connecting %s", strings.Replace(dsn, conf.Passwrod, "****", 1))
db1, err := gorm.Open(getGormDialetor(conf.Driver, dsn), &gorm.Config{
SkipDefaultTransaction: true,
})

4
common/types.go

@ -11,7 +11,7 @@ import (
"sync"
"github.com/go-redis/redis/v8"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
var rdb *redis.Client
@ -19,7 +19,7 @@ var once sync.Once
func RedisGet() *redis.Client {
once.Do(func() {
dtmimp.Logf("connecting to redis: %v", Config.Store)
logger.Debugf("connecting to redis: %v", Config.Store)
rdb = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", Config.Store.Host, Config.Store.Port),
Username: Config.Store.User,

8
common/utils.go

@ -38,9 +38,9 @@ func GetGinApp() *gin.Engine {
}
}
began := time.Now()
dtmimp.Logf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body)
logger.Debugf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body)
c.Next()
dtmimp.Logf("used %d ms %s %s query: %s body: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body)
logger.Debugf("used %d ms %s %s query: %s body: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body)
})
app.Any("/api/ping", func(c *gin.Context) { c.JSON(200, map[string]interface{}{"msg": "pong"}) })
@ -61,10 +61,10 @@ func WrapHandler(fn func(*gin.Context) (interface{}, error)) gin.HandlerFunc {
b, err = json.Marshal(r)
}
if err != nil {
dtmimp.Logf("status: 500, code: 500 message: %s", err.Error())
logger.Debugf("status: 500, code: 500 message: %s", err.Error())
c.JSON(500, map[string]interface{}{"code": 500, "message": err.Error()})
} else {
dtmimp.Logf("status: 200, content: %s", string(b))
logger.Debugf("status: 200, content: %s", string(b))
c.Status(200)
c.Writer.Header().Add("Content-Type", "application/json")
_, err = c.Writer.Write(b)

3
dtmcli/barrier.go

@ -12,6 +12,7 @@ import (
"net/url"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
// BarrierBusiFunc type for busi func
@ -82,7 +83,7 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error)
originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿
currentAffected == 0 { // 这个是重复请求或者悬挂
return

5
dtmgrpc/dtmgimp/grpc_clients.go

@ -11,6 +11,7 @@ import (
"sync"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc/dtmgpb"
grpc "google.golang.org/grpc"
)
@ -57,12 +58,12 @@ func GetGrpcConn(grpcServer string, isRaw bool) (conn *grpc.ClientConn, rerr err
if isRaw {
opts = grpc.WithDefaultCallOptions(grpc.ForceCodec(rawCodec{}))
}
dtmimp.Logf("grpc client connecting %s", grpcServer)
logger.Debugf("grpc client connecting %s", grpcServer)
conn, rerr := grpc.Dial(grpcServer, grpc.WithInsecure(), grpc.WithUnaryInterceptor(GrpcClientLog), opts)
if rerr == nil {
clients.Store(grpcServer, conn)
v = conn
dtmimp.Logf("grpc client inited for %s", grpcServer)
logger.Debugf("grpc client inited for %s", grpcServer)
}
}
return v.(*grpc.ClientConn), rerr

15
dtmgrpc/dtmgimp/types.go

@ -12,6 +12,7 @@ import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -19,28 +20,28 @@ import (
// GrpcServerLog 打印grpc服务端的日志
func GrpcServerLog(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
dtmimp.Logf("grpc server handling: %s %v", info.FullMethod, req)
logger.Debugf("grpc server handling: %s %v", info.FullMethod, req)
LogDtmCtx(ctx)
m, err := handler(ctx, req)
res := fmt.Sprintf("grpc server handled: %s %v result: %v err: %v", info.FullMethod, req, m, err)
if err != nil {
dtmimp.LogRedf("%s", res)
logger.Errorf("%s", res)
} else {
dtmimp.Logf("%s", res)
logger.Debugf("%s", res)
}
return m, err
}
// GrpcClientLog 打印grpc服务端的日志
func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
dtmimp.Logf("grpc client calling: %s%s %v", cc.Target(), method, req)
logger.Debugf("grpc client calling: %s%s %v", cc.Target(), method, req)
LogDtmCtx(ctx)
err := invoker(ctx, method, req, reply, cc, opts...)
res := fmt.Sprintf("grpc client called: %s%s %v result: %v err: %v", cc.Target(), method, req, reply, err)
if err != nil {
dtmimp.LogRedf("%s", res)
logger.Errorf("%s", res)
} else {
dtmimp.Logf("%s", res)
logger.Debugf("%s", res)
}
return err
}
@ -49,7 +50,7 @@ func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, c
func Result2Error(res interface{}, err error) error {
e := dtmimp.CheckResult(res, err)
if e == dtmimp.ErrFailure {
dtmimp.LogRedf("failure: res: %v, err: %v", res, e)
logger.Errorf("failure: res: %v, err: %v", res, e)
return status.New(codes.Aborted, dtmcli.ResultFailure).Err()
} else if e == dtmimp.ErrOngoing {
return status.New(codes.Aborted, dtmcli.ResultOngoing).Err()

3
dtmgrpc/dtmgimp/utils.go

@ -10,6 +10,7 @@ import (
context "context"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc/dtmgpb"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
@ -59,7 +60,7 @@ func TransInfo2Ctx(gid, transType, branchID, op, dtm string) context.Context {
func LogDtmCtx(ctx context.Context) {
tb := TransBaseFromGrpc(ctx)
if tb.Gid != "" {
dtmimp.Logf("gid: %s trans_type: %s branch_id: %s op: %s dtm: %s", tb.Gid, tb.TransType, tb.BranchID, tb.Op, tb.Dtm)
logger.Debugf("gid: %s trans_type: %s branch_id: %s op: %s dtm: %s", tb.Gid, tb.TransType, tb.BranchID, tb.Op, tb.Dtm)
}
}

5
dtmsvr/cron.go

@ -13,6 +13,7 @@ import (
"time"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
// NowForwardDuration will be set in test, trans may be timeout
@ -54,7 +55,7 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal {
func handlePanic(perr *error) {
if err := recover(); err != nil {
dtmimp.LogRedf("----recovered panic %v\n%s", err, string(debug.Stack()))
logger.Errorf("----recovered panic %v\n%s", err, string(debug.Stack()))
if perr != nil {
*perr = fmt.Errorf("dtm panic: %v", err)
}
@ -64,6 +65,6 @@ func handlePanic(perr *error) {
func sleepCronTime() {
normal := time.Duration((float64(config.TransCronInterval) - rand.Float64()) * float64(time.Second))
interval := dtmimp.If(CronForwardDuration > 0, 1*time.Millisecond, normal).(time.Duration)
dtmimp.Logf("sleeping for %v milli", interval/time.Microsecond)
logger.Debugf("sleeping for %v milli", interval/time.Microsecond)
time.Sleep(interval)
}

13
dtmsvr/storage/boltdb/boltdb.go

@ -11,6 +11,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmsvr/storage"
)
@ -100,9 +101,9 @@ func cleanupGlobalWithGids(t *bolt.Tx, gids map[string]struct{}) {
return
}
dtmimp.Logf("Start to cleanup %d gids", len(gids))
logger.Debugf("Start to cleanup %d gids", len(gids))
for gid := range gids {
dtmimp.Logf("Start to delete gid: %s", gid)
logger.Debugf("Start to delete gid: %s", gid)
bucket.Delete([]byte(gid))
}
}
@ -129,9 +130,9 @@ func cleanupBranchWithGids(t *bolt.Tx, gids map[string]struct{}) {
}
}
dtmimp.Logf("Start to cleanup %d branches", len(branchKeys))
logger.Debugf("Start to cleanup %d branches", len(branchKeys))
for _, key := range branchKeys {
dtmimp.Logf("Start to delete branch: %s", key)
logger.Debugf("Start to delete branch: %s", key)
bucket.Delete([]byte(key))
}
}
@ -155,9 +156,9 @@ func cleanupIndexWithGids(t *bolt.Tx, gids map[string]struct{}) {
}
}
dtmimp.Logf("Start to cleanup %d indexes", len(indexKeys))
logger.Debugf("Start to cleanup %d indexes", len(indexKeys))
for _, key := range indexKeys {
dtmimp.Logf("Start to delete index: %s", key)
logger.Debugf("Start to delete index: %s", key)
bucket.Delete([]byte(key))
}
}

5
dtmsvr/storage/redis/redis.go

@ -10,6 +10,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmsvr/storage"
)
@ -114,7 +115,7 @@ func (a *argList) AppendBranches(branches []storage.TransBranchStore) *argList {
}
func handleRedisResult(ret interface{}, err error) (string, error) {
dtmimp.Logf("result is: '%v', err: '%v'", ret, err)
logger.Debugf("result is: '%v', err: '%v'", ret, err)
if err != nil && err != redis.Nil {
return "", err
}
@ -127,7 +128,7 @@ func handleRedisResult(ret interface{}, err error) (string, error) {
}
func callLua(a *argList, lua string) (string, error) {
dtmimp.Logf("calling lua. args: %v\nlua:%s", a, lua)
logger.Debugf("calling lua. args: %v\nlua:%s", a, lua)
ret, err := redisGet().Eval(ctx, lua, a.Keys, a.List...).Result()
return handleRedisResult(ret, err)
}

11
dtmsvr/svr.go

@ -13,7 +13,6 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc/dtmgimp"
"github.com/yedf/dtm/dtmgrpc/dtmgpb"
@ -23,11 +22,11 @@ import (
// StartSvr StartSvr
func StartSvr() {
dtmimp.Logf("start dtmsvr")
logger.Debugf("start dtmsvr")
app := common.GetGinApp()
app = httpMetrics(app)
addRoute(app)
dtmimp.Logf("dtmsvr listen at: %d", config.HttpPort)
logger.Debugf("dtmsvr listen at: %d", config.HttpPort)
go app.Run(fmt.Sprintf(":%d", config.HttpPort))
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.GrpcPort))
@ -37,7 +36,7 @@ func StartSvr() {
grpc.UnaryServerInterceptor(grpcMetrics), grpc.UnaryServerInterceptor(dtmgimp.GrpcServerLog)),
))
dtmgpb.RegisterDtmServer(s, &dtmServer{})
dtmimp.Logf("grpc listening at %v", lis.Addr())
logger.Debugf("grpc listening at %v", lis.Addr())
go func() {
err := s.Serve(lis)
logger.FatalIfError(err)
@ -80,9 +79,9 @@ func updateBranchAsync() {
for len(updates) > 0 {
dbr := GetStore().UpdateBranchesSql(updates, []string{"status", "finish_time", "update_time"})
dtmimp.Logf("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected)
logger.Debugf("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected)
if dbr.Error != nil {
dtmimp.LogRedf("async update branch status error: %v", dbr.Error)
logger.Errorf("async update branch status error: %v", dbr.Error)
time.Sleep(1 * time.Second)
} else {
updates = []TransBranch{}

3
dtmsvr/trans_class.go

@ -12,6 +12,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc/dtmgpb"
"github.com/yedf/dtm/dtmsvr/storage"
)
@ -57,7 +58,7 @@ func TransFromContext(c *gin.Context) *TransGlobal {
e2p(err)
m := TransGlobal{}
dtmimp.MustUnmarshal(b, &m)
dtmimp.Logf("creating trans in prepare")
logger.Debugf("creating trans in prepare")
// Payloads will be store in BinPayloads, Payloads is only used to Unmarshal
for _, p := range m.Payloads {
m.BinPayloads = append(m.BinPayloads, []byte(p))

9
dtmsvr/trans_process.go

@ -12,6 +12,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
// Process process global transaction once
@ -45,15 +46,15 @@ func (t *TransGlobal) processInner() (rerr error) {
defer handlePanic(&rerr)
defer func() {
if rerr != nil {
dtmimp.LogRedf("processInner got error: %s", rerr.Error())
logger.Errorf("processInner got error: %s", rerr.Error())
}
if TransProcessedTestChan != nil {
dtmimp.Logf("processed: %s", t.Gid)
logger.Debugf("processed: %s", t.Gid)
TransProcessedTestChan <- t.Gid
dtmimp.Logf("notified: %s", t.Gid)
logger.Debugf("notified: %s", t.Gid)
}
}()
dtmimp.Logf("processing: %s status: %s", t.Gid, t.Status)
logger.Debugf("processing: %s status: %s", t.Gid, t.Status)
branches := GetStore().FindBranches(t.Gid)
t.lastTouched = time.Now()
rerr = t.getProcessor().ProcessOnce(branches)

4
dtmsvr/trans_type_msg.go

@ -11,7 +11,7 @@ import (
"strings"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
type transMsgProcessor struct {
@ -50,7 +50,7 @@ func (t *TransGlobal) mayQueryPrepared() {
} else if strings.Contains(body, dtmcli.ResultOngoing) {
t.touchCronTime(cronReset)
} else {
dtmimp.LogRedf("getting result failed for %s. error: %s", t.QueryPrepared, err.Error())
logger.Errorf("getting result failed for %s. error: %s", t.QueryPrepared, err.Error())
t.touchCronTime(cronBackoff)
}
}

11
dtmsvr/trans_type_saga.go

@ -12,6 +12,7 @@ import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
type transSagaProcessor struct {
@ -54,7 +55,7 @@ type branchResult struct {
func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
// when saga tasks is fetched, it always need to process
dtmimp.Logf("status: %s timeout: %t", t.Status, t.isTimeout())
logger.Debugf("status: %s timeout: %t", t.Status, t.isTimeout())
if t.Status == dtmcli.StatusSubmitted && t.isTimeout() {
t.changeStatus(dtmcli.StatusAborting)
}
@ -104,7 +105,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op}
if err != nil {
dtmimp.LogRedf("exec branch error: %v", err)
logger.Errorf("exec branch error: %v", err)
}
}()
err = t.execBranch(&branches[i], i)
@ -117,7 +118,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
toRun = append(toRun, current)
}
}
dtmimp.Logf("toRun picked for action is: %v", toRun)
logger.Debugf("toRun picked for action is: %v", toRun)
return toRun
}
runBranches := func(toRun []int) {
@ -159,9 +160,9 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
rsCSucceed++
}
}
dtmimp.Logf("branch done: %v", r)
logger.Debugf("branch done: %v", r)
case <-time.After(time.Duration(time.Second * 3)):
dtmimp.Logf("wait once for done")
logger.Debugf("wait once for done")
}
}

3
dtmsvr/trans_type_tcc.go

@ -9,6 +9,7 @@ package dtmsvr
import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
type transTccProcessor struct {
@ -33,7 +34,7 @@ func (t *transTccProcessor) ProcessOnce(branches []TransBranch) error {
op := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchConfirm, dtmcli.BranchCancel).(string)
for current := len(branches) - 1; current >= 0; current-- {
if branches[current].Op == op && branches[current].Status == dtmcli.StatusPrepared {
dtmimp.Logf("branch info: current: %d ID: %d", current, branches[current].ID)
logger.Debugf("branch info: current: %d ID: %d", current, branches[current].ID)
err := t.execBranch(&branches[current], current)
if err != nil {
return err

6
examples/base_grpc.go

@ -47,14 +47,14 @@ func GrpcStartup() {
conn, err := grpc.Dial(DtmGrpcServer, grpc.WithInsecure(), grpc.WithUnaryInterceptor(dtmgimp.GrpcClientLog))
logger.FatalIfError(err)
DtmClient = dtmgpb.NewDtmClient(conn)
dtmimp.Logf("dtm client inited")
logger.Debugf("dtm client inited")
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", BusiGrpcPort))
logger.FatalIfError(err)
s := grpc.NewServer(grpc.UnaryInterceptor(dtmgimp.GrpcServerLog))
RegisterBusiServer(s, &busiServer{})
go func() {
dtmimp.Logf("busi grpc listening at %v", lis.Addr())
logger.Debugf("busi grpc listening at %v", lis.Addr())
err := s.Serve(lis)
logger.FatalIfError(err)
}()
@ -63,7 +63,7 @@ func GrpcStartup() {
func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string) error {
res := dtmimp.OrString(result1, result2, dtmcli.ResultSuccess)
dtmimp.Logf("grpc busi %s %v %s %s result: %s", busi, in, result1, result2, res)
logger.Debugf("grpc busi %s %v %s %s result: %s", busi, in, result1, result2, res)
if res == dtmcli.ResultSuccess {
return nil
} else if res == dtmcli.ResultFailure {

11
examples/base_http.go

@ -17,6 +17,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
@ -40,7 +41,7 @@ var Busi string = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiAPI)
// BaseAppStartup base app startup
func BaseAppStartup() *gin.Engine {
dtmimp.Logf("examples starting")
logger.Debugf("examples starting")
app := common.GetGinApp()
app.Use(func(c *gin.Context) {
v := MainSwitch.NextResult.Fetch()
@ -54,10 +55,10 @@ func BaseAppStartup() *gin.Engine {
BaseAddRoute(app)
for k, v := range setupFuncs {
dtmimp.Logf("initing %s", k)
logger.Debugf("initing %s", k)
v(app)
}
dtmimp.Logf("Starting busi at: %d", BusiPort)
logger.Debugf("Starting busi at: %d", BusiPort)
go app.Run(fmt.Sprintf(":%d", BusiPort))
time.Sleep(100 * time.Millisecond)
@ -98,7 +99,7 @@ var MainSwitch mainSwitchType
func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi string) (interface{}, error) {
info := infoFromContext(c)
res := dtmimp.OrString(result1, result2, dtmcli.ResultSuccess)
dtmimp.Logf("%s %s result: %s", busi, info.String(), res)
logger.Debugf("%s %s result: %s", busi, info.String(), res)
if res == "ERROR" {
return nil, errors.New("ERROR from user")
}
@ -136,7 +137,7 @@ func BaseAddRoute(app *gin.Engine) {
return handleGeneralBusiness(c, MainSwitch.TransOutRevertResult.Fetch(), "", "TransOutRevert")
}))
app.GET(BusiAPI+"/CanSubmit", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
dtmimp.Logf("%s CanSubmit", c.Query("gid"))
logger.Debugf("%s CanSubmit", c.Query("gid"))
return dtmimp.OrString(MainSwitch.CanSubmitResult.Fetch(), dtmcli.ResultSuccess), nil
}))
app.POST(BusiAPI+"/TransInXa", common.WrapHandler(func(c *gin.Context) (interface{}, error) {

3
examples/grpc_tcc.go

@ -7,7 +7,6 @@
package examples
import (
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
dtmgrpc "github.com/yedf/dtm/dtmgrpc"
emptypb "google.golang.org/protobuf/types/known/emptypb"
@ -15,7 +14,7 @@ import (
func init() {
addSample("grpc_tcc", func() string {
dtmimp.Logf("tcc simple transaction begin")
logger.Debugf("tcc simple transaction begin")
gid := dtmgrpc.MustGenGid(DtmGrpcServer)
err := dtmgrpc.TccGlobalTransaction(DtmGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error {
data := &BusiReq{Amount: 30}

5
examples/http_msg.go

@ -8,20 +8,19 @@ package examples
import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
func init() {
addSample("msg", func() string {
dtmimp.Logf("a busi transaction begin")
logger.Debugf("a busi transaction begin")
req := &TransReq{Amount: 30}
msg := dtmcli.NewMsg(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)).
Add(Busi+"/TransOut", req).
Add(Busi+"/TransIn", req)
err := msg.Prepare(Busi + "/query")
logger.FatalIfError(err)
dtmimp.Logf("busi trans submit")
logger.Debugf("busi trans submit")
err = msg.Submit()
logger.FatalIfError(err)
return msg.Gid

17
examples/http_saga.go

@ -8,37 +8,36 @@ package examples
import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
func init() {
addSample("saga", func() string {
dtmimp.Logf("a saga busi transaction begin")
logger.Debugf("a saga busi transaction begin")
req := &TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInRevert", req)
dtmimp.Logf("saga busi trans submit")
logger.Debugf("saga busi trans submit")
err := saga.Submit()
dtmimp.Logf("result gid is: %s", saga.Gid)
logger.Debugf("result gid is: %s", saga.Gid)
logger.FatalIfError(err)
return saga.Gid
})
addSample("saga_wait", func() string {
dtmimp.Logf("a saga busi transaction begin")
logger.Debugf("a saga busi transaction begin")
req := &TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInRevert", req)
saga.SetOptions(&dtmcli.TransOptions{WaitResult: true})
err := saga.Submit()
dtmimp.Logf("result gid is: %s", saga.Gid)
logger.Debugf("result gid is: %s", saga.Gid)
logger.FatalIfError(err)
return saga.Gid
})
addSample("concurrent_saga", func() string {
dtmimp.Logf("a concurrent saga busi transaction begin")
logger.Debugf("a concurrent saga busi transaction begin")
req := &TransReq{Amount: 30}
csaga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
@ -48,9 +47,9 @@ func init() {
EnableConcurrent().
AddBranchOrder(2, []int{0, 1}).
AddBranchOrder(3, []int{0, 1})
dtmimp.Logf("concurrent saga busi trans submit")
logger.Debugf("concurrent saga busi trans submit")
err := csaga.Submit()
dtmimp.Logf("result gid is: %s", csaga.Gid)
logger.Debugf("result gid is: %s", csaga.Gid)
logger.FatalIfError(err)
return csaga.Gid
})

4
examples/http_saga_barrier.go

@ -24,12 +24,12 @@ func init() {
app.POST(BusiAPI+"/SagaBTransOutCompensate", common.WrapHandler(sagaBarrierTransOutCompensate))
}
addSample("saga_barrier", func() string {
dtmimp.Logf("a busi transaction begin")
logger.Debugf("a busi transaction begin")
req := &TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)).
Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req).
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req)
dtmimp.Logf("busi trans submit")
logger.Debugf("busi trans submit")
err := saga.Submit()
logger.FatalIfError(err)
return saga.Gid

5
examples/http_saga_gorm_barrier.go

@ -12,7 +12,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
@ -21,12 +20,12 @@ func init() {
app.POST(BusiAPI+"/SagaBTransOutGorm", common.WrapHandler(sagaGormBarrierTransOut))
}
addSample("saga_gorm_barrier", func() string {
dtmimp.Logf("a busi transaction begin")
logger.Debugf("a busi transaction begin")
req := &TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmHttpServer, dtmcli.MustGenGid(DtmHttpServer)).
Add(Busi+"/SagaBTransOutGorm", Busi+"/SagaBTransOutCompensate", req).
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req)
dtmimp.Logf("busi trans submit")
logger.Debugf("busi trans submit")
err := saga.Submit()
logger.FatalIfError(err)
return saga.Gid

5
examples/http_tcc.go

@ -11,7 +11,6 @@ import (
"github.com/go-resty/resty/v2"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
)
@ -20,7 +19,7 @@ func init() {
app.POST(BusiAPI+"/TransInTccParent", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query())
logger.FatalIfError(err)
dtmimp.Logf("TransInTccParent ")
logger.Debugf("TransInTccParent ")
return tcc.CallBranch(&TransReq{Amount: reqFrom(c).Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
}))
}
@ -37,7 +36,7 @@ func init() {
return gid
})
addSample("tcc", func() string {
dtmimp.Logf("tcc simple transaction begin")
logger.Debugf("tcc simple transaction begin")
gid := dtmcli.MustGenGid(DtmHttpServer)
err := dtmcli.TccGlobalTransaction(DtmHttpServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")

2
examples/http_tcc_barrier.go

@ -28,7 +28,7 @@ func init() {
app.POST(BusiAPI+"/TccBTransOutCancel", common.WrapHandler(TccBarrierTransOutCancel))
}
addSample("tcc_barrier", func() string {
dtmimp.Logf("tcc transaction begin")
logger.Debugf("tcc transaction begin")
gid := dtmcli.MustGenGid(DtmHttpServer)
err := dtmcli.TccGlobalTransaction(DtmHttpServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry",

2
examples/quick_start.go

@ -29,7 +29,7 @@ var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiAPI)
func QsStartSvr() {
app := common.GetGinApp()
qsAddRoute(app)
dtmimp.Logf("quick qs examples listening at %d", qsBusiPort)
logger.Debugf("quick qs examples listening at %d", qsBusiPort)
go app.Run(fmt.Sprintf(":%d", qsBusiPort))
time.Sleep(100 * time.Millisecond)
}

5
test/base_test.go

@ -15,6 +15,7 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/examples"
)
@ -40,7 +41,7 @@ func TestBaseSqlDB(t *testing.T) {
tx, err := db.ToSQLDB().Begin()
asserts.Nil(err)
err = barrier.Call(tx, func(tx *sql.Tx) error {
dtmimp.Logf("rollback gid2")
logger.Debugf("rollback gid2")
return fmt.Errorf("gid2 error")
})
asserts.Error(err, fmt.Errorf("gid2 error"))
@ -50,7 +51,7 @@ func TestBaseSqlDB(t *testing.T) {
asserts.Equal(dbr.RowsAffected, int64(0))
barrier.BarrierID = 0
err = barrier.CallWithDB(db.ToSQLDB(), func(tx *sql.Tx) error {
dtmimp.Logf("submit gid2")
logger.Debugf("submit gid2")
return nil
})
asserts.Nil(err)

9
test/tcc_barrier_test.go

@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/examples"
)
@ -68,7 +69,7 @@ func TestTccBarrierDisorder(t *testing.T) {
res, err := examples.TccBarrierTransOutCancel(c)
if !sleeped {
sleeped = true
dtmimp.Logf("sleep before cancel return")
logger.Debugf("sleep before cancel return")
<-timeoutChan
finishedChan <- "1"
}
@ -89,7 +90,7 @@ func TestTccBarrierDisorder(t *testing.T) {
assert.Contains(t, resp.String(), dtmcli.ResultSuccess)
go func() {
dtmimp.Logf("sleeping to wait for tcc try timeout")
logger.Debugf("sleeping to wait for tcc try timeout")
<-timeoutChan
r, _ := dtmimp.RestyClient.R().
SetBody(body).
@ -104,10 +105,10 @@ func TestTccBarrierDisorder(t *testing.T) {
assert.True(t, strings.Contains(r.String(), dtmcli.ResultSuccess)) // 这个是悬挂操作,为了简单起见,依旧让他返回成功
finishedChan <- "1"
}()
dtmimp.Logf("cron to timeout and then call cancel")
logger.Debugf("cron to timeout and then call cancel")
go cronTransOnceForwardNow(300)
time.Sleep(100 * time.Millisecond)
dtmimp.Logf("cron to timeout and then call cancelled twice")
logger.Debugf("cron to timeout and then call cancelled twice")
cronTransOnceForwardNow(300)
timeoutChan <- "wake"
timeoutChan <- "wake"

3
test/tcc_grpc_test.go

@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc"
"github.com/yedf/dtm/examples"
"google.golang.org/protobuf/types/known/emptypb"
@ -70,7 +71,7 @@ func TestTccGrpcNested(t *testing.T) {
func TestTccGrpcType(t *testing.T) {
_, err := dtmgrpc.TccFromGrpc(context.Background())
assert.Error(t, err)
dtmimp.Logf("expecting dtmgrpcserver error")
logger.Debugf("expecting dtmgrpcserver error")
err = dtmgrpc.TccGlobalTransaction("-", "", func(tcc *dtmgrpc.TccGrpc) error { return nil })
assert.Error(t, err)
}

6
test/types.go

@ -24,14 +24,14 @@ func dbGet() *common.DB {
// waitTransProcessed only for test usage. wait for transaction processed once
func waitTransProcessed(gid string) {
dtmimp.Logf("waiting for gid %s", gid)
logger.Debugf("waiting for gid %s", gid)
select {
case id := <-dtmsvr.TransProcessedTestChan:
for id != gid {
dtmimp.LogRedf("-------id %s not match gid %s", id, gid)
logger.Errorf("-------id %s not match gid %s", id, gid)
id = <-dtmsvr.TransProcessedTestChan
}
dtmimp.Logf("finish for gid %s", gid)
logger.Debugf("finish for gid %s", gid)
case <-time.After(time.Duration(time.Second * 3)):
logger.FatalfIf(true, "Wait Trans timeout")
}

Loading…
Cancel
Save