Browse Source

dtmcli barrier ok

pull/303/head
yedf2 4 years ago
parent
commit
573c880857
  1. 2
      dtmcli/dtmimp/db_special_test.go
  2. 4
      dtmcli/dtmimp/trans_xa_base.go
  3. 14
      helper/bench/svr/http.go
  4. 3
      test/busi/base_http.go
  5. 4
      test/xa_test.go

2
dtmcli/dtmimp/db_special_test.go

@ -24,7 +24,7 @@ func TestDBSpecial(t *testing.T) {
assert.Equal(t, "xa start 'xa1'", sp.GetXaSQL("start", "xa1"))
assert.Equal(t, "insert ignore into a(f) values(?)", sp.GetInsertIgnoreTemplate("a(f) values(?)", "c"))
SetCurrentDBType(DBTypePostgres)
sp = GetDBSpecial(DBTypeMysql)
sp = GetDBSpecial(DBTypePostgres)
assert.Equal(t, "$1 $2", sp.GetPlaceHoldSQL("? ?"))
assert.Equal(t, "begin", sp.GetXaSQL("start", "xa1"))
assert.Equal(t, "insert into a(f) values(?) on conflict ON CONSTRAINT c do nothing", sp.GetInsertIgnoreTemplate("a(f) values(?)", "c"))

4
dtmcli/dtmimp/trans_xa_base.go

@ -25,7 +25,7 @@ func XaHandlePhase2(gid string, dbConf DBConf, branchID string, op string) error
}
if op == OpRollback && err == nil {
// rollback insert a row after prepare. no-error means prepare has finished.
_, err = InsertBarrier(db, "xa", gid, branchID, OpAction, XaBarrier1, op, dbConf.Driver, "TODO")
_, err = InsertBarrier(db, "xa", gid, branchID, OpAction, XaBarrier1, op, dbConf.Driver, "")
}
return err
}
@ -52,7 +52,7 @@ func XaHandleLocalTrans(xa *TransBase, dbConf DBConf, cb func(*sql.DB) error) (r
_, _ = DBExec(dbConf.Driver, db, GetDBSpecial(dbConf.Driver).GetXaSQL("end", xaBranch))
}()
// prepare and rollback both insert a row
_, rerr = InsertBarrier(db, xa.TransType, xa.Gid, xa.BranchID, OpAction, XaBarrier1, OpAction, dbConf.Driver, "TODO")
_, rerr = InsertBarrier(db, xa.TransType, xa.Gid, xa.BranchID, OpAction, XaBarrier1, OpAction, dbConf.Driver, "")
if rerr == nil {
rerr = cb(db)
}

14
helper/bench/svr/http.go

@ -53,7 +53,7 @@ func reloadData() {
db := pdbGet()
tables := []string{"dtm_busi.user_account", "dtm_busi.user_account_log", "dtm.trans_global", "dtm.trans_branch_op", "dtm_barrier.barrier"}
for _, t := range tables {
_, err := dtmimp.DBExec(db, fmt.Sprintf("truncate %s", t))
_, err := dtmimp.DBExec(busi.BusiConf.Driver, db, fmt.Sprintf("truncate %s", t))
logger.FatalIfError(err)
}
s := "insert ignore into dtm_busi.user_account(user_id, balance) values "
@ -61,7 +61,7 @@ func reloadData() {
for i := 1; i <= total; i++ {
ss = append(ss, fmt.Sprintf("(%d, 1000000)", i))
}
_, err := dtmimp.DBExec(db, s+strings.Join(ss, ","))
_, err := dtmimp.DBExec(busi.BusiConf.Driver, db, s+strings.Join(ss, ","))
logger.FatalIfError(err)
logger.Debugf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds())
}
@ -73,11 +73,11 @@ var sqls = 1
// PrepareBenchDB prepares db data for bench
func PrepareBenchDB() {
db := pdbGet()
_, err := dtmimp.DBExec(db, "CREATE DATABASE if not exists dtm_busi")
_, err := dtmimp.DBExec(busi.BusiConf.Driver, db, "CREATE DATABASE if not exists dtm_busi")
logger.FatalIfError(err)
_, err = dtmimp.DBExec(db, "drop table if exists dtm_busi.user_account_log")
_, err = dtmimp.DBExec(busi.BusiConf.Driver, db, "drop table if exists dtm_busi.user_account_log")
logger.FatalIfError(err)
_, err = dtmimp.DBExec(db, `create table if not exists dtm_busi.user_account_log (
_, err = dtmimp.DBExec(busi.BusiConf.Driver, db, `create table if not exists dtm_busi.user_account_log (
id INT(11) AUTO_INCREMENT PRIMARY KEY,
user_id INT(11) NOT NULL,
delta DECIMAL(11, 2) not null,
@ -111,10 +111,10 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) error { // nolint: unp
tb := dtmimp.TransBaseFromQuery(c.Request.URL.Query())
f := func(tx *sql.Tx) error {
for i := 0; i < sqls; i++ {
_, err := dtmimp.DBExec(tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, op, reason) values(?,?,?,?,?,?)",
_, err := dtmimp.DBExec(busi.BusiConf.Driver, tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, op, reason) values(?,?,?,?,?,?)",
uid, amount, tb.Gid, c.Query("branch_id"), tb.TransType, fmt.Sprintf("inserted by dtm transaction %s %s", tb.Gid, c.Query("branch_id")))
logger.FatalIfError(err)
_, err = dtmimp.DBExec(tx, "update dtm_busi.user_account set balance = balance + ?, update_time = now() where user_id = ?", amount, uid)
_, err = dtmimp.DBExec(busi.BusiConf.Driver, tx, "update dtm_busi.user_account set balance = balance + ?, update_time = now() where user_id = ?", amount, uid)
logger.FatalIfError(err)
}
return nil

3
test/busi/base_http.go

@ -69,7 +69,8 @@ func BaseAppStartup() *gin.Engine {
}
logger.Debugf("Starting busi at: %d", BusiPort)
go func() {
_ = app.Run(fmt.Sprintf(":%d", BusiPort))
err := app.Run(fmt.Sprintf(":%d", BusiPort))
dtmimp.FatalIfError(err)
}()
return app
}

4
test/xa_test.go

@ -43,10 +43,10 @@ func TestXaDuplicate(t *testing.T) {
sdb, err := dtmimp.StandaloneDB(busi.BusiConf)
assert.Nil(t, err)
if dtmcli.GetCurrentDBType() == dtmcli.DBTypeMysql {
_, err = dtmimp.DBExec(conf.Store.Driver, sdb, "xa recover")
_, err = dtmimp.DBExec(busi.BusiConf.Driver, sdb, "xa recover")
assert.Nil(t, err)
}
_, err = dtmimp.DBExec(conf.Store.Driver, sdb, dtmimp.GetDBSpecial(conf.Store.Driver).GetXaSQL("commit", gid+"-01")) // simulate repeated request
_, err = dtmimp.DBExec(busi.BusiConf.Driver, sdb, dtmimp.GetDBSpecial(busi.BusiConf.Driver).GetXaSQL("commit", gid+"-01")) // simulate repeated request
assert.Nil(t, err)
return xa.CallBranch(req, busi.Busi+"/TransInXa")
})

Loading…
Cancel
Save