Browse Source

Merge pull request #132 from dtm-labs/alpha

redis bench ok
pull/134/head
yedf2 4 years ago
committed by GitHub
parent
commit
269a57340c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      bench/Makefile
  2. 49
      bench/main.go
  3. 10
      bench/prepare.sh
  4. 8
      bench/run-mysql.sh
  5. 5
      bench/run-services.sh
  6. 6
      bench/setup-redis6.sh
  7. 5
      bench/setup.sh
  8. 27
      bench/svr/http.go
  9. 5
      bench/test-boltdb.sh
  10. 15
      bench/test-mysql.sh
  11. 27
      bench/test-redis.sh
  12. 10
      dtmsvr/api.go
  13. 3
      dtmsvr/cron.go
  14. 3
      dtmsvr/storage/redis/redis.go
  15. 23
      dtmsvr/trans_process.go

28
bench/Makefile

@ -0,0 +1,28 @@
# All targets.
default: bench
bench: /usr/local/bin/go /etc/redis/redis.conf /usr/local/bin/docker-compose main.go
rm -f ../conf.sample.yml
go build -o bench
go: /usr/local/bin/go
redis: /etc/redis/redis.conf
mysql: /usr/local/bin/docker-compose
/usr/local/bin/go:
wget https://golang.org/dl/go1.17.1.linux-amd64.tar.gz
rm -rf /usr/local/go && tar -C /usr/local -xzf go1.17.1.linux-amd64.tar.gz && cp -f /usr/local/go/bin/go /usr/local/bin/go && rm go1.*
/etc/redis/redis.conf:
apt update
apt install -y redis redis-tools
/usr/local/bin/docker-compose:
apt update
apt install -y sysbench apache2-utils mysql-client-core-8.0
curl -fsSL https://get.docker.com | sh
curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
cd .. && docker-compose -f helper/compose.mysql.yml up -d && cd bench

49
bench/main.go

@ -13,31 +13,42 @@ import (
"github.com/dtm-labs/dtm/examples"
)
var hint = `To start the bench server, you need to specify the parameters:
Available commands:
http start bench server
var usage = `bench is a bench test server for dtmf
usage:
redis prepare for redis bench test
db prepare for mysql|postgres bench test
boltdb prepare for boltdb bench test
`
func hintAndExit() {
fmt.Printf(usage)
os.Exit(0)
}
var conf = &common.Config
func main() {
if len(os.Args) <= 1 {
fmt.Printf(hint)
return
hintAndExit()
}
logger.Infof("starting bench server")
common.MustLoadConfig()
logger.InitLog(conf.LogLevel)
if conf.ExamplesDB.Driver != "" {
dtmcli.SetCurrentDBType(conf.ExamplesDB.Driver)
svr.PrepareBenchDB()
}
logger.Infof("starting dtm....")
if os.Args[1] == "http" {
fmt.Println("start bench server")
common.MustLoadConfig()
logger.InitLog(common.Config.LogLevel)
dtmcli.SetCurrentDBType(common.Config.ExamplesDB.Driver)
registry.WaitStoreUp()
dtmsvr.PopulateDB(false)
registry.WaitStoreUp()
dtmsvr.PopulateDB(false)
if os.Args[1] == "db" {
examples.PopulateDB(false)
dtmsvr.StartSvr() // 启动dtmsvr的api服务
go dtmsvr.CronExpiredTrans(-1) // 启动dtmsvr的定时过期查询
svr.StartSvr()
select {}
} else if os.Args[1] == "redis" || os.Args[1] == "boltdb" {
} else {
fmt.Printf(hint)
hintAndExit()
}
dtmsvr.StartSvr() // 启动dtmsvr的api服务
go dtmsvr.CronExpiredTrans(-1) // 启动dtmsvr的定时过期查询
svr.StartSvr() // 启动bench服务
select {}
}

10
bench/prepare.sh

@ -0,0 +1,10 @@
# !/bin/bash
apt update
apt install -y git
git clone https://github.com/dtm-labs/dtm.git && cd dtm && git checkout alpha && cd bench && make
echo 'all prepared. you shoud run following commands to test in different terminal'
echo
echo 'cd dtm && go run bench/main.go redis|boltdb|db'
echo 'cd dtm && bench/run-redis|boltdb|mysql.sh'

8
bench/run-mysql.sh

@ -1,8 +0,0 @@
# !/bin/bash
cd /usr/share/sysbench/
echo 'create database sbtest;' > mysql -h 127.0.0.1 -uroot
sysbench oltp_write_only.lua --time=60 --mysql-host=127.0.0.1 --mysql-port=3306 --mysql-user=root --mysql-password= --mysql-db=sbtest --table-size=1000000 --tables=10 --threads=10 --events=999999999 --report-interval=10 prepare
sysbench oltp_write_only.lua --time=60 --mysql-host=127.0.0.1 --mysql-port=3306 --mysql-user=root --mysql-password= --mysql-db=sbtest --table-size=1000000 --tables=10 --threads=10 --events=999999999 --report-interval=10 run

5
bench/run-services.sh

@ -1,5 +0,0 @@
# !/bin/bash
# start all services
docker-compose -f helper/compose.mysql.yml up -d
go run app/main.go bench > /dev/nul

6
bench/setup-redis6.sh

@ -0,0 +1,6 @@
# !/bin/bash
apt update
apt install -y software-properties-common
add-apt-repository -y ppa:redislabs/redis
apt install -y redis redis-tools

5
bench/setup.sh

@ -1,8 +1,9 @@
# !/bin/bash
# install all commands needed
apt update
apt install -y git sysbench apache2-utils mysql-client-core-8.0
apt install -y sysbench apache2-utils mysql-client-core-8.0 redis redis-tools
# install docker and docker-compose
curl -fsSL https://get.docker.com -o get-docker.sh
@ -13,5 +14,3 @@ chmod +x /usr/local/bin/docker-compose
# install go
wget https://golang.org/dl/go1.17.1.linux-amd64.tar.gz
rm -rf /usr/local/go && tar -C /usr/local -xzf go1.17.1.linux-amd64.tar.gz && cp -f /usr/local/go/bin/go /usr/local/bin/go
apt install -y redis

27
bench/svr/http.go

@ -9,6 +9,7 @@ package svr
import (
"database/sql"
"fmt"
"os"
"strings"
"sync/atomic"
"time"
@ -18,7 +19,6 @@ import (
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmsvr"
"github.com/dtm-labs/dtm/examples"
"github.com/gin-gonic/gin"
"github.com/lithammer/shortuuid"
)
@ -27,10 +27,10 @@ import (
// service address of the transcation
const benchAPI = "/api/busi_bench"
const benchPort = 8083
const total = 200000
var benchBusi = fmt.Sprintf("http://localhost:%d%s", benchPort, benchAPI)
var benchPort = dtmimp.If(os.Getenv("BENCH_PORT") == "", "8083", os.Getenv("BENCH_PORT")).(string)
var benchBusi = fmt.Sprintf("http://localhost:%s%s", benchPort, benchAPI)
func sdbGet() *sql.DB {
db, err := dtmimp.PooledDB(common.Config.ExamplesDB)
@ -68,12 +68,7 @@ var uidCounter int32 = 0
var mode string = ""
var sqls int = 1
// StartSvr 1
func StartSvr() {
app := common.GetGinApp()
benchAddRoute(app)
logger.Debugf("bench listening at %d", benchPort)
go app.Run(fmt.Sprintf(":%d", benchPort))
func PrepareBenchDB() {
db := sdbGet()
_, err := dtmimp.DBExec(db, "drop table if exists dtm_busi.user_account_log")
logger.FatalIfError(err)
@ -94,6 +89,14 @@ func StartSvr() {
logger.FatalIfError(err)
}
// StartSvr 1
func StartSvr() {
app := common.GetGinApp()
benchAddRoute(app)
logger.Debugf("bench listening at %d", benchPort)
go app.Run(fmt.Sprintf(":%s", benchPort))
}
func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
if strings.Contains(mode, "empty") || sqls == 0 {
return dtmcli.MapSuccess, nil
@ -124,6 +127,7 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
}
func benchAddRoute(app *gin.Engine) {
dtmHttpServer := fmt.Sprintf("http://localhost:%d/api/dtmsvr", common.Config.HttpPort)
app.POST(benchAPI+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return qsAdjustBalance(dtmimp.MustAtoi(c.Query("uid")), 1, c)
}))
@ -154,7 +158,7 @@ func benchAddRoute(app *gin.Engine) {
params2 := fmt.Sprintf("?uid=%s", suid2)
logger.Debugf("mode: %s contains dtm: %t", mode, strings.Contains(mode, "dtm"))
if strings.Contains(mode, "dtm") {
saga := dtmcli.NewSaga(examples.DtmHttpServer, fmt.Sprintf("bench-%d", uid)).
saga := dtmcli.NewSaga(dtmHttpServer, fmt.Sprintf("bench-%d", uid)).
Add(benchBusi+"/TransOut"+params, benchBusi+"/TransOutCompensate"+params, req).
Add(benchBusi+"/TransIn"+params2, benchBusi+"/TransInCompensate"+params2, req)
saga.WaitResult = true
@ -171,9 +175,10 @@ func benchAddRoute(app *gin.Engine) {
app.Any(benchAPI+"/benchEmptyUrl", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
gid := shortuuid.New()
req := gin.H{}
saga := dtmcli.NewSaga(examples.DtmHttpServer, gid).
saga := dtmcli.NewSaga(dtmHttpServer, gid).
Add("", "", req).
Add("", "", req)
saga.WaitResult = true
err := saga.Submit()
return nil, err
}))

5
bench/test-boltdb.sh

@ -0,0 +1,5 @@
# !/bin/bash
set -x
ab -n 50000 -c 10 "http://127.0.0.1:8083/api/busi_bench/benchEmptyUrl"

15
bench/run-dtm.sh → bench/test-mysql.sh

@ -1,7 +1,14 @@
# !/bin/bash
# go run ../app/main.go
set -x
cd /usr/share/sysbench/
echo 'create database sbtest;' > mysql -h 127.0.0.1 -uroot
sysbench oltp_write_only.lua --time=60 --mysql-host=127.0.0.1 --mysql-port=3306 --mysql-user=root --mysql-password= --mysql-db=sbtest --table-size=1000000 --tables=10 --threads=10 --events=999999999 --report-interval=10 prepare
sysbench oltp_write_only.lua --time=60 --mysql-host=127.0.0.1 --mysql-port=3306 --mysql-user=root --mysql-password= --mysql-db=sbtest --table-size=1000000 --tables=10 --threads=10 --events=999999999 --report-interval=10 run
export TIME=10
export CONCURRENT=20
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx&sqls=0" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
@ -13,9 +20,3 @@ curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_barrier&sqls=1" && a
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx&sqls=1" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_empty" && ab -t $TIME -c $CONCURRENT "http://127.0.0.1:8083/api/busi_bench/bench"
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_empty" && curl "http://127.0.0.1:8083/api/busi_bench/bench"
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=raw_tx" && curl "http://127.0.0.1:8083/api/busi_bench/bench"
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_tx" && curl "http://127.0.0.1:8083/api/busi_bench/bench"
# curl "http://127.0.0.1:8083/api/busi_bench/reloadData?m=dtm_barrier" && curl "http://127.0.0.1:8083/api/busi_bench/bench"
ab -n 1000000 -c 10 "http://127.0.0.1:8083/api/busi_bench/benchEmptyUrl"

27
bench/test-redis.sh

@ -0,0 +1,27 @@
# !/bin/bash
set -x
export LOG_LEVEL=warn
export STORE_DRIVER=redis
export STORE_HOST=localhost
export STORE_PORT=6379
cd .. && bench/bench redis &
echo 'sleeping 3s for dtm bench to run up.' && sleep 3
ab -n 1000000 -c 10 "http://127.0.0.1:8083/api/busi_bench/benchEmptyUrl"
pkill bench
redis-benchmark -n 300000 SET 'abcdefg' 'ddddddd'
redis-benchmark -n 300000 EVAL "redis.call('SET', 'abcdedf', 'ddddddd')" 0
redis-benchmark -n 300000 EVAL "redis.call('SET', KEYS[1], ARGV[1])" 1 'aaaaaaaaa' 'bbbbbbbbbb'
redis-benchmark -n 3000000 -P 50 SET 'abcdefg' 'ddddddd'
redis-benchmark -n 300000 EVAL "for k=1, 10 do; redis.call('SET', KEYS[1], ARGV[1]);end" 1 'aaaaaaaaa' 'bbbbbbbbbb'
redis-benchmark -n 300000 -P 50 EVAL "redis.call('SET', KEYS[1], ARGV[1])" 1 'aaaaaaaaa' 'bbbbbbbbbb'
redis-benchmark -n 300000 EVAL "for k=1,10 do;local c = cjson.decode(ARGV[1]);end" 1 'aaaaaaaaa' '{"aaaaa":"bbbbb","b":1,"t":"2012-01-01 14:00:00"}'

10
dtmsvr/api.go

@ -17,22 +17,23 @@ import (
func svcSubmit(t *TransGlobal) (interface{}, error) {
t.Status = dtmcli.StatusSubmitted
err := t.saveNew()
branches, err := t.saveNew()
if err == storage.ErrUniqueConflict {
dbt := GetTransGlobal(t.Gid)
if dbt.Status == dtmcli.StatusPrepared {
dbt.changeStatus(t.Status)
branches = GetStore().FindBranches(t.Gid)
} else if dbt.Status != dtmcli.StatusSubmitted {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status '%s', cannot sumbmit", dbt.Status)}, nil
}
}
return t.Process(), nil
return t.Process(branches), nil
}
func svcPrepare(t *TransGlobal) (interface{}, error) {
t.Status = dtmcli.StatusPrepared
err := t.saveNew()
_, err := t.saveNew()
if err == storage.ErrUniqueConflict {
dbt := GetTransGlobal(t.Gid)
if dbt.Status != dtmcli.StatusPrepared {
@ -48,7 +49,8 @@ func svcAbort(t *TransGlobal) (interface{}, error) {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("trans type: '%s' current status '%s', cannot abort", dbt.TransType, dbt.Status)}, nil
}
dbt.changeStatus(dtmcli.StatusAborting)
return dbt.Process(), nil
branches := GetStore().FindBranches(t.Gid)
return dbt.Process(branches), nil
}
func svcRegisterBranch(transType string, branch *TransBranch, data map[string]string) (ret interface{}, rerr error) {

3
dtmsvr/cron.go

@ -31,7 +31,8 @@ func CronTransOnce() (gid string) {
}
gid = trans.Gid
trans.WaitResult = true
trans.Process()
branches := GetStore().FindBranches(gid)
trans.Process(branches)
return
}

3
dtmsvr/storage/redis/redis.go

@ -35,6 +35,7 @@ func (s *RedisStore) PopulateData(skipDrop bool) {
}
func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore {
logger.Debugf("calling FindTransGlobalStore: %s", gid)
r, err := redisGet().Get(ctx, config.Store.RedisPrefix+"_g_"+gid).Result()
if err == redis.Nil {
return nil
@ -46,6 +47,7 @@ func (s *RedisStore) FindTransGlobalStore(gid string) *storage.TransGlobalStore
}
func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
logger.Debugf("calling ScanTransGlobalStores: %s %d", *position, limit)
lid := uint64(0)
if *position != "" {
lid = uint64(dtmimp.MustAtoi(*position))
@ -71,6 +73,7 @@ func (s *RedisStore) ScanTransGlobalStores(position *string, limit int64) []stor
}
func (s *RedisStore) FindBranches(gid string) []storage.TransBranchStore {
logger.Debugf("calling FindBranches: %s", gid)
sa, err := redisGet().LRange(ctx, config.Store.RedisPrefix+"_b_"+gid, 0, -1).Result()
dtmimp.E2P(err)
branches := make([]storage.TransBranchStore, len(sa))

23
dtmsvr/trans_process.go

@ -16,23 +16,23 @@ import (
)
// Process process global transaction once
func (t *TransGlobal) Process() map[string]interface{} {
r := t.process()
func (t *TransGlobal) Process(branches []TransBranch) map[string]interface{} {
r := t.process(branches)
transactionMetrics(t, r["dtm_result"] == dtmcli.ResultSuccess)
return r
}
func (t *TransGlobal) process() map[string]interface{} {
func (t *TransGlobal) process(branches []TransBranch) map[string]interface{} {
if t.Options != "" {
dtmimp.MustUnmarshalString(t.Options, &t.TransOptions)
}
if !t.WaitResult {
go t.processInner()
go t.processInner(branches)
return dtmcli.MapSuccess
}
submitting := t.Status == dtmcli.StatusSubmitted
err := t.processInner()
err := t.processInner(branches)
if err != nil {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": err.Error()}
}
@ -42,7 +42,7 @@ func (t *TransGlobal) process() map[string]interface{} {
return dtmcli.MapSuccess
}
func (t *TransGlobal) processInner() (rerr error) {
func (t *TransGlobal) processInner(branches []TransBranch) (rerr error) {
defer handlePanic(&rerr)
defer func() {
if rerr != nil && rerr != dtmcli.ErrOngoing {
@ -55,14 +55,12 @@ func (t *TransGlobal) processInner() (rerr error) {
}
}()
logger.Debugf("processing: %s status: %s", t.Gid, t.Status)
branches := GetStore().FindBranches(t.Gid)
t.lastTouched = time.Now()
rerr = t.getProcessor().ProcessOnce(branches)
return
}
func (t *TransGlobal) saveNew() error {
branches := t.getProcessor().GenBranches()
func (t *TransGlobal) saveNew() ([]TransBranch, error) {
t.NextCronInterval = t.getNextCronInterval(cronReset)
t.NextCronTime = common.GetNextTime(t.NextCronInterval)
t.Options = dtmimp.MustMarshalString(t.TransOptions)
@ -72,8 +70,13 @@ func (t *TransGlobal) saveNew() error {
now := time.Now()
t.CreateTime = &now
t.UpdateTime = &now
branches := t.getProcessor().GenBranches()
for i, _ := range branches {
branches[i].CreateTime = &now
branches[i].UpdateTime = &now
}
err := GetStore().MaySaveNewTrans(&t.TransGlobalStore, branches)
logger.Infof("MaySaveNewTrans result: %v, global: %v branches: %v",
err, t.TransGlobalStore.String(), dtmimp.MustMarshalString(branches))
return err
return branches, err
}

Loading…
Cancel
Save