diff --git a/common/utils.go b/common/utils.go index f9db2b1..c121733 100644 --- a/common/utils.go +++ b/common/utils.go @@ -118,5 +118,6 @@ func RunSQLScript(conf dtmcli.DBConf, script string, skipDrop bool) { } _, err = dtmimp.DBExec(con, s) logger.FatalIfError(err) + logger.Infof("sql scripts finished: %s", s) } } diff --git a/dtmcli/dtmimp/vars.go b/dtmcli/dtmimp/vars.go index ee9bad6..c7af1ff 100644 --- a/dtmcli/dtmimp/vars.go +++ b/dtmcli/dtmimp/vars.go @@ -10,6 +10,7 @@ import ( "errors" "github.com/go-resty/resty/v2" + "github.com/yedf/dtm/dtmcli/logger" ) // ErrFailure error of FAILURE @@ -36,12 +37,12 @@ func init() { // RestyClient.SetRetryWaitTime(1 * time.Second) RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { r.URL = MayReplaceLocalhost(r.URL) - Logf("requesting: %s %s %v %v", r.Method, r.URL, r.Body, r.QueryParam) + logger.Debugf("requesting: %s %s %v %v", r.Method, r.URL, r.Body, r.QueryParam) return nil }) RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { r := resp.Request - Logf("requested: %s %s %s", r.Method, r.URL, resp.String()) + logger.Debugf("requested: %s %s %s", r.Method, r.URL, resp.String()) return nil }) } diff --git a/dtmsvr/api.go b/dtmsvr/api.go index eb26ffb..5ca0e9e 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -11,6 +11,7 @@ import ( "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmsvr/storage" ) @@ -71,7 +72,10 @@ func svcRegisterBranch(transType string, branch *TransBranch, data map[string]st }) if err == storage.ErrNotFound { msg := fmt.Sprintf("no trans with gid: %s status: %s found", branch.Gid, dtmcli.StatusPrepared) + logger.Errorf(msg) return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": msg}, nil } + logger.Infof("LockGlobalSaveBranches result: %v: gid: %s old status: %s branches: %s", + err, branch.Gid, dtmcli.StatusPrepared, dtmimp.MustMarshalString(branches)) return dtmimp.If(err != nil, nil, dtmcli.MapSuccess), err } diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index bd31d29..120b74c 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -50,6 +50,7 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal { if global == nil { return nil } + logger.Infof("cron job return a trans: %s", global.String()) return &TransGlobal{TransGlobalStore: *global} } diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index 31a0804..d02ba5c 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -242,6 +242,7 @@ func (s *BoltdbStore) PopulateData(skipDrop bool) { return nil }) dtmimp.E2P(err) + logger.Infof("Reset all data for boltdb") } } diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 00cecd4..084ce63 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -27,8 +27,11 @@ func (s *RedisStore) Ping() error { } func (s *RedisStore) PopulateData(skipDrop bool) { - _, err := redisGet().FlushAll(ctx).Result() - dtmimp.PanicIf(err != nil, err) + if !skipDrop { + _, err := redisGet().FlushAll(ctx).Result() + logger.Infof("call redis flushall. result: %v", err) + dtmimp.PanicIf(err != nil, err) + } } func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore { diff --git a/dtmsvr/storage/trans.go b/dtmsvr/storage/trans.go index dbabc01..24abd58 100644 --- a/dtmsvr/storage/trans.go +++ b/dtmsvr/storage/trans.go @@ -5,6 +5,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/dtmcli/dtmimp" ) type TransGlobalStore struct { @@ -29,10 +30,14 @@ type TransGlobalStore struct { } // TableName TableName -func (*TransGlobalStore) TableName() string { +func (g *TransGlobalStore) TableName() string { return "dtm.trans_global" } +func (g *TransGlobalStore) String() string { + return dtmimp.MustMarshalString(g) +} + // TransBranchStore branch transaction type TransBranchStore struct { common.ModelBase @@ -47,6 +52,10 @@ type TransBranchStore struct { } // TableName TableName -func (*TransBranchStore) TableName() string { +func (b *TransBranchStore) TableName() string { return "dtm.trans_branch_op" } + +func (b *TransBranchStore) String() string { + return dtmimp.MustMarshalString(*b) +} diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index 4f6c866..283c442 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -79,11 +79,11 @@ func updateBranchAsync() { for len(updates) > 0 { dbr := GetStore().UpdateBranchesSql(updates, []string{"status", "finish_time", "update_time"}) - logger.Debugf("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected) if dbr.Error != nil { logger.Errorf("async update branch status error: %v", dbr.Error) time.Sleep(1 * time.Second) } else { + logger.Infof("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected) updates = []TransBranch{} } } diff --git a/dtmsvr/trans_process.go b/dtmsvr/trans_process.go index a9ef2f9..721919e 100644 --- a/dtmsvr/trans_process.go +++ b/dtmsvr/trans_process.go @@ -45,7 +45,7 @@ func (t *TransGlobal) process() map[string]interface{} { func (t *TransGlobal) processInner() (rerr error) { defer handlePanic(&rerr) defer func() { - if rerr != nil { + if rerr != nil && rerr != dtmcli.ErrOngoing { logger.Errorf("processInner got error: %s", rerr.Error()) } if TransProcessedTestChan != nil { @@ -72,5 +72,8 @@ func (t *TransGlobal) saveNew() error { now := time.Now() t.CreateTime = &now t.UpdateTime = &now - return GetStore().MaySaveNewTrans(&t.TransGlobalStore, branches) + err := GetStore().MaySaveNewTrans(&t.TransGlobalStore, branches) + logger.Infof("MaySaveNewTrans result: %v, global: %v branches: %v", + err, t.TransGlobalStore.String(), dtmimp.MustMarshalString(branches)) + return err } diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index d7097ca..371bef8 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -13,6 +13,7 @@ import ( "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli/dtmimp" + "github.com/yedf/dtm/dtmcli/logger" "github.com/yedf/dtm/dtmgrpc/dtmgimp" "github.com/yedf/dtmdriver" "google.golang.org/grpc/codes" @@ -22,6 +23,7 @@ import ( func (t *TransGlobal) touchCronTime(ctype cronType) { t.lastTouched = time.Now() GetStore().TouchCronTime(&t.TransGlobalStore, t.getNextCronInterval(ctype)) + logger.Infof("TouchCronTime for: %s", t.TransGlobalStore.String()) } func (t *TransGlobal) changeStatus(status string) { @@ -36,6 +38,7 @@ func (t *TransGlobal) changeStatus(status string) { } t.UpdateTime = &now GetStore().ChangeGlobalStatus(&t.TransGlobalStore, status, updates, status == dtmcli.StatusSucceed || status == dtmcli.StatusFailed) + logger.Infof("ChangeGlobalStatus to %s ok for %s", status, t.TransGlobalStore.String()) t.Status = status } @@ -46,6 +49,8 @@ func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPo b.UpdateTime = &now if config.Store.Driver != dtmimp.DBTypeMysql && config.Store.Driver != dtmimp.DBTypePostgres || config.UpdateBranchSync > 0 || t.updateBranchSync { GetStore().LockGlobalSaveBranches(t.Gid, t.Status, []TransBranch{*b}, branchPos) + logger.Infof("LockGlobalSaveBranches ok: gid: %s old status: %s branches: %s", + b.Gid, dtmcli.StatusPrepared, b.String()) } else { // 为了性能优化,把branch的status更新异步化 updateBranchAsyncChan <- branchStatus{id: b.ID, status: status, finishTime: &now} } diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index bdeea9b..e710d46 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -104,7 +104,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { err = dtmimp.AsError(x) } resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op} - if err != nil { + if err != nil && err != dtmcli.ErrOngoing { logger.Errorf("exec branch error: %v", err) } }()