Browse Source

storage change add Infof

pull/123/head
yedf2 4 years ago
parent
commit
f616653422
  1. 1
      common/utils.go
  2. 5
      dtmcli/dtmimp/vars.go
  3. 4
      dtmsvr/api.go
  4. 1
      dtmsvr/cron.go
  5. 1
      dtmsvr/storage/boltdb/boltdb.go
  6. 7
      dtmsvr/storage/redis/redis.go
  7. 13
      dtmsvr/storage/trans.go
  8. 2
      dtmsvr/svr.go
  9. 7
      dtmsvr/trans_process.go
  10. 5
      dtmsvr/trans_status.go
  11. 2
      dtmsvr/trans_type_saga.go

1
common/utils.go

@ -118,5 +118,6 @@ func RunSQLScript(conf dtmcli.DBConf, script string, skipDrop bool) {
} }
_, err = dtmimp.DBExec(con, s) _, err = dtmimp.DBExec(con, s)
logger.FatalIfError(err) logger.FatalIfError(err)
logger.Infof("sql scripts finished: %s", s)
} }
} }

5
dtmcli/dtmimp/vars.go

@ -10,6 +10,7 @@ import (
"errors" "errors"
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
"github.com/yedf/dtm/dtmcli/logger"
) )
// ErrFailure error of FAILURE // ErrFailure error of FAILURE
@ -36,12 +37,12 @@ func init() {
// RestyClient.SetRetryWaitTime(1 * time.Second) // RestyClient.SetRetryWaitTime(1 * time.Second)
RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error {
r.URL = MayReplaceLocalhost(r.URL) r.URL = MayReplaceLocalhost(r.URL)
Logf("requesting: %s %s %v %v", r.Method, r.URL, r.Body, r.QueryParam) logger.Debugf("requesting: %s %s %v %v", r.Method, r.URL, r.Body, r.QueryParam)
return nil return nil
}) })
RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error {
r := resp.Request r := resp.Request
Logf("requested: %s %s %s", r.Method, r.URL, resp.String()) logger.Debugf("requested: %s %s %s", r.Method, r.URL, resp.String())
return nil return nil
}) })
} }

4
dtmsvr/api.go

@ -11,6 +11,7 @@ import (
"github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmsvr/storage" "github.com/yedf/dtm/dtmsvr/storage"
) )
@ -71,7 +72,10 @@ func svcRegisterBranch(transType string, branch *TransBranch, data map[string]st
}) })
if err == storage.ErrNotFound { if err == storage.ErrNotFound {
msg := fmt.Sprintf("no trans with gid: %s status: %s found", branch.Gid, dtmcli.StatusPrepared) msg := fmt.Sprintf("no trans with gid: %s status: %s found", branch.Gid, dtmcli.StatusPrepared)
logger.Errorf(msg)
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": msg}, nil return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": msg}, nil
} }
logger.Infof("LockGlobalSaveBranches result: %v: gid: %s old status: %s branches: %s",
err, branch.Gid, dtmcli.StatusPrepared, dtmimp.MustMarshalString(branches))
return dtmimp.If(err != nil, nil, dtmcli.MapSuccess), err return dtmimp.If(err != nil, nil, dtmcli.MapSuccess), err
} }

1
dtmsvr/cron.go

@ -50,6 +50,7 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal {
if global == nil { if global == nil {
return nil return nil
} }
logger.Infof("cron job return a trans: %s", global.String())
return &TransGlobal{TransGlobalStore: *global} return &TransGlobal{TransGlobalStore: *global}
} }

1
dtmsvr/storage/boltdb/boltdb.go

@ -242,6 +242,7 @@ func (s *BoltdbStore) PopulateData(skipDrop bool) {
return nil return nil
}) })
dtmimp.E2P(err) dtmimp.E2P(err)
logger.Infof("Reset all data for boltdb")
} }
} }

7
dtmsvr/storage/redis/redis.go

