mirror of https://github.com/dtm-labs/dtm.git
csharpjavadistributed-transactionsdtmgogolangmicroservicenodejsphpdatabasesagaseatatcctransactiontransactionsxapythondistributed
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
169 lines
5.3 KiB
169 lines
5.3 KiB
/*
|
|
* Copyright (c) 2021 yedf. All rights reserved.
|
|
* Use of this source code is governed by a BSD-style
|
|
* license that can be found in the LICENSE file.
|
|
*/
|
|
|
|
package bench
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/yedf/dtm/common"
|
|
"github.com/yedf/dtm/dtmcli"
|
|
"github.com/yedf/dtm/dtmcli/dtmimp"
|
|
"github.com/yedf/dtm/dtmsvr"
|
|
"github.com/yedf/dtm/examples"
|
|
)
|
|
|
|
// Start command: go run app/main.go qs

const (
	// benchAPI is the URL path prefix under which the benchmark endpoints are registered.
	benchAPI = "/api/busi_bench"
	// benchPort is the TCP port the benchmark HTTP server listens on.
	benchPort = 8083
	// total is the number of user accounts loaded for the benchmark.
	total = 200000
)

// benchBusi is the service address of the transaction participants:
// the base URL of the benchmark endpoints on localhost.
var benchBusi = fmt.Sprintf("http://localhost:%d", benchPort) + benchAPI
|
|
|
|
func sdbGet() *sql.DB {
|
|
db, err := dtmimp.PooledDB(common.DtmConfig.DB)
|
|
dtmimp.FatalIfError(err)
|
|
return db
|
|
}
|
|
|
|
func txGet() *sql.Tx {
|
|
db := sdbGet()
|
|
tx, err := db.Begin()
|
|
dtmimp.FatalIfError(err)
|
|
return tx
|
|
}
|
|
|
|
func reloadData() {
|
|
time.Sleep(dtmsvr.UpdateBranchAsyncInterval * 2)
|
|
began := time.Now()
|
|
db := sdbGet()
|
|
tables := []string{"dtm_busi.user_account", "dtm_busi.user_account_log", "dtm.trans_global", "dtm.trans_branch_op", "dtm_barrier.barrier"}
|
|
for _, t := range tables {
|
|
_, err := dtmimp.DBExec(db, fmt.Sprintf("truncate %s", t))
|
|
dtmimp.FatalIfError(err)
|
|
}
|
|
s := "insert ignore into dtm_busi.user_account(user_id, balance) values "
|
|
ss := []string{}
|
|
for i := 1; i <= total; i++ {
|
|
ss = append(ss, fmt.Sprintf("(%d, 1000000)", i))
|
|
}
|
|
_, err := dtmimp.DBExec(db, s+strings.Join(ss, ","))
|
|
dtmimp.FatalIfError(err)
|
|
dtmimp.Logf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds())
|
|
}
|
|
|
|
var (
	// uidCounter is incremented atomically once per /bench request and is
	// used to pick the user ids for that request.
	uidCounter int32
	// mode holds the "m" query parameter of the last /reloadData request;
	// handlers test it for the substrings "empty", "barrier" and "dtm".
	mode string
	// sqls is the number of insert+update rounds each balance adjustment
	// executes; it can be overridden through /reloadData.
	sqls = 1
)
|
|
|
|
// StartSvr 1
|
|
func StartSvr() {
|
|
app := common.GetGinApp()
|
|
benchAddRoute(app)
|
|
dtmimp.Logf("bench listening at %d", benchPort)
|
|
go app.Run(fmt.Sprintf(":%d", benchPort))
|
|
db := sdbGet()
|
|
_, err := dtmimp.DBExec(db, "drop table if exists dtm_busi.user_account_log")
|
|
dtmimp.FatalIfError(err)
|
|
_, err = dtmimp.DBExec(db, `create table if not exists dtm_busi.user_account_log (
|
|
id INT(11) AUTO_INCREMENT PRIMARY KEY,
|
|
user_id INT(11) NOT NULL,
|
|
delta DECIMAL(11, 2) not null,
|
|
gid varchar(45) not null,
|
|
branch_id varchar(45) not null,
|
|
op varchar(45) not null,
|
|
reason varchar(45),
|
|
create_time datetime not null default now(),
|
|
update_time datetime not null default now(),
|
|
key(user_id),
|
|
key(create_time)
|
|
)
|
|
`)
|
|
dtmimp.FatalIfError(err)
|
|
}
|
|
|
|
func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
|
|
if strings.Contains(mode, "empty") || sqls == 0 {
|
|
return dtmcli.MapSuccess, nil
|
|
}
|
|
tb := dtmimp.TransBaseFromQuery(c.Request.URL.Query())
|
|
f := func(tx dtmcli.DB) error {
|
|
for i := 0; i < sqls; i++ {
|
|
_, err := dtmimp.DBExec(tx, "insert into dtm_busi.user_account_log(user_id, delta, gid, branch_id, op, reason) values(?,?,?,?,?,?)",
|
|
uid, amount, tb.Gid, c.Query("branch_id"), tb.TransType, fmt.Sprintf("inserted by dtm transaction %s %s", tb.Gid, c.Query("branch_id")))
|
|
dtmimp.FatalIfError(err)
|
|
_, err = dtmimp.DBExec(tx, "update dtm_busi.user_account set balance = balance + ?, update_time = now() where user_id = ?", amount, uid)
|
|
dtmimp.FatalIfError(err)
|
|
}
|
|
return nil
|
|
}
|
|
if strings.Contains(mode, "barrier") {
|
|
barrier, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
|
|
dtmimp.FatalIfError(err)
|
|
barrier.Call(txGet(), f)
|
|
} else {
|
|
tx := txGet()
|
|
f(tx)
|
|
err := tx.Commit()
|
|
dtmimp.FatalIfError(err)
|
|
}
|
|
|
|
return dtmcli.MapSuccess, nil
|
|
}
|
|
|
|
func benchAddRoute(app *gin.Engine) {
|
|
app.POST(benchAPI+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
|
|
return qsAdjustBalance(dtmimp.MustAtoi(c.Query("uid")), 1, c)
|
|
}))
|
|
app.POST(benchAPI+"/TransInCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
|
|
return qsAdjustBalance(dtmimp.MustAtoi(c.Query("uid")), -1, c)
|
|
}))
|
|
app.POST(benchAPI+"/TransOut", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
|
|
return qsAdjustBalance(dtmimp.MustAtoi(c.Query("uid")), -1, c)
|
|
}))
|
|
app.POST(benchAPI+"/TransOutCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
|
|
return qsAdjustBalance(dtmimp.MustAtoi(c.Query("uid")), 30, c)
|
|
}))
|
|
app.Any(benchAPI+"/reloadData", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
|
|
reloadData()
|
|
mode = c.Query("m")
|
|
s := c.Query("sqls")
|
|
if s != "" {
|
|
sqls = dtmimp.MustAtoi(s)
|
|
}
|
|
return nil, nil
|
|
}))
|
|
app.Any(benchAPI+"/bench", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
|
|
uid := (atomic.AddInt32(&uidCounter, 1)-1)%total + 1
|
|
suid := fmt.Sprintf("%d", uid)
|
|
suid2 := fmt.Sprintf("%d", total+1-uid)
|
|
req := gin.H{}
|
|
params := fmt.Sprintf("?uid=%s", suid)
|
|
params2 := fmt.Sprintf("?uid=%s", suid2)
|
|
dtmimp.Logf("mode: %s contains dtm: %t", mode, strings.Contains(mode, "dtm"))
|
|
if strings.Contains(mode, "dtm") {
|
|
saga := dtmcli.NewSaga(examples.DtmHttpServer, fmt.Sprintf("bench-%d", uid)).
|
|
Add(benchBusi+"/TransOut"+params, benchBusi+"/TransOutCompensate"+params, req).
|
|
Add(benchBusi+"/TransIn"+params2, benchBusi+"/TransInCompensate"+params2, req)
|
|
saga.WaitResult = true
|
|
err := saga.Submit()
|
|
dtmimp.E2P(err)
|
|
} else {
|
|
_, err := dtmimp.RestyClient.R().SetBody(gin.H{}).SetQueryParam("uid", suid2).Post(benchBusi + "/TransOut")
|
|
dtmimp.E2P(err)
|
|
_, err = dtmimp.RestyClient.R().SetBody(gin.H{}).SetQueryParam("uid", suid).Post(benchBusi + "/TransIn")
|
|
dtmimp.E2P(err)
|
|
}
|
|
return nil, nil
|
|
}))
|
|
}
|
|
|