Browse Source

bench ok

pull/39/head v1.2.0
yedf2 5 years ago
parent
commit
a78f354ffc
  1. 43
      bench/http.go
  2. 9
      dtmsvr/dtmsvr.go
  3. 4
      test/dtmsvr_test.go

43
bench/http.go

@ -10,6 +10,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmsvr"
"github.com/yedf/dtm/examples" "github.com/yedf/dtm/examples"
) )
@ -36,28 +37,12 @@ func txGet() *sql.Tx {
} }
func reloadData() { func reloadData() {
time.Sleep(dtmsvr.UpdateBranchAsyncInterval * 2)
began := time.Now() began := time.Now()
db := sdbGet() db := sdbGet()
_, err := dtmcli.DBExec(db, "drop table if exists dtm_busi.user_account_log")
dtmcli.FatalIfError(err)
_, err = dtmcli.DBExec(db, `create table if not exists dtm_busi.user_account_log (
id INT(11) AUTO_INCREMENT PRIMARY KEY,
user_id INT(11) NOT NULL,
delta DECIMAL(11, 2) not null,
gid varchar(45) not null,
branch_id varchar(45) not null,
branch_type varchar(45) not null,
reason varchar(45),
create_time datetime not null default now(),
update_time datetime not null default now(),
key(user_id),
key(create_time)
)
`)
dtmcli.FatalIfError(err)
tables := []string{"dtm_busi.user_account", "dtm_busi.user_account_log", "dtm.trans_global", "dtm.trans_branch", "dtm_barrier.barrier"} tables := []string{"dtm_busi.user_account", "dtm_busi.user_account_log", "dtm.trans_global", "dtm.trans_branch", "dtm_barrier.barrier"}
for _, t := range tables { for _, t := range tables {
_, err = dtmcli.DBExec(db, fmt.Sprintf("truncate %s", t)) _, err := dtmcli.DBExec(db, fmt.Sprintf("truncate %s", t))
dtmcli.FatalIfError(err) dtmcli.FatalIfError(err)
} }
s := "insert ignore into dtm_busi.user_account(user_id, balance) values " s := "insert ignore into dtm_busi.user_account(user_id, balance) values "
@ -65,7 +50,7 @@ func reloadData() {
for i := 1; i <= total; i++ { for i := 1; i <= total; i++ {
ss = append(ss, fmt.Sprintf("(%d, 1000000)", i)) ss = append(ss, fmt.Sprintf("(%d, 1000000)", i))
} }
_, err = db.Exec(s + strings.Join(ss, ",")) _, err := db.Exec(s + strings.Join(ss, ","))
dtmcli.FatalIfError(err) dtmcli.FatalIfError(err)
dtmcli.Logf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds()) dtmcli.Logf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds())
} }
@ -80,8 +65,24 @@ func StartSvr() {
benchAddRoute(app) benchAddRoute(app)
dtmcli.Logf("bench listening at %d", benchPort) dtmcli.Logf("bench listening at %d", benchPort)
go app.Run(fmt.Sprintf(":%d", benchPort)) go app.Run(fmt.Sprintf(":%d", benchPort))
reloadData() db := sdbGet()
time.Sleep(1100 * time.Millisecond) // sleep 1 second for async branch status update to finish _, err := dtmcli.DBExec(db, "drop table if exists dtm_busi.user_account_log")
dtmcli.FatalIfError(err)
_, err = dtmcli.DBExec(db, `create table if not exists dtm_busi.user_account_log (
id INT(11) AUTO_INCREMENT PRIMARY KEY,
user_id INT(11) NOT NULL,
delta DECIMAL(11, 2) not null,
gid varchar(45) not null,
branch_id varchar(45) not null,
branch_type varchar(45) not null,
reason varchar(45),
create_time datetime not null default now(),
update_time datetime not null default now(),
key(user_id),
key(create_time)
)
`)
dtmcli.FatalIfError(err)
} }
func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) { func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {

9
dtmsvr/dtmsvr.go

@ -55,15 +55,16 @@ func PopulateDB(skipDrop bool) {
examples.RunSQLScript(config.DB, file, skipDrop) examples.RunSQLScript(config.DB, file, skipDrop)
} }
// UpdateBranchAsyncInterval unit millisecond // UpdateBranchAsyncInterval interval to flush branch
var UpdateBranchAsyncInterval time.Duration = 1000 var UpdateBranchAsyncInterval = 200 * time.Millisecond
var updateBranchAsyncChan chan branchStatus = make(chan branchStatus, 1000) var updateBranchAsyncChan chan branchStatus = make(chan branchStatus, 1000)
func updateBranchAsync() { func updateBranchAsync() {
for { // flush branches every second for { // flush branches every second
updates := []TransBranch{} updates := []TransBranch{}
started := time.Now() started := time.Now()
for time.Since(started) < UpdateBranchAsyncInterval*time.Millisecond { checkInterval := 20 * time.Millisecond
for time.Since(started) < UpdateBranchAsyncInterval-checkInterval && len(updates) < 20 {
select { select {
case updateBranch := <-updateBranchAsyncChan: case updateBranch := <-updateBranchAsyncChan:
updates = append(updates, TransBranch{ updates = append(updates, TransBranch{
@ -71,7 +72,7 @@ func updateBranchAsync() {
Status: updateBranch.status, Status: updateBranch.status,
FinishTime: updateBranch.finish_time, FinishTime: updateBranch.finish_time,
}) })
case <-time.After(50 * time.Millisecond): case <-time.After(checkInterval):
} }
} }
for len(updates) > 0 { for len(updates) > 0 {

4
test/dtmsvr_test.go

@ -118,15 +118,13 @@ func TestSqlDB(t *testing.T) {
func TestUpdateBranchAsync(t *testing.T) { func TestUpdateBranchAsync(t *testing.T) {
common.DtmConfig.UpdateBranchSync = 0 common.DtmConfig.UpdateBranchSync = 0
dtmsvr.UpdateBranchAsyncInterval = 50
saga := genSaga("gid-update-branch-async", false, false) saga := genSaga("gid-update-branch-async", false, false)
saga.WaitResult = true saga.WaitResult = true
err := saga.Submit() err := saga.Submit()
assert.Nil(t, err) assert.Nil(t, err)
WaitTransProcessed(saga.Gid) WaitTransProcessed(saga.Gid)
time.Sleep(100 * time.Millisecond) time.Sleep(dtmsvr.UpdateBranchAsyncInterval)
assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid)) assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid))
common.DtmConfig.UpdateBranchSync = 1 common.DtmConfig.UpdateBranchSync = 1
dtmsvr.UpdateBranchAsyncInterval = 1000
} }

Loading…
Cancel
Save