From 7e8de22d1ea4b0972a4f833c38c6c3a760f1f464 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Thu, 20 May 2021 21:25:12 +0800 Subject: [PATCH] test seems good --- dtm.yml.sample | 0 dtm/saga.go | 4 ++ dtmsvr/api.go | 43 +++++++++++++ dtmsvr/config.go | 25 +++----- dtmsvr/cron.go | 28 +++++---- dtmsvr/{objects.go => db.go} | 93 ++++++++++++++-------------- dtmsvr/dtm.yml.sample | 6 ++ dtm_test.go => dtmsvr/dtmsvr_test.go | 93 +++++++++++++++------------- dtmsvr/{svr.go => main.go} | 2 +- dtmsvr/service.go | 93 ++++++---------------------- dtmsvr/{consumer.go => types.go} | 2 + examples/{service.go => api.go} | 0 examples/main.go | 6 +- main.go | 2 +- size_coverage.out | 90 +++++++++++++++++++++++++++ 15 files changed, 290 insertions(+), 197 deletions(-) delete mode 100644 dtm.yml.sample create mode 100644 dtmsvr/api.go rename dtmsvr/{objects.go => db.go} (96%) create mode 100644 dtmsvr/dtm.yml.sample rename dtm_test.go => dtmsvr/dtmsvr_test.go (81%) rename dtmsvr/{svr.go => main.go} (94%) rename dtmsvr/{consumer.go => types.go} (95%) rename examples/{service.go => api.go} (100%) create mode 100644 size_coverage.out diff --git a/dtm.yml.sample b/dtm.yml.sample deleted file mode 100644 index e69de29..0000000 diff --git a/dtm/saga.go b/dtm/saga.go index fa5e379..ca989df 100644 --- a/dtm/saga.go +++ b/dtm/saga.go @@ -3,6 +3,7 @@ package dtm import ( "encoding/json" "fmt" + "time" "github.com/go-resty/resty/v2" "github.com/sirupsen/logrus" @@ -76,6 +77,9 @@ func (s *Saga) Commit() error { var RestyClient = resty.New() func init() { + RestyClient.SetTimeout(3 * time.Second) + RestyClient.SetRetryCount(2) + RestyClient.SetRetryWaitTime(1 * time.Second) RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { logrus.Printf("requesting: %s %s %v", r.Method, r.URL, r.Body) return nil diff --git a/dtmsvr/api.go b/dtmsvr/api.go new file mode 100644 index 0000000..87e8e2c --- /dev/null +++ b/dtmsvr/api.go @@ -0,0 +1,43 @@ +package dtmsvr + +import ( + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" + "gorm.io/gorm/clause" +) + +func AddRoute(engine *gin.Engine) { + engine.POST("/api/dtmsvr/prepare", common.WrapHandler(Prepare)) + engine.POST("/api/dtmsvr/commit", common.WrapHandler(Commit)) +} + +func Prepare(c *gin.Context) (interface{}, error) { + db := DbGet() + m := getSagaModelFromContext(c) + m.Status = "prepared" + writeTransLog(m.Gid, "save prepared", m.Status, -1, m.Steps) + db.Must().Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(&m) + return M{"message": "SUCCESS"}, nil +} + +func Commit(c *gin.Context) (interface{}, error) { + m := getSagaModelFromContext(c) + saveCommitedSagaModel(m) + go ProcessCommitedSaga(m.Gid) + return M{"message": "SUCCESS"}, nil +} + +func getSagaModelFromContext(c *gin.Context) *SagaModel { + data := M{} + b, err := c.GetRawData() + common.PanicIfError(err) + common.MustUnmarshal(b, &data) + logrus.Printf("creating saga model in prepare") + data["steps"] = common.MustMarshalString(data["steps"]) + m := SagaModel{} + common.MustRemarshal(data, &m) + return &m +} diff --git a/dtmsvr/config.go b/dtmsvr/config.go index e331582..262006d 100644 --- a/dtmsvr/config.go +++ b/dtmsvr/config.go @@ -11,24 +11,17 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/viper" + "github.com/yedf/dtm/common" ) -type Config struct { - Server string `json:"server"` -} - -var ServerConfig Config = Config{} - // formatter 自定义formatter type formatter struct{} // Format 进行格式化 func (f *formatter) Format(entry *logrus.Entry) ([]byte, error) { - var b *bytes.Buffer + var b *bytes.Buffer = &bytes.Buffer{} if entry.Buffer != nil { b = entry.Buffer - } else { - b = &bytes.Buffer{} } n := time.Now() ts := fmt.Sprintf("%d-%02d-%02d %02d:%02d:%02d.%03d", n.Year(), n.Month(), n.Day(), n.Hour(), n.Minute(), n.Second(), n.Nanosecond()/1000000) @@ -44,18 +37,16 @@ func (f *formatter) Format(entry *logrus.Entry) ([]byte, error) { return b.Bytes(), nil } +var configLoaded = false + func LoadConfig() { - if ServerConfig.Server != "" { + if configLoaded { return } + configLoaded = true logrus.SetFormatter(&formatter{}) _, file, _, _ := runtime.Caller(0) viper.SetConfigFile(filepath.Dir(file) + "/dtmsvr.yml") - if err := viper.ReadInConfig(); err != nil { - panic(err) - } - if err := viper.Unmarshal(&ServerConfig); err != nil { - panic(err) - } - logrus.Printf("config is: %v", ServerConfig) + err := viper.ReadInConfig() + common.PanicIfError(err) } diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index fb41bcc..8a3a5d9 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -5,6 +5,7 @@ import ( "strings" "time" + "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtm" ) @@ -12,32 +13,30 @@ import ( func CronPreparedOnce(expire time.Duration) { db := DbGet() ss := []SagaModel{} - dbr := db.Model(&SagaModel{}).Where("update_time < date_sub(now(), interval ? second)", int(expire/time.Second)).Where("status = ?", "prepared").Find(&ss) - common.PanicIfError(dbr.Error) + db.Must().Model(&SagaModel{}).Where("update_time < date_sub(now(), interval ? second)", int(expire/time.Second)).Where("status = ?", "prepared").Find(&ss) writeTransLog("", "saga fetch prepared", fmt.Sprint(len(ss)), -1, "") if len(ss) == 0 { return } for _, sm := range ss { writeTransLog(sm.Gid, "saga touch prepared", "", -1, "") - dbr = db.Model(&sm).Update("id", sm.ID) - common.PanicIfError(dbr.Error) + db.Must().Model(&sm).Update("id", sm.ID) resp, err := dtm.RestyClient.R().SetQueryParam("gid", sm.Gid).Get(sm.TransQuery) common.PanicIfError(err) body := resp.String() if strings.Contains(body, "FAIL") { writeTransLog(sm.Gid, "saga canceled", "canceled", -1, "") - dbr = db.Model(&sm).Where("status = ?", "prepared").Update("status", "canceled") - common.PanicIfError(dbr.Error) + db.Must().Model(&sm).Where("status = ?", "prepared").Update("status", "canceled") } else if strings.Contains(body, "SUCCESS") { saveCommitedSagaModel(&sm) - go ProcessCommitedSaga(sm.Gid) + ProcessCommitedSaga(sm.Gid) } } } func CronPrepared() { for { + defer handlePanic() CronPreparedOnce(10 * time.Second) } } @@ -45,22 +44,27 @@ func CronPrepared() { func CronCommitedOnce(expire time.Duration) { db := DbGet() ss := []SagaModel{} - dbr := db.Model(&SagaModel{}).Where("update_time < date_sub(now(), interval ? second)", int(expire/time.Second)).Where("status = ?", "commited").Find(&ss) - common.PanicIfError(dbr.Error) + db.Must().Model(&SagaModel{}).Where("update_time < date_sub(now(), interval ? second)", int(expire/time.Second)).Where("status = ?", "commited").Find(&ss) writeTransLog("", "saga fetch commited", fmt.Sprint(len(ss)), -1, "") if len(ss) == 0 { return } for _, sm := range ss { writeTransLog(sm.Gid, "saga touch commited", "", -1, "") - dbr = db.Model(&sm).Update("id", sm.ID) - common.PanicIfError(dbr.Error) - go ProcessCommitedSaga(sm.Gid) + db.Must().Model(&sm).Update("id", sm.ID) + ProcessCommitedSaga(sm.Gid) } } func CronCommited() { for { + defer handlePanic() CronCommitedOnce(10 * time.Second) } } + +func handlePanic() { + if err := recover(); err != nil { + logrus.Printf("----panic %s handlered", err.(error).Error()) + } +} diff --git a/dtmsvr/objects.go b/dtmsvr/db.go similarity index 96% rename from dtmsvr/objects.go rename to dtmsvr/db.go index 72b7cd8..57396bb 100644 --- a/dtmsvr/objects.go +++ b/dtmsvr/db.go @@ -12,52 +12,6 @@ import ( "gorm.io/gorm" ) -type M = map[string]interface{} - -type tracePlugin struct{} - -func (op *tracePlugin) Name() string { - return "tracePlugin" -} - -func (op *tracePlugin) Initialize(db *gorm.DB) (err error) { - before := func(db *gorm.DB) { - db.InstanceSet("ivy.startTime", time.Now()) - } - - after := func(db *gorm.DB) { - _ts, _ := db.InstanceGet("ivy.startTime") - sql := db.Dialector.Explain(db.Statement.SQL.String(), db.Statement.Vars...) - logrus.Printf("used: %d ms affected: %d sql is: %s", time.Since(_ts.(time.Time)).Milliseconds(), db.RowsAffected, sql) - if v, ok := db.InstanceGet("ivy.must"); ok && v.(bool) { - if db.Error != nil && db.Error != gorm.ErrRecordNotFound { - panic(db.Error) - } - } - } - - beforeName := "cb_before" - afterName := "cb_after" - - logrus.Printf("installing db plugin: %s", op.Name()) - // 开始前 - _ = db.Callback().Create().Before("gorm:before_create").Register(beforeName, before) - _ = db.Callback().Query().Before("gorm:query").Register(beforeName, before) - _ = db.Callback().Delete().Before("gorm:before_delete").Register(beforeName, before) - _ = db.Callback().Update().Before("gorm:setup_reflect_value").Register(beforeName, before) - _ = db.Callback().Row().Before("gorm:row").Register(beforeName, before) - _ = db.Callback().Raw().Before("gorm:raw").Register(beforeName, before) - - // 结束后 - _ = db.Callback().Create().After("gorm:after_create").Register(afterName, after) - _ = db.Callback().Query().After("gorm:after_query").Register(afterName, after) - _ = db.Callback().Delete().After("gorm:after_delete").Register(afterName, after) - _ = db.Callback().Update().After("gorm:after_update").Register(afterName, after) - _ = db.Callback().Row().After("gorm:row").Register(afterName, after) - _ = db.Callback().Raw().After("gorm:raw").Register(afterName, after) - return -} - var db *gorm.DB = nil type MyDb struct { @@ -95,12 +49,55 @@ func writeTransLog(gid string, action string, status string, step int, detail st if detail == "" { detail = "{}" } - dbr := db.Table("test1.a_dtrans_log").Create(M{ + db.Must().Table("test1.a_dtrans_log").Create(M{ "gid": gid, "action": action, "status": status, "step": step, "detail": detail, }) - common.PanicIfError(dbr.Error) +} + +type tracePlugin struct{} + +func (op *tracePlugin) Name() string { + return "tracePlugin" +} + +func (op *tracePlugin) Initialize(db *gorm.DB) (err error) { + before := func(db *gorm.DB) { + db.InstanceSet("ivy.startTime", time.Now()) + } + + after := func(db *gorm.DB) { + _ts, _ := db.InstanceGet("ivy.startTime") + sql := db.Dialector.Explain(db.Statement.SQL.String(), db.Statement.Vars...) + logrus.Printf("used: %d ms affected: %d sql is: %s", time.Since(_ts.(time.Time)).Milliseconds(), db.RowsAffected, sql) + if v, ok := db.InstanceGet("ivy.must"); ok && v.(bool) { + if db.Error != nil && db.Error != gorm.ErrRecordNotFound { + panic(db.Error) + } + } + } + + beforeName := "cb_before" + afterName := "cb_after" + + logrus.Printf("installing db plugin: %s", op.Name()) + // 开始前 + _ = db.Callback().Create().Before("gorm:before_create").Register(beforeName, before) + _ = db.Callback().Query().Before("gorm:query").Register(beforeName, before) + _ = db.Callback().Delete().Before("gorm:before_delete").Register(beforeName, before) + _ = db.Callback().Update().Before("gorm:setup_reflect_value").Register(beforeName, before) + _ = db.Callback().Row().Before("gorm:row").Register(beforeName, before) + _ = db.Callback().Raw().Before("gorm:raw").Register(beforeName, before) + + // 结束后 + _ = db.Callback().Create().After("gorm:after_create").Register(afterName, after) + _ = db.Callback().Query().After("gorm:after_query").Register(afterName, after) + _ = db.Callback().Delete().After("gorm:after_delete").Register(afterName, after) + _ = db.Callback().Update().After("gorm:after_update").Register(afterName, after) + _ = db.Callback().Row().After("gorm:row").Register(afterName, after) + _ = db.Callback().Raw().After("gorm:raw").Register(afterName, after) + return } diff --git a/dtmsvr/dtm.yml.sample b/dtmsvr/dtm.yml.sample new file mode 100644 index 0000000..800c29f --- /dev/null +++ b/dtmsvr/dtm.yml.sample @@ -0,0 +1,6 @@ +mysql: + host: 'xxx' + user: 'xxx' + password: 'xxx' + database: 'xxx' + port: '3306' diff --git a/dtm_test.go b/dtmsvr/dtmsvr_test.go similarity index 81% rename from dtm_test.go rename to dtmsvr/dtmsvr_test.go index 259b6bc..f61a42f 100644 --- a/dtm_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -1,4 +1,4 @@ -package main +package dtmsvr import ( "testing" @@ -9,32 +9,64 @@ import ( "github.com/spf13/viper" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtm" - "github.com/yedf/dtm/dtmsvr" "github.com/yedf/dtm/examples" ) func TestViper(t *testing.T) { - assert.Equal(t, "test_val", viper.GetString("test")) + assert.Equal(t, true, viper.Get("mysql") != nil) +} + +func TestCover(t *testing.T) { + db := DbGet() + db.NoMust() + + defer handlePanic() + checkAffected(db.DB) } var myinit int = func() int { - dtmsvr.LoadConfig() + LoadConfig() return 0 }() +func TestDtmSvr(t *testing.T) { + SagaProcessedTestChan = make(chan string, 1) + // 清理数据 + common.PanicIfError(db.Exec("truncate test1.a_saga").Error) + common.PanicIfError(db.Exec("truncate test1.a_saga_step").Error) + common.PanicIfError(db.Exec("truncate test1.a_dtrans_log").Error) + + // 启动组件 + go StartSvr() + go examples.StartSvr() + time.Sleep(time.Duration(100 * 1000 * 1000)) + + preparePending(t) + prepareCancel(t) + commitedPending(t) + noramlSaga(t) + rollbackSaga2(t) + // assert.Equal(t, 1, 0) + // 开始测试 + + // 发送Prepare请求后,验证数据库 + // ConsumeHalfMsg 验证数据库 + // ConsumeMsg 验证数据库 +} + // 测试使用的全局对象 -var db = dtmsvr.DbGet() +var initdb = DbGet() -func getSagaModel(gid string) *dtmsvr.SagaModel { - sm := dtmsvr.SagaModel{} +func getSagaModel(gid string) *SagaModel { + sm := SagaModel{} dbr := db.Model(&sm).Where("gid=?", gid).First(&sm) common.PanicIfError(dbr.Error) return &sm } func getSagaStepStatus(gid string) []string { - steps := []dtmsvr.SagaStepModel{} - dbr := db.Model(&dtmsvr.SagaStepModel{}).Where("gid=?", gid).Find(&steps) + steps := []SagaStepModel{} + dbr := db.Model(&SagaStepModel{}).Where("gid=?", gid).Find(&steps) common.PanicIfError(dbr.Error) status := []string{} for _, step := range steps { @@ -49,14 +81,14 @@ func noramlSaga(t *testing.T) { assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) saga.Commit() assert.Equal(t, "commited", getSagaModel(saga.Gid).Status) - dtmsvr.WaitCommitedSaga(saga.Gid) + WaitCommitedSaga(saga.Gid) assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid)) } func rollbackSaga2(t *testing.T) { saga := genSaga("gid-rollbackSaga2", false, true) saga.Commit() - dtmsvr.WaitCommitedSaga(saga.Gid) + WaitCommitedSaga(saga.Gid) saga.Prepare() assert.Equal(t, "rollbacked", getSagaModel(saga.Gid).Status) assert.Equal(t, []string{"rollbacked", "finished", "rollbacked", "rollbacked"}, getSagaStepStatus(saga.Gid)) @@ -66,7 +98,7 @@ func prepareCancel(t *testing.T) { saga := genSaga("gid1-prepareCancel", false, true) saga.Prepare() examples.TransQueryResult = "FAIL" - dtmsvr.CronPreparedOnce(-10 * time.Second) + CronPreparedOnce(-10 * time.Second) examples.TransQueryResult = "" assert.Equal(t, "canceled", getSagaModel(saga.Gid).Status) } @@ -75,11 +107,11 @@ func preparePending(t *testing.T) { saga := genSaga("gid1-preparePending", false, false) saga.Prepare() examples.TransQueryResult = "PENDING" - dtmsvr.CronPreparedOnce(-10 * time.Second) + CronPreparedOnce(-10 * time.Second) examples.TransQueryResult = "" assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) - dtmsvr.CronPreparedOnce(-10 * time.Second) - dtmsvr.WaitCommitedSaga(saga.Gid) + CronPreparedOnce(-10 * time.Second) + WaitCommitedSaga(saga.Gid) assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) } @@ -88,40 +120,15 @@ func commitedPending(t *testing.T) { saga.Prepare() saga.Commit() examples.TransOutResult = "PENDING" - dtmsvr.WaitCommitedSaga(saga.Gid) + WaitCommitedSaga(saga.Gid) assert.Equal(t, []string{"pending", "finished", "pending", "pending"}, getSagaStepStatus(saga.Gid)) examples.TransOutResult = "" - dtmsvr.CronCommitedOnce(-10 * time.Second) - dtmsvr.WaitCommitedSaga(saga.Gid) + CronCommitedOnce(-10 * time.Second) + WaitCommitedSaga(saga.Gid) assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid)) assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) } -func TestDtmSvr(t *testing.T) { - dtmsvr.SagaProcessedTestChan = make(chan string, 1) - // 清理数据 - common.PanicIfError(db.Exec("truncate test1.a_saga").Error) - common.PanicIfError(db.Exec("truncate test1.a_saga_step").Error) - common.PanicIfError(db.Exec("truncate test1.a_dtrans_log").Error) - - // 启动组件 - go dtmsvr.StartSvr() - go examples.StartSvr() - time.Sleep(time.Duration(100 * 1000 * 1000)) - - preparePending(t) - prepareCancel(t) - commitedPending(t) - noramlSaga(t) - rollbackSaga2(t) - // assert.Equal(t, 1, 0) - // 开始测试 - - // 发送Prepare请求后,验证数据库 - // ConsumeHalfMsg 验证数据库 - // ConsumeMsg 验证数据库 -} - func genSaga(gid string, inFailed bool, outFailed bool) *dtm.Saga { logrus.Printf("beginning a saga test ---------------- %s", gid) saga := dtm.SagaNew(examples.DtmServer, gid, examples.Busi+"/TransQuery") diff --git a/dtmsvr/svr.go b/dtmsvr/main.go similarity index 94% rename from dtmsvr/svr.go rename to dtmsvr/main.go index 69b772b..d6af34a 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/main.go @@ -15,5 +15,5 @@ func StartSvr() { app := common.GetGinApp() AddRoute(app) logrus.Printf("dtmsvr listen at: 8080") - app.Run() + app.Run(":8080") } diff --git a/dtmsvr/service.go b/dtmsvr/service.go index 425d5a4..bafb8dc 100644 --- a/dtmsvr/service.go +++ b/dtmsvr/service.go @@ -1,12 +1,10 @@ package dtmsvr import ( - "encoding/json" "fmt" "strings" "time" - "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtm" @@ -14,60 +12,22 @@ import ( "gorm.io/gorm/clause" ) -func AddRoute(engine *gin.Engine) { - engine.POST("/api/dtmsvr/prepare", Prepare) - engine.POST("/api/dtmsvr/commit", Commit) -} - -func getSagaModelFromContext(c *gin.Context) *SagaModel { - data := M{} - b, err := c.GetRawData() - common.PanicIfError(err) - common.MustUnmarshal(b, &data) - logrus.Printf("creating saga model in prepare") - data["steps"] = common.MustMarshalString(data["steps"]) - m := SagaModel{} - common.MustRemarshal(data, &m) - return &m -} - -func Prepare(c *gin.Context) { - db := DbGet() - m := getSagaModelFromContext(c) - m.Status = "prepared" - writeTransLog(m.Gid, "save prepared", m.Status, -1, m.Steps) - db1 := db.Clauses(clause.OnConflict{ - DoNothing: true, - }).Create(&m) - common.PanicIfError(db1.Error) - c.JSON(200, M{"message": "SUCCESS"}) -} - -func Commit(c *gin.Context) { - m := getSagaModelFromContext(c) - saveCommitedSagaModel(m) - go ProcessCommitedSaga(m.Gid) - c.JSON(200, M{"message": "SUCCESS"}) -} - func saveCommitedSagaModel(m *SagaModel) { db := DbGet() m.Status = "commited" - stepInserted := false - err := db.Transaction(func(db *gorm.DB) error { + err := db.Transaction(func(db1 *gorm.DB) error { + db := &MyDb{DB: db1} writeTransLog(m.Gid, "save commited", m.Status, -1, m.Steps) - dbr := db.Clauses(clause.OnConflict{ + dbr := db.Must().Clauses(clause.OnConflict{ DoNothing: true, }).Create(&m) - if dbr.Error == nil && dbr.RowsAffected == 0 { + if dbr.RowsAffected == 0 { writeTransLog(m.Gid, "change status", m.Status, -1, "") - dbr = db.Model(&m).Where("status=?", "prepared").Update("status", "commited") + db.Must().Model(&m).Where("status=?", "prepared").Update("status", "commited") } - common.PanicIfError(dbr.Error) nsteps := []SagaStepModel{} steps := []M{} - err := json.Unmarshal([]byte(m.Steps), &steps) - common.PanicIfError(err) + common.MustUnmarshalString(m.Steps, &steps) for _, step := range steps { nsteps = append(nsteps, SagaStepModel{ Gid: m.Gid, @@ -87,22 +47,12 @@ func saveCommitedSagaModel(m *SagaModel) { }) } writeTransLog(m.Gid, "save steps", m.Status, -1, common.MustMarshalString(nsteps)) - r := db.Clauses(clause.OnConflict{ + db.Must().Clauses(clause.OnConflict{ DoNothing: true, }).Create(&nsteps) - if db.Error != nil { - return db.Error - } - if r.RowsAffected == int64(len(nsteps)) { - stepInserted = true - } - logrus.Printf("rows affected: %d nsteps length: %d, stepInersted: %t", r.RowsAffected, int64(len(nsteps)), stepInserted) - return db.Error + return nil }) common.PanicIfError(err) - if !stepInserted { - return - } } var SagaProcessedTestChan chan string = nil // 用于测试时,通知处理结束 @@ -124,20 +74,16 @@ func ProcessCommitedSaga(gid string) { SagaProcessedTestChan <- gid } } +func checkAffected(db1 *gorm.DB) { + if db1.RowsAffected == 0 { + panic(fmt.Errorf("duplicate updating")) + } +} func innerProcessCommitedSaga(gid string) (rerr error) { steps := []SagaStepModel{} db := DbGet() - db1 := db.Order("id asc").Find(&steps) - if db1.Error != nil { - return db1.Error - } - checkAffected := func(db1 *gorm.DB) { - common.PanicIfError(db1.Error) - if db1.RowsAffected == 0 { - panic(fmt.Errorf("duplicate updating")) - } - } + db.Must().Order("id asc").Find(&steps) current := 0 // 当前正在处理的步骤 for ; current < len(steps); current++ { step := steps[current] @@ -150,16 +96,17 @@ func innerProcessCommitedSaga(gid string) (rerr error) { return err } body := resp.String() + db.Must().Model(&SagaModel{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 if strings.Contains(body, "SUCCESS") { writeTransLog(gid, "step finished", "finished", step.Step, "") - dbr := db.Model(&step).Where("status=?", "pending").Updates(M{ + dbr := db.Must().Model(&step).Where("status=?", "pending").Updates(M{ "status": "finished", "finish_time": time.Now(), }) checkAffected(dbr) } else if strings.Contains(body, "FAIL") { writeTransLog(gid, "step rollbacked", "rollbacked", step.Step, "") - dbr := db.Model(&step).Where("status=?", "pending").Updates(M{ + dbr := db.Must().Model(&step).Where("status=?", "pending").Updates(M{ "status": "rollbacked", "rollback_time": time.Now(), }) @@ -172,7 +119,7 @@ func innerProcessCommitedSaga(gid string) (rerr error) { } if current == len(steps) { // saga 事务完成 writeTransLog(gid, "saga finished", "finished", -1, "") - dbr := db.Model(&SagaModel{}).Where("gid=? and status=?", gid, "commited").Updates(M{ + dbr := db.Must().Model(&SagaModel{}).Where("gid=? and status=?", gid, "commited").Updates(M{ "status": "finished", "finish_time": time.Now(), }) @@ -191,7 +138,7 @@ func innerProcessCommitedSaga(gid string) (rerr error) { body := resp.String() if strings.Contains(body, "SUCCESS") { writeTransLog(gid, "step rollbacked", "rollbacked", step.Step, "") - dbr := db.Model(&step).Where("status=?", step.Status).Updates(M{ + dbr := db.Must().Model(&step).Where("status=?", step.Status).Updates(M{ "status": "rollbacked", "rollback_time": time.Now(), }) @@ -204,7 +151,7 @@ func innerProcessCommitedSaga(gid string) (rerr error) { return fmt.Errorf("saga current not -1") } writeTransLog(gid, "saga rollbacked", "rollbacked", -1, "") - dbr := db.Model(&SagaModel{}).Where("status=? and gid=?", "commited", gid).Updates(M{ + dbr := db.Must().Model(&SagaModel{}).Where("status=? and gid=?", "commited", gid).Updates(M{ "status": "rollbacked", "rollback_time": time.Now(), }) diff --git a/dtmsvr/consumer.go b/dtmsvr/types.go similarity index 95% rename from dtmsvr/consumer.go rename to dtmsvr/types.go index dc57abb..bcf4bef 100644 --- a/dtmsvr/consumer.go +++ b/dtmsvr/types.go @@ -38,3 +38,5 @@ type SagaStepModel struct { func (*SagaStepModel) TableName() string { return "test1.a_saga_step" } + +type M = map[string]interface{} diff --git a/examples/service.go b/examples/api.go similarity index 100% rename from examples/service.go rename to examples/api.go diff --git a/examples/main.go b/examples/main.go index d7b1297..9c5a9fa 100644 --- a/examples/main.go +++ b/examples/main.go @@ -26,9 +26,11 @@ func FireRequest() { saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", req) saga.Add(Busi+"/TransOut", Busi+"/TransOutCompensate", req) - saga.Prepare() + err := saga.Prepare() + common.PanicIfError(err) logrus.Printf("busi trans commit") - saga.Commit() + err = saga.Commit() + common.PanicIfError(err) } func StartSvr() { diff --git a/main.go b/main.go index adf274e..9be05a1 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,6 @@ type M = map[string]interface{} func main() { dtmsvr.LoadConfig() go dtmsvr.StartSvr() - go examples.Main() + go examples.StartSvr() time.Sleep(1000 * 1000 * 1000 * 1000) } diff --git a/size_coverage.out b/size_coverage.out new file mode 100644 index 0000000..7def9a3 --- /dev/null +++ b/size_coverage.out @@ -0,0 +1,90 @@ +mode: set +github.com/yedf/dtm/dtmsvr/types.go:22.38,24.2 1 1 +github.com/yedf/dtm/dtmsvr/types.go:38.42,40.2 1 1 +github.com/yedf/dtm/dtmsvr/api.go:10.35,13.2 2 1 +github.com/yedf/dtm/dtmsvr/api.go:15.51,24.2 6 1 +github.com/yedf/dtm/dtmsvr/api.go:26.50,31.2 4 1 +github.com/yedf/dtm/dtmsvr/api.go:33.57,43.2 9 1 +github.com/yedf/dtm/dtmsvr/config.go:21.65,23.25 2 1 +github.com/yedf/dtm/dtmsvr/config.go:26.2,30.20 5 1 +github.com/yedf/dtm/dtmsvr/config.go:36.2,37.23 2 1 +github.com/yedf/dtm/dtmsvr/config.go:23.25,25.3 1 1 +github.com/yedf/dtm/dtmsvr/config.go:30.20,32.36 2 1 +github.com/yedf/dtm/dtmsvr/config.go:32.36,33.9 1 1 +github.com/yedf/dtm/dtmsvr/config.go:42.19,43.18 1 1 +github.com/yedf/dtm/dtmsvr/config.go:46.2,51.26 6 1 +github.com/yedf/dtm/dtmsvr/config.go:43.18,45.3 1 1 +github.com/yedf/dtm/dtmsvr/cron.go:13.45,18.18 5 1 +github.com/yedf/dtm/dtmsvr/cron.go:21.2,21.24 1 1 +github.com/yedf/dtm/dtmsvr/cron.go:18.18,20.3 1 0 +github.com/yedf/dtm/dtmsvr/cron.go:21.24,27.37 6 1 +github.com/yedf/dtm/dtmsvr/cron.go:27.37,30.4 2 1 +github.com/yedf/dtm/dtmsvr/cron.go:30.9,30.47 1 1 +github.com/yedf/dtm/dtmsvr/cron.go:30.47,33.4 2 1 +github.com/yedf/dtm/dtmsvr/cron.go:37.21,38.6 1 0 +github.com/yedf/dtm/dtmsvr/cron.go:38.6,41.3 2 0 +github.com/yedf/dtm/dtmsvr/cron.go:44.45,49.18 5 1 +github.com/yedf/dtm/dtmsvr/cron.go:52.2,52.24 1 1 +github.com/yedf/dtm/dtmsvr/cron.go:49.18,51.3 1 0 +github.com/yedf/dtm/dtmsvr/cron.go:52.24,56.3 3 1 +github.com/yedf/dtm/dtmsvr/cron.go:59.21,60.6 1 0 +github.com/yedf/dtm/dtmsvr/cron.go:60.6,63.3 2 0 +github.com/yedf/dtm/dtmsvr/cron.go:66.20,67.34 1 1 +github.com/yedf/dtm/dtmsvr/cron.go:67.34,69.3 1 1 +github.com/yedf/dtm/dtmsvr/db.go:21.29,24.2 2 1 +github.com/yedf/dtm/dtmsvr/db.go:26.31,29.2 2 1 +github.com/yedf/dtm/dtmsvr/db.go:31.20,33.15 2 1 +github.com/yedf/dtm/dtmsvr/db.go:44.2,44.22 1 1 +github.com/yedf/dtm/dtmsvr/db.go:33.15,43.3 7 1 +github.com/yedf/dtm/dtmsvr/db.go:47.87,49.18 2 1 +github.com/yedf/dtm/dtmsvr/db.go:52.2,58.4 1 1 +github.com/yedf/dtm/dtmsvr/db.go:49.18,51.3 1 1 +github.com/yedf/dtm/dtmsvr/db.go:63.38,65.2 1 1 +github.com/yedf/dtm/dtmsvr/db.go:67.60,68.30 1 1 +github.com/yedf/dtm/dtmsvr/db.go:72.2,72.29 1 1 +github.com/yedf/dtm/dtmsvr/db.go:83.2,102.8 16 1 +github.com/yedf/dtm/dtmsvr/db.go:68.30,70.3 1 1 +github.com/yedf/dtm/dtmsvr/db.go:72.29,76.58 4 1 +github.com/yedf/dtm/dtmsvr/db.go:76.58,77.61 1 1 +github.com/yedf/dtm/dtmsvr/db.go:77.61,78.20 1 0 +github.com/yedf/dtm/dtmsvr/main.go:8.13,11.2 2 0 +github.com/yedf/dtm/dtmsvr/main.go:13.17,19.2 5 1 +github.com/yedf/dtm/dtmsvr/service.go:15.42,18.49 3 1 +github.com/yedf/dtm/dtmsvr/service.go:55.2,55.26 1 1 +github.com/yedf/dtm/dtmsvr/service.go:18.49,24.28 4 1 +github.com/yedf/dtm/dtmsvr/service.go:28.3,31.30 4 1 +github.com/yedf/dtm/dtmsvr/service.go:49.3,53.13 3 1 +github.com/yedf/dtm/dtmsvr/service.go:24.28,27.4 2 1 +github.com/yedf/dtm/dtmsvr/service.go:31.30,48.4 2 1 +github.com/yedf/dtm/dtmsvr/service.go:60.35,62.16 2 1 +github.com/yedf/dtm/dtmsvr/service.go:62.16,65.3 2 0 +github.com/yedf/dtm/dtmsvr/service.go:68.38,70.16 2 1 +github.com/yedf/dtm/dtmsvr/service.go:73.2,73.34 1 1 +github.com/yedf/dtm/dtmsvr/service.go:70.16,72.3 1 1 +github.com/yedf/dtm/dtmsvr/service.go:73.34,75.3 1 1 +github.com/yedf/dtm/dtmsvr/service.go:77.34,78.27 1 1 +github.com/yedf/dtm/dtmsvr/service.go:78.27,79.42 1 1 +github.com/yedf/dtm/dtmsvr/service.go:83.56,88.40 5 1 +github.com/yedf/dtm/dtmsvr/service.go:120.2,120.27 1 1 +github.com/yedf/dtm/dtmsvr/service.go:129.2,129.53 1 1 +github.com/yedf/dtm/dtmsvr/service.go:150.2,150.19 1 1 +github.com/yedf/dtm/dtmsvr/service.go:153.2,159.12 4 1 +github.com/yedf/dtm/dtmsvr/service.go:88.40,90.114 2 1 +github.com/yedf/dtm/dtmsvr/service.go:93.3,93.56 1 1 +github.com/yedf/dtm/dtmsvr/service.go:90.114,91.12 1 1 +github.com/yedf/dtm/dtmsvr/service.go:93.56,95.18 2 1 +github.com/yedf/dtm/dtmsvr/service.go:98.4,100.41 3 1 +github.com/yedf/dtm/dtmsvr/service.go:95.18,97.5 1 0 +github.com/yedf/dtm/dtmsvr/service.go:100.41,107.5 3 1 +github.com/yedf/dtm/dtmsvr/service.go:107.10,107.45 1 1 +github.com/yedf/dtm/dtmsvr/service.go:107.45,114.10 4 1 +github.com/yedf/dtm/dtmsvr/service.go:115.10,117.5 1 1 +github.com/yedf/dtm/dtmsvr/service.go:120.27,128.3 4 1 +github.com/yedf/dtm/dtmsvr/service.go:129.53,131.60 2 1 +github.com/yedf/dtm/dtmsvr/service.go:134.3,135.17 2 1 +github.com/yedf/dtm/dtmsvr/service.go:138.3,139.40 2 1 +github.com/yedf/dtm/dtmsvr/service.go:131.60,132.12 1 1 +github.com/yedf/dtm/dtmsvr/service.go:135.17,137.4 1 0 +github.com/yedf/dtm/dtmsvr/service.go:139.40,146.4 3 1 +github.com/yedf/dtm/dtmsvr/service.go:146.9,148.4 1 0 +github.com/yedf/dtm/dtmsvr/service.go:150.19,152.3 1 0