@ -27,8 +27,11 @@ func (s *RedisStore) Ping() error {
} }
func (s *RedisStore) PopulateData(skipDrop bool) { func (s *RedisStore) PopulateData(skipDrop bool) {
_, err := redisGet().FlushAll(ctx).Result() if !skipDrop {
dtmimp.PanicIf(err != nil, err) _, err := redisGet().FlushAll(ctx).Result()
logger.Infof("call redis flushall. result: %v", err)
dtmimp.PanicIf(err != nil, err)
}
} }
func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore { func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore {

13
dtmsvr/storage/trans.go

@ -5,6 +5,7 @@ import (
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp"
) )
type TransGlobalStore struct { type TransGlobalStore struct {
@ -29,10 +30,14 @@ type TransGlobalStore struct {
} }
// TableName TableName // TableName TableName
func (*TransGlobalStore) TableName() string { func (g *TransGlobalStore) TableName() string {
return "dtm.trans_global" return "dtm.trans_global"
} }
func (g *TransGlobalStore) String() string {
return dtmimp.MustMarshalString(g)
}
// TransBranchStore branch transaction // TransBranchStore branch transaction
type TransBranchStore struct { type TransBranchStore struct {
common.ModelBase common.ModelBase
@ -47,6 +52,10 @@ type TransBranchStore struct {
} }
// TableName TableName // TableName TableName
func (*TransBranchStore) TableName() string { func (b *TransBranchStore) TableName() string {
return "dtm.trans_branch_op" return "dtm.trans_branch_op"
} }
func (b *TransBranchStore) String() string {
return dtmimp.MustMarshalString(*b)
}

2
dtmsvr/svr.go

@ -79,11 +79,11 @@ func updateBranchAsync() {
for len(updates) > 0 { for len(updates) > 0 {
dbr := GetStore().UpdateBranchesSql(updates, []string{"status", "finish_time", "update_time"}) dbr := GetStore().UpdateBranchesSql(updates, []string{"status", "finish_time", "update_time"})
logger.Debugf("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected)
if dbr.Error != nil { if dbr.Error != nil {
logger.Errorf("async update branch status error: %v", dbr.Error) logger.Errorf("async update branch status error: %v", dbr.Error)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} else { } else {
logger.Infof("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected)
updates = []TransBranch{} updates = []TransBranch{}
} }
} }

7
dtmsvr/trans_process.go

@ -45,7 +45,7 @@ func (t *TransGlobal) process() map[string]interface{} {
func (t *TransGlobal) processInner() (rerr error) { func (t *TransGlobal) processInner() (rerr error) {
defer handlePanic(&rerr) defer handlePanic(&rerr)
defer func() { defer func() {
if rerr != nil { if rerr != nil && rerr != dtmcli.ErrOngoing {
logger.Errorf("processInner got error: %s", rerr.Error()) logger.Errorf("processInner got error: %s", rerr.Error())
} }
if TransProcessedTestChan != nil { if TransProcessedTestChan != nil {
@ -72,5 +72,8 @@ func (t *TransGlobal) saveNew() error {
now := time.Now() now := time.Now()
t.CreateTime = &now t.CreateTime = &now
t.UpdateTime = &now t.UpdateTime = &now
return GetStore().MaySaveNewTrans(&t.TransGlobalStore, branches) err := GetStore().MaySaveNewTrans(&t.TransGlobalStore, branches)
logger.Infof("MaySaveNewTrans result: %v, global: %v branches: %v",
err, t.TransGlobalStore.String(), dtmimp.MustMarshalString(branches))
return err
} }

5
dtmsvr/trans_status.go

@ -13,6 +13,7 @@ import (
"github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmcli/dtmimp" "github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmcli/logger"
"github.com/yedf/dtm/dtmgrpc/dtmgimp" "github.com/yedf/dtm/dtmgrpc/dtmgimp"
"github.com/yedf/dtmdriver" "github.com/yedf/dtmdriver"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -22,6 +23,7 @@ import (
func (t *TransGlobal) touchCronTime(ctype cronType) { func (t *TransGlobal) touchCronTime(ctype cronType) {
t.lastTouched = time.Now() t.lastTouched = time.Now()
GetStore().TouchCronTime(&t.TransGlobalStore, t.getNextCronInterval(ctype)) GetStore().TouchCronTime(&t.TransGlobalStore, t.getNextCronInterval(ctype))
logger.Infof("TouchCronTime for: %s", t.TransGlobalStore.String())
} }
func (t *TransGlobal) changeStatus(status string) { func (t *TransGlobal) changeStatus(status string) {
@ -36,6 +38,7 @@ func (t *TransGlobal) changeStatus(status string) {
} }
t.UpdateTime = &now t.UpdateTime = &now
GetStore().ChangeGlobalStatus(&t.TransGlobalStore, status, updates, status == dtmcli.StatusSucceed || status == dtmcli.StatusFailed) GetStore().ChangeGlobalStatus(&t.TransGlobalStore, status, updates, status == dtmcli.StatusSucceed || status == dtmcli.StatusFailed)
logger.Infof("ChangeGlobalStatus to %s ok for %s", status, t.TransGlobalStore.String())
t.Status = status t.Status = status
} }
@ -46,6 +49,8 @@ func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPo
b.UpdateTime = &now b.UpdateTime = &now
if config.Store.Driver != dtmimp.DBTypeMysql && config.Store.Driver != dtmimp.DBTypePostgres || config.UpdateBranchSync > 0 || t.updateBranchSync { if config.Store.Driver != dtmimp.DBTypeMysql && config.Store.Driver != dtmimp.DBTypePostgres || config.UpdateBranchSync > 0 || t.updateBranchSync {
GetStore().LockGlobalSaveBranches(t.Gid, t.Status, []TransBranch{*b}, branchPos) GetStore().LockGlobalSaveBranches(t.Gid, t.Status, []TransBranch{*b}, branchPos)
logger.Infof("LockGlobalSaveBranches ok: gid: %s old status: %s branches: %s",
b.Gid, dtmcli.StatusPrepared, b.String())
} else { // 为了性能优化,把branch的status更新异步化 } else { // 为了性能优化,把branch的status更新异步化
updateBranchAsyncChan <- branchStatus{id: b.ID, status: status, finishTime: &now} updateBranchAsyncChan <- branchStatus{id: b.ID, status: status, finishTime: &now}
} }

2
dtmsvr/trans_type_saga.go

@ -104,7 +104,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
err = dtmimp.AsError(x) err = dtmimp.AsError(x)
} }
resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op} resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op}
if err != nil { if err != nil && err != dtmcli.ErrOngoing {
logger.Errorf("exec branch error: %v", err) logger.Errorf("exec branch error: %v", err)
} }
}() }()

Loading…
Cancel
Save