Browse Source

Merge pull request #443 from dtm-labs/alpha

Fix unit test
pull/447/head
yedf2 3 years ago
committed by GitHub
parent
commit
477f8ff01c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      .golangci.yml
  2. 2
      Makefile
  3. 4
      README.md
  4. 5
      dtmsvr/cron.go
  5. 7
      dtmsvr/storage/boltdb/boltdb.go
  6. 12
      dtmsvr/storage/redis/redis.go
  7. 9
      dtmsvr/storage/sql/sql.go
  8. 2
      dtmsvr/storage/trans.go
  9. 3
      helper/golint.sh
  10. 2
      helper/test-cover.sh
  11. 1
      main.go
  12. 27
      revive.toml
  13. 53
      test/api_test.go
  14. 11
      test/common_test.go
  15. 4
      test/dtmsvr_test.go
  16. 4
      test/main_test.go
  17. 5
      test/msg_barrier_mongo_test.go
  18. 4
      test/msg_grpc_test.go
  19. 4
      test/msg_test.go
  20. 2
      test/saga_barrier_mongo_test.go
  21. 2
      test/saga_grpc_test.go
  22. 39
      test/store_test.go
  23. 3
      test/tcc_barrier_test.go
  24. 2
      test/workflow_http_test.go

27
.golangci.yml

@ -1,27 +0,0 @@
run:
deadline: 5m
skip-dirs:
# - test
# - bench
linter-settings:
goconst:
min-len: 2
min-occurrences: 2
linters:
enable:
- revive
- goconst
- gofmt
- goimports
- misspell
- unparam
issues:
exclude-use-default: false
exclude-rules:
- path: _test.go
linters:
- errcheck
- revive

2
helper/Makefile → Makefile

@ -6,7 +6,7 @@ fmt:
@gofmt -s -w ./
lint:
@golangci-lint run
revive -config revive.toml ./...
.PHONY: test
test:

4
README.md

