Browse Source

ready to run bench

pull/39/head
yedf2 5 years ago
parent
commit
8eda41849d
  1. 2
      .travis.yml
  2. 3
      app/main.go
  3. 70
      bench/http.go
  4. 17
      bench/run.sh
  5. 4
      common/types.go
  6. 38
      dtmsvr/dtmsvr.go
  7. 2
      dtmsvr/dtmsvr.mysql.sql
  8. 18
      dtmsvr/trans.go
  9. 6
      dtmsvr/utils.go
  10. 44
      test/dtmsvr_test.go
  11. 41
      test/main_test.go

2
.travis.yml

@ -14,4 +14,4 @@ before_install:
- go get -t -v ./...
- go get github.com/mattn/goveralls
script:
- $GOPATH/bin/goveralls -service=travis-ci -ignore="examples/*,dtmgrpc/*.pb.go"
- $GOPATH/bin/goveralls -service=travis-ci -ignore="examples/*,dtmgrpc/*.pb.go,bench/*"

3
app/main.go

@ -18,7 +18,8 @@ usage:
Available commands:
dtmsvr run dtm as a server
dev create all needed table and run dtm as a server
dev create all needed table and run dtm as a server
bench start bench server
quick_start run quick start example (dtm will create needed table)
qs same as quick_start

70
bench/http.go

@ -18,7 +18,7 @@ import (
// 事务参与者的服务地址
const benchAPI = "/api/busi_bench"
const benchPort = 8083
const total = 1000000
const total = 200000
var benchBusi = fmt.Sprintf("http://localhost:%d%s", benchPort, benchAPI)
@ -38,41 +38,71 @@ func txGet() *sql.Tx {
func reloadData() {
began := time.Now()
db := sdbGet()
tables := []string{"dtm_busi.user_account", "dtm.trans_global", "dtm.trans_branch"}
_, err := dtmcli.DBExec(db, `create table if not exists dtm_busi.user_account_log (
id INT(11) AUTO_INCREMENT PRIMARY KEY,
user_id INT(11) NOT NULL,
delta DECIMAL(11, 2) not null,
gid varchar(45) not null,
branch_id varchar(45) not null,
reason varchar(45),
create_time datetime not null default now(),
update_time datetime not null default now(),
key(user_id),
key(create_time)
)
`)
dtmcli.FatalIfError(err)
tables := []string{"dtm_busi.user_account", "dtm_busi.user_account_log", "dtm.trans_global", "dtm.trans_branch", "dtm_barrier.barrier"}
for _, t := range tables {
dtmcli.DBExec(db, fmt.Sprintf("truncate %s", t))
_, err = dtmcli.DBExec(db, fmt.Sprintf("truncate %s", t))
dtmcli.FatalIfError(err)
}
s := "insert ignore into dtm_busi.user_account(user_id, balance) values "
ss := []string{}
for i := 1; i <= total; i++ {
ss = append(ss, fmt.Sprintf("(%d, 1000000)", i))
}
db.Exec(s + strings.Join(ss, ","))
_, err = db.Exec(s + strings.Join(ss, ","))
dtmcli.FatalIfError(err)
dtmcli.Logf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds())
}
var uidCounter int32 = 0
var mode string = ""
var sqls int = 1
// StartSvr 1
func StartSvr() {
app := common.GetGinApp()
benchAddRoute(app)
dtmcli.Logf("bench listening at %d", benchPort)
reloadData()
go app.Run(fmt.Sprintf(":%d", benchPort))
time.Sleep(100 * time.Millisecond)
reloadData()
time.Sleep(1100 * time.Millisecond) // sleep 1 second for async branch status update to finish
}
func qsAdjustBalance(uid int, amount int) (interface{}, error) {
func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
if strings.Contains(mode, "empty") {
return dtmcli.MapSuccess, nil
} else {
tx := txGet()
for i := 0; i < 5; i++ {
_, err := dtmcli.DBExec(tx, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
}
tb := dtmcli.TransBaseFromQuery(c.Request.URL.Query())
f := func(tx dtmcli.DB) error {
for i := 0; i < sqls; i++ {
_, err := dtmcli.DBExec(tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, branch_type, reason) values(?,?,?,?,?,?)",
uid, amount, tb.Gid, c.Query("branch_id"), tb.TransType, fmt.Sprintf("inserted by dtm transaction %s %s", tb.Gid, c.Query("branch_id")))
dtmcli.FatalIfError(err)
_, err = dtmcli.DBExec(tx, "update dtm_busi.user_account set balance = balance + ?, update_time = now() where user_id = ?", amount, uid)
dtmcli.FatalIfError(err)
}
return nil
}
if strings.Contains(mode, "barrier") {
barrier, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
dtmcli.FatalIfError(err)
barrier.Call(txGet(), f)
} else {
tx := txGet()
f(tx)
err := tx.Commit()
dtmcli.FatalIfError(err)
}
@ -82,20 +112,24 @@ func qsAdjustBalance(uid int, amount int) (interface{}, error) {
func benchAddRoute(app *gin.Engine) {
app.POST(benchAPI+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), 1)
return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), 1, c)
}))
app.POST(benchAPI+"/TransInCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), -1)
return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), -1, c)
}))
app.POST(benchAPI+"/TransOut", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), -1)
return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), -1, c)
}))
app.POST(benchAPI+"/TransOutCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), 30)
return qsAdjustBalance(dtmcli.MustAtoi(c.Query("uid")), 30, c)
}))
app.Any(benchAPI+"/reloadData", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
reloadData()
mode = c.Query("m")
s := c.Query("sqls")
if s != "" {
sqls = dtmcli.MustAtoi(s)
}
return nil, nil
}))
app.Any(benchAPI+"/bench", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
@ -112,12 +146,12 @@ func benchAddRoute(app *gin.Engine) {
Add(benchBusi+"/TransIn"+params2, benchBusi+"/TransInCompensate"+params2, req)
saga.WaitResult = true
err := saga.Submit()
dtmcli.FatalIfError(err)
dtmcli.E2P(err)
} else {
_, err := dtmcli.RestyClient.R().SetBody(gin.H{}).SetQueryParam("uid", suid2).Post(benchBusi + "/TransOut")
dtmcli.FatalIfError(err)
dtmcli.E2P(err)
_, err = dtmcli.RestyClient.R().SetBody(gin.H{}).SetQueryParam("uid", suid).Post(benchBusi + "/TransIn")
dtmcli.FatalIfError(err)
dtmcli.E2P(err)
}
return nil, nil
}))

17
bench/run.sh

@ -2,14 +2,17 @@
# go run ../app/main.go
set -x
TIME=1
CURRENT=10
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_empty" && ab -t $TIME -c $CURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx" && ab -t $TIME -c $CURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_empty" && ab -t $TIME -c $CURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx" && ab -t $TIME -c $CURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
export TIME=10
export CONCURRENT=20
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx&sqls=5" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_barrier&sqls=5" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx&sqls=5" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx&sqls=1" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_barrier&sqls=1" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx&sqls=1" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_empty" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_empty" && curl "http://127.0.0.1:8083/api/busi_bench/bench"
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx" && curl "http://127.0.0.1:8083/api/busi_bench/bench"
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_empty" && curl "http://127.0.0.1:8083/api/busi_bench/bench"
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx" && curl "http://127.0.0.1:8083/api/busi_bench/bench"
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_barrier" && curl "http://127.0.0.1:8083/api/busi_bench/bench"

4
common/types.go

@ -123,7 +123,7 @@ type dtmConfigType struct {
TransCronInterval int64 `yaml:"TransCronInterval"` // 单位秒 当事务等待这个时间之后,还没有变化,则进行一轮处理,包括prepared中的任务和committed的任务
DB map[string]string `yaml:"DB"`
DisableLocalhost int64 `yaml:"DisableLocalhost"`
RetryLimit int64 `yaml:"RetryLimit"`
UpdateBranchSync int64 `yaml:"UpdateBranchSync"`
}
// DtmConfig 配置
@ -146,7 +146,7 @@ func init() {
"password": os.Getenv("DB_PASSWORD"),
}
DtmConfig.DisableLocalhost = getIntEnv("DISABLE_LOCALHOST", "0")
DtmConfig.RetryLimit = getIntEnv("RETRY_LIMIT", "2000000000")
DtmConfig.UpdateBranchSync = getIntEnv("UPDATE_BRANCH_SYNC", "0")
cont := []byte{}
for d := MustGetwd(); d != "" && d != "/"; d = filepath.Dir(d) {
cont1, err := ioutil.ReadFile(d + "/conf.yml")

38
dtmsvr/dtmsvr.go

@ -8,8 +8,9 @@ import (
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmgrpc"
"gorm.io/gorm/clause"
"github.com/grpc-ecosystem/go-grpc-middleware"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"google.golang.org/grpc"
"github.com/yedf/dtm/examples"
@ -40,6 +41,7 @@ func StartSvr() {
err := s.Serve(lis)
dtmcli.FatalIfError(err)
}()
go updateBranchAsync()
// prometheus exporter
dtmcli.Logf("prometheus exporter listen at: %d", metricsPort)
@ -52,3 +54,37 @@ func PopulateDB(skipDrop bool) {
file := fmt.Sprintf("%s/dtmsvr.%s.sql", common.GetCallerCodeDir(), config.DB["driver"])
examples.RunSQLScript(config.DB, file, skipDrop)
}
// UpdateBranchAsyncInterval unit millisecond
var UpdateBranchAsyncInterval time.Duration = 1000
var updateBranchAsyncChan chan branchStatus = make(chan branchStatus, 1000)
func updateBranchAsync() {
for { // flush branches every second
updates := []TransBranch{}
started := time.Now()
for time.Since(started) < UpdateBranchAsyncInterval*time.Millisecond {
select {
case updateBranch := <-updateBranchAsyncChan:
updates = append(updates, TransBranch{
ModelBase: common.ModelBase{ID: updateBranch.id},
Status: updateBranch.status,
FinishTime: updateBranch.finish_time,
})
case <-time.After(50 * time.Millisecond):
}
}
for len(updates) > 0 {
dbr := dbGet().Clauses(clause.OnConflict{
DoUpdates: clause.AssignmentColumns([]string{"status", "finish_time"}),
}).Create(updates)
dtmcli.Logf("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected)
if dbr.Error != nil {
dtmcli.LogRedf("async update branch status error: %v", dbr.Error)
time.Sleep(1 * time.Second)
} else {
updates = []TransBranch{}
}
}
}
}

2
dtmsvr/dtmsvr.mysql.sql

@ -5,7 +5,7 @@ CREATE TABLE if not EXISTS dtm.trans_global (
`id` int(11) NOT NULL AUTO_INCREMENT,
`gid` varchar(128) NOT NULL COMMENT '事务全局id',
`trans_type` varchar(45) not null COMMENT '事务类型: saga | xa | tcc | msg',
`data` TEXT COMMENT '事务携带的数据',
-- `data` TEXT COMMENT '事务携带的数据', -- 影响性能,不必要存储
`status` varchar(45) NOT NULL COMMENT '全局事务的状态 prepared | submitted | finished | rollbacked',
`query_prepared` varchar(128) NOT NULL COMMENT 'prepared状态事务的查询api',
`protocol` varchar(45) not null comment '通信协议 http | grpc',

18
dtmsvr/trans.go

@ -25,7 +25,7 @@ type TransGlobal struct {
common.ModelBase
Gid string `json:"gid"`
TransType string `json:"trans_type"`
Data string `json:"data"`
Data string `json:"data" gorm:"-"`
Status string `json:"status"`
QueryPrepared string `json:"query_prepared"`
Protocol string `json:"protocol"`
@ -92,13 +92,17 @@ func (*TransBranch) TableName() string {
func (t *TransBranch) changeStatus(db *common.DB, status string) *gorm.DB {
writeTransLog(t.Gid, "branch change", status, t.BranchID, "")
dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(M{
"status": status,
"finish_time": time.Now(),
})
checkAffected(dbr)
if common.DtmConfig.UpdateBranchSync > 0 {
dbr := db.Must().Model(t).Updates(M{
"status": status,
"finish_time": time.Now(),
})
checkAffected(dbr)
} else { // 为了性能优化,把branch的status更新异步化
updateBranchAsyncChan <- branchStatus{id: t.ID, status: status}
}
t.Status = status
return dbr
return db.DB
}
func checkAffected(db1 *gorm.DB) {

6
dtmsvr/utils.go

@ -15,6 +15,12 @@ import (
// M a short name
type M = map[string]interface{}
type branchStatus struct {
id uint
status string
finish_time *time.Time
}
var p2e = dtmcli.P2E
var e2p = dtmcli.E2P

44
test/dtmsvr_test.go

@ -26,35 +26,6 @@ type BarrierModel struct {
// TableName gorm table name
func (BarrierModel) TableName() string { return "dtm_barrier.barrier" }
func resetXaData() {
if config.DB["driver"] != "mysql" {
return
}
db := dbGet()
type XaRow struct {
Data string
}
xas := []XaRow{}
db.Must().Raw("xa recover").Scan(&xas)
for _, xa := range xas {
db.Must().Exec(fmt.Sprintf("xa rollback '%s'", xa.Data))
}
}
func TestMain(m *testing.M) {
dtmsvr.TransProcessedTestChan = make(chan string, 1)
dtmsvr.CronForwardDuration = 60 * time.Second
dtmsvr.PopulateDB(false)
examples.PopulateDB(false)
// 启动组件
go dtmsvr.StartSvr()
examples.GrpcStartup()
app = examples.BaseAppStartup()
resetXaData()
m.Run()
}
func getTransStatus(gid string) string {
sm := TransGlobal{}
dbr := dbGet().Model(&sm).Where("gid=?", gid).First(&sm)
@ -144,3 +115,18 @@ func TestSqlDB(t *testing.T) {
dbr = db.Model(&BarrierModel{}).Where("gid=?", "gid2").Find(&[]BarrierModel{})
asserts.Equal(dbr.RowsAffected, int64(1))
}
func TestUpdateBranchAsync(t *testing.T) {
common.DtmConfig.UpdateBranchSync = 0
dtmsvr.UpdateBranchAsyncInterval = 50
saga := genSaga("gid-update-branch-async", false, false)
saga.WaitResult = true
err := saga.Submit()
assert.Nil(t, err)
WaitTransProcessed(saga.Gid)
time.Sleep(100 * time.Millisecond)
assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid))
common.DtmConfig.UpdateBranchSync = 1
dtmsvr.UpdateBranchAsyncInterval = 1000
}

41
test/main_test.go

@ -0,0 +1,41 @@
package test
import (
"fmt"
"testing"
"time"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmsvr"
"github.com/yedf/dtm/examples"
)
func TestMain(m *testing.M) {
dtmsvr.TransProcessedTestChan = make(chan string, 1)
dtmsvr.CronForwardDuration = 60 * time.Second
common.DtmConfig.UpdateBranchSync = 1
dtmsvr.PopulateDB(false)
examples.PopulateDB(false)
// 启动组件
go dtmsvr.StartSvr()
examples.GrpcStartup()
app = examples.BaseAppStartup()
resetXaData()
m.Run()
}
func resetXaData() {
if config.DB["driver"] != "mysql" {
return
}
db := dbGet()
type XaRow struct {
Data string
}
xas := []XaRow{}
db.Must().Raw("xa recover").Scan(&xas)
for _, xa := range xas {
db.Must().Exec(fmt.Sprintf("xa rollback '%s'", xa.Data))
}
}
Loading…
Cancel
Save