@ -117,10 +117,6 @@ If you want more quick start examples, please refer to [dtm-labs/quick-start-sam
The above example mainly demonstrates the flow of a distributed transaction. More on this, including practical examples of how to interact with an actual database, how to do compensation, how to do rollback, etc. please refer to [dtm-examples](https://github.com/dtm-labs/dtm-examples) for more examples.
## Chat Group
Join the chat via [https://discord.gg/dV9jS5Rb33](https://discord.gg/dV9jS5Rb33).
## Give a star! ⭐
If you think this project is interesting, or helpful to you, please give a star!

5
dtmsvr/cron.go

@ -53,11 +53,12 @@ func CronExpiredTrans(num int) {
func CronUpdateTopicsMap() {
for {
time.Sleep(time.Duration(conf.ConfigUpdateInterval) * time.Second)
cronUpdateTopicsMapOnce()
CronUpdateTopicsMapOnce()
}
}
func cronUpdateTopicsMapOnce() {
// CronUpdateTopicsMapOnce cron updates topics map once
func CronUpdateTopicsMapOnce() {
defer handlePanic(nil)
updateTopicsMap()
}

7
dtmsvr/storage/boltdb/boltdb.go

@ -4,6 +4,7 @@
* license that can be found in the LICENSE file.
*/
// package boltdb implement the storage for boltdb
package boltdb
import (
@ -68,7 +69,8 @@ func initializeBuckets(db *bolt.DB) error {
}
// cleanupExpiredData will clean the expired data in boltdb, the
// expired time is configurable.
//
// expired time is configurable.
func cleanupExpiredData(expire time.Duration, db *bolt.DB) error {
if expire <= 0 {
return nil
@ -484,10 +486,9 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in
// ResetTransGlobalCronTime reset nextCronTime of one global trans.
func (s *Store) ResetTransGlobalCronTime(g *storage.TransGlobalStore) error {
old := g.UpdateTime
err := s.boltDb.Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, g.Gid)
if g == nil || g.UpdateTime == old {
if g == nil {
return storage.ErrNotFound
}
now := dtmutil.GetNextTime(0)

12
dtmsvr/storage/redis/redis.go

@ -4,6 +4,7 @@
* license that can be found in the LICENSE file.
*/
// package reds implement the storage for reds
package redis
import (
@ -69,6 +70,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition s
globals := []storage.TransGlobalStore{}
redis := redisGet()
for {
limit -= int64(len(globals))
keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_g_*", limit).Result()
logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_g_*", limit, nextCursor, len(keys))
@ -86,14 +88,15 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition s
(condition.CreateTimeEnd.IsZero() || global.CreateTime.Before(condition.CreateTimeEnd)) {
globals = append(globals, global)
}
if len(globals) == int(limit) {
// redis.Scan may return more records than limit
if len(globals) >= int(limit) {
break
}
}
}
lid = nextCursor
if len(globals) == int(limit) || nextCursor == 0 {
if len(globals) >= int(limit) || nextCursor == 0 {
break
}
}
@ -373,12 +376,14 @@ func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVSt
kvs := []storage.KVStore{}
redis := redisGet()
for {
limit -= int64(len(kvs))
keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit).Result()
logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit, nextCursor, len(keys))
dtmimp.E2P(err)
if len(keys) > 0 {
values, err := redis.MGet(ctx, keys...).Result()
dtmimp.E2P(err)
logger.Debugf("keys: %s values: %s", dtmimp.MustMarshalString(keys), dtmimp.MustMarshalString(values))
for _, v := range values {
if v == nil {
continue
@ -390,7 +395,8 @@ func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVSt
}
lid = nextCursor
if len(kvs) == int(limit) || nextCursor == 0 {
// for redis, `count` in `scan` command is only a hint, may return more than `count` items
if len(kvs) >= int(limit) || nextCursor == 0 {
break
}
}

9
dtmsvr/storage/sql/sql.go

@ -4,6 +4,7 @@
* license that can be found in the LICENSE file.
*/
// package boltdb implement the storage for sql database
package sql
import (
@ -207,14 +208,10 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in
// ResetTransGlobalCronTime reset nextCronTime of one global trans.
func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error {
now := getTimeStr(0)
where := map[string]string{
dtmimp.DBTypeMysql: fmt.Sprintf(`gid = '%s'`, global.Gid),
}[conf.Store.Driver]
sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE %s`,
sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE gid = '%s'`,
now,
now,
where)
global.Gid)
_, err := dtmimp.DBExec(conf.Store.Driver, dbGet().ToSQLDB(), sql)
return err
}

2
dtmsvr/storage/trans.go

@ -81,7 +81,7 @@ func (b *TransBranchStore) String() string {
return dtmimp.MustMarshalString(*b)
}
//KVStore defines Key-Value storage info
// KVStore defines Key-Value storage info
type KVStore struct {
dtmutil.ModelBase
Cat string `json:"cat"`

3
helper/golint.sh

@ -1,4 +1,3 @@
set -x
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.46.2
$(go env GOPATH)/bin/golangci-lint run
go install github.com/mgechev/revive@latest && revive -config revive.toml ./...

2
helper/test-cover.sh

@ -9,6 +9,8 @@ for store in redis boltdb mysql postgres; do
echo > profile.out
fi
done
## for local unit test, you may use following command
# SKIP_MONGO=1 TEST_STORE=redis GOARCH=amd64 go test -v -failfast -count=1 -gcflags=all=-l ./...
# go tool cover -html=coverage.txt

1
main.go

@ -4,6 +4,7 @@
* license that can be found in the LICENSE file.
*/
// package main is the entry of dtm server
package main
import (

27
revive.toml

@ -0,0 +1,27 @@
ignoreGeneratedHeader = false
severity = "warning"
confidence = 0.8
errorCode = 0
warningCode = 0
[rule.blank-imports]
[rule.context-as-argument]
[rule.context-keys-type]
[rule.dot-imports]
[rule.error-return]
[rule.error-strings]
[rule.error-naming]
[rule.exported]
[rule.if-return]
[rule.increment-decrement]
[rule.var-naming]
[rule.var-declaration]
[rule.range]
[rule.receiver-naming]
[rule.time-naming]
[rule.unexported-return]
[rule.indent-error-flow]
[rule.errorf]
[rule.superfluous-else]
[rule.unreachable-code]
[rule.redefines-builtin-id]

53
test/api_test.go

@ -15,6 +15,7 @@ import (
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmsvr/storage/registry"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/dtm-labs/dtm/test/busi"
"github.com/stretchr/testify/assert"
@ -249,30 +250,28 @@ func TestAPIForceStoppedAbnormal(t *testing.T) {
assert.Equal(t, resp.StatusCode(), http.StatusConflict)
}
// func TestAPIResetNextCronTime(t *testing.T) {
// saga := genSaga(dtmimp.GetFuncName(), false, false)
// saga.Submit()
// waitTransProcessed(saga.Gid)
// assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
// assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
// gid := saga.Gid
// s := registry.GetStore()
// g := s.FindTransGlobalStore(saga.Gid)
// // reset
// resp, err := dtmcli.GetRestyClient().R().SetBody(map[string]string{
// "gid": saga.Gid,
// }).Post(dtmutil.DefaultHTTPServer + "/resetNextCronTime")
// assert.Nil(t, err)
// assert.Equal(t, resp.StatusCode(), http.StatusOK)
// // after reset assert
// g2 := s.FindTransGlobalStore(gid)
// assert.NotNil(t, g2)
// assert.Equal(t, gid, g2.Gid)
// assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime)
// assert.Equal(t, g2.UpdateTime, g2.NextCronTime)
// assert.NotEqual(t, g.UpdateTime, g2.UpdateTime)
// assert.NotEqual(t, g.NextCronTime, g2.NextCronTime)
// }
func TestAPIResetNextCronTime(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, false)
saga.Submit()
waitTransProcessed(saga.Gid)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
gid := saga.Gid
s := registry.GetStore()
g := s.FindTransGlobalStore(saga.Gid)
// reset
resp, err := dtmcli.GetRestyClient().R().SetBody(map[string]string{
"gid": saga.Gid,
}).Post(dtmutil.DefaultHTTPServer + "/resetNextCronTime")
assert.Nil(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode())
// after reset assert
g2 := s.FindTransGlobalStore(gid)
assert.NotNil(t, g2)
assert.Equal(t, gid, g2.Gid)
assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime)
assert.NotEqual(t, g.NextCronTime, g2.NextCronTime)
}

11
test/common_test.go

@ -1,6 +1,7 @@
package test
import (
"os"
"testing"
"github.com/dtm-labs/dtm/client/dtmcli"
@ -13,12 +14,12 @@ import (
func TestGeneralDB(t *testing.T) {
if conf.Store.IsDB() {
testSql(t)
testSQL(t)
testDbAlone(t)
}
}
func testSql(t *testing.T) {
func testSQL(t *testing.T) {
conf := conf.Store.GetDBConf()
conf.Host = "127.0.0.1" // use a new host to trigger SetDBConn called
db := dtmutil.DbGet(conf, sql.SetDBConn)
@ -46,3 +47,9 @@ func TestMustGenGid(t *testing.T) {
dtmgrpc.MustGenGid(dtmutil.DefaultGrpcServer)
dtmcli.MustGenGid(dtmutil.DefaultHTTPServer)
}
func MaySkipMongo(t *testing.T) {
if os.Getenv("SKIP_MONGO") != "" {
t.Skip("skipping test with mongo")
}
}

4
test/dtmsvr_test.go

@ -43,11 +43,11 @@ func getBranchesStatus(gid string) []string {
return status
}
func isSqlStore() bool {
func isSQLStore() bool {
return conf.Store.Driver == config.Mysql || conf.Store.Driver == config.Postgres
}
func TestUpdateBranchAsync(t *testing.T) {
if !isSqlStore() {
if !isSQLStore() {
return
}
conf.UpdateBranchSync = 0

4
test/main_test.go

@ -37,6 +37,7 @@ func TestMain(m *testing.M) {
dtmdriver.Middlewares.Grpc = append(dtmdriver.Middlewares.Grpc, busi.SetGrpcHeaderForHeadersYes)
tenv := dtmimp.OrString(os.Getenv("TEST_STORE"), config.Redis)
conf.ConfigUpdateInterval = 1
conf.Store.Host = "localhost"
conf.Store.Driver = tenv
if tenv == "boltdb" {
@ -74,7 +75,8 @@ func TestMain(m *testing.M) {
subscribeTopic()
subscribeGrpcTopic()
dtmsvr.CronUpdateTopicsMapOnce()
logger.Debugf("unit main test inited")
r := m.Run()
if r != 0 {
os.Exit(r)

5
test/msg_barrier_mongo_test.go

@ -12,6 +12,7 @@ import (
)
func TestMsgMongoDoSucceed(t *testing.T) {
MaySkipMongo(t)
before := getBeforeBalances("mongo")
gid := dtmimp.GetFuncName()
req := busi.GenReqHTTP(30, false, false)
@ -30,6 +31,7 @@ func TestMsgMongoDoSucceed(t *testing.T) {
}
func TestMsgMongoDoBusiFailed(t *testing.T) {
MaySkipMongo(t)
before := getBeforeBalances("mongo")
gid := dtmimp.GetFuncName()
req := busi.GenReqHTTP(30, false, false)
@ -43,6 +45,7 @@ func TestMsgMongoDoBusiFailed(t *testing.T) {
}
func TestMsgMongoDoBusiLater(t *testing.T) {
MaySkipMongo(t)
before := getBeforeBalances("mongo")
gid := dtmimp.GetFuncName()
req := busi.GenReqHTTP(30, false, false)
@ -68,6 +71,7 @@ func TestMsgMongoDoBusiLater(t *testing.T) {
}
func TestMsgMongoDoCommitFailed(t *testing.T) {
MaySkipMongo(t)
before := getBeforeBalances("mongo")
gid := dtmimp.GetFuncName()
req := busi.GenReqHTTP(30, false, false)
@ -85,6 +89,7 @@ func TestMsgMongoDoCommitFailed(t *testing.T) {
}
func TestMsgMongoDoCommitAfterFailed(t *testing.T) {
MaySkipMongo(t)
before := getBeforeBalances("mongo")
gid := dtmimp.GetFuncName()
req := busi.GenReqHTTP(30, false, false)

4
test/msg_grpc_test.go

@ -9,7 +9,6 @@ package test
import (
"fmt"
"testing"
"time"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@ -70,7 +69,4 @@ func genGrpcMsg(gid string) *dtmgrpc.MsgGrpc {
func subscribeGrpcTopic() {
e2p(grpcSubscribe("grpc_trans", busi.BusiGrpc+"/busi.Busi/TransOut"))
e2p(grpcSubscribe("grpc_trans", busi.BusiGrpc+"/busi.Busi/TransIn"))
// wait for the topic configuration to take effect
time.Sleep(time.Second * time.Duration(conf.ConfigUpdateInterval+1))
}

4
test/msg_test.go

@ -9,7 +9,6 @@ package test
import (
"strings"
"testing"
"time"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@ -89,7 +88,4 @@ func genMsg(gid string) *dtmcli.Msg {
func subscribeTopic() {
e2p(httpSubscribe("http_trans", busi.Busi+"/TransOut"))
e2p(httpSubscribe("http_trans", busi.Busi+"/TransIn"))
// wait for the topic configuration to take effect
time.Sleep(time.Second * time.Duration(conf.ConfigUpdateInterval+1))
}

2
test/saga_barrier_mongo_test.go

@ -16,6 +16,7 @@ import (
)
func TestSagaBarrierMongoNormal(t *testing.T) {
MaySkipMongo(t)
before := getBeforeBalances("mongo")
saga := genSagaBarrierMongo(dtmimp.GetFuncName(), false)
err := saga.Submit()
@ -27,6 +28,7 @@ func TestSagaBarrierMongoNormal(t *testing.T) {
}
func TestSagaBarrierMongoRollback(t *testing.T) {
MaySkipMongo(t)
before := getBeforeBalances("mongo")
saga := genSagaBarrierMongo(dtmimp.GetFuncName(), true)
err := saga.Submit()

2
test/saga_grpc_test.go

@ -92,7 +92,7 @@ func TestSagaGrpcEmptyUrl(t *testing.T) {
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
}
//nolint: unparam
// nolint: unparam
func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc {
saga := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gid)
req := busi.GenReqGrpc(30, outFailed, inFailed)

39
test/store_test.go

@ -179,22 +179,23 @@ func TestUpdateBranches(t *testing.T) {
}
}
// func TestResetTransGlobalCronTime(t *testing.T) {
// gid := dtmimp.GetFuncName()
// g, _ := initTransGlobal(gid)
// s := registry.GetStore()
// g2 := s.FindTransGlobalStore(gid)
// assert.NotNil(t, g2)
// assert.Equal(t, gid, g2.Gid)
// s.ResetTransGlobalCronTime(g2)
// g2 = s.FindTransGlobalStore(gid)
// assert.NotNil(t, g2)
// assert.Equal(t, gid, g2.Gid)
// assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime)
// assert.Equal(t, g2.UpdateTime, g2.NextCronTime)
// assert.NotEqual(t, g.UpdateTime, g2.UpdateTime)
// assert.NotEqual(t, g.NextCronTime, g2.NextCronTime)
// }
func TestResetTransGlobalCronTime(t *testing.T) {
gid := dtmimp.GetFuncName()
g, _ := initTransGlobal(gid)
s := registry.GetStore()
g2 := s.FindTransGlobalStore(gid)
assert.NotNil(t, g2)
assert.Equal(t, gid, g2.Gid)
s.ResetTransGlobalCronTime(g2)
g2 = s.FindTransGlobalStore(gid)
assert.NotNil(t, g2)
assert.Equal(t, gid, g2.Gid)
assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime)
assert.Equal(t, g2.UpdateTime, g2.NextCronTime)
assert.NotEqual(t, g.UpdateTime, g2.UpdateTime)
assert.NotEqual(t, g.NextCronTime, g2.NextCronTime)
s.ChangeGlobalStatus(g, "succeed", []string{}, true)
}

3
test/tcc_barrier_test.go

@ -63,6 +63,9 @@ func TestTccBarrierDisorderRedis(t *testing.T) {
}
func runTestTccBarrierDisorder(t *testing.T, store string) {
if store == "mongo" {
MaySkipMongo(t)
}
before := getBeforeBalances(store)
cancelFinishedChan := make(chan string, 2)
cancelCanReturnChan := make(chan string, 2)

2
test/workflow_http_test.go

@ -215,7 +215,7 @@ func TestWorkflowResumeSkip(t *testing.T) {
workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error {
wf.NewBranch().Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) {
logger.Infof("increase resume counter")
resumeCounter += 1
resumeCounter++
return nil, nil
})
var req busi.ReqHTTP

Loading…
Cancel
Save