diff --git a/.golangci.yml b/.golangci.yml deleted file mode 100644 index 31cf246..0000000 --- a/.golangci.yml +++ /dev/null @@ -1,27 +0,0 @@ -run: - deadline: 5m - skip-dirs: -# - test -# - bench - -linter-settings: - goconst: - min-len: 2 - min-occurrences: 2 - -linters: - enable: - - revive - - goconst - - gofmt - - goimports - - misspell - - unparam - -issues: - exclude-use-default: false - exclude-rules: - - path: _test.go - linters: - - errcheck - - revive diff --git a/helper/Makefile b/Makefile similarity index 91% rename from helper/Makefile rename to Makefile index f9d7353..9564206 100644 --- a/helper/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ fmt: @gofmt -s -w ./ lint: - @golangci-lint run + revive -config revive.toml ./... .PHONY: test test: diff --git a/README.md b/README.md index 42c8171..af74b2a 100644 --- a/README.md +++ b/README.md @@ -117,10 +117,6 @@ If you want more quick start examples, please refer to [dtm-labs/quick-start-sam The above example mainly demonstrates the flow of a distributed transaction. More on this, including practical examples of how to interact with an actual database, how to do compensation, how to do rollback, etc. please refer to [dtm-examples](https://github.com/dtm-labs/dtm-examples) for more examples. -## Chat Group - -Join the chat via [https://discord.gg/dV9jS5Rb33](https://discord.gg/dV9jS5Rb33). - ## Give a star! ⭐ If you think this project is interesting, or helpful to you, please give a star! diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 671df25..82ef344 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -53,11 +53,12 @@ func CronExpiredTrans(num int) { func CronUpdateTopicsMap() { for { time.Sleep(time.Duration(conf.ConfigUpdateInterval) * time.Second) - cronUpdateTopicsMapOnce() + CronUpdateTopicsMapOnce() } } -func cronUpdateTopicsMapOnce() { +// CronUpdateTopicsMapOnce cron updates topics map once +func CronUpdateTopicsMapOnce() { defer handlePanic(nil) updateTopicsMap() } diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index 631d73d..bab761c 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -4,6 +4,7 @@ * license that can be found in the LICENSE file. */ +// package boltdb implement the storage for boltdb package boltdb import ( @@ -68,7 +69,8 @@ func initializeBuckets(db *bolt.DB) error { } // cleanupExpiredData will clean the expired data in boltdb, the -// expired time is configurable. +// +// expired time is configurable. func cleanupExpiredData(expire time.Duration, db *bolt.DB) error { if expire <= 0 { return nil @@ -484,10 +486,9 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in // ResetTransGlobalCronTime reset nextCronTime of one global trans. func (s *Store) ResetTransGlobalCronTime(g *storage.TransGlobalStore) error { - old := g.UpdateTime err := s.boltDb.Update(func(t *bolt.Tx) error { g := tGetGlobal(t, g.Gid) - if g == nil || g.UpdateTime == old { + if g == nil { return storage.ErrNotFound } now := dtmutil.GetNextTime(0) diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 090ab59..624f6a0 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -4,6 +4,7 @@ * license that can be found in the LICENSE file. */ +// package reds implement the storage for reds package redis import ( @@ -69,6 +70,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition s globals := []storage.TransGlobalStore{} redis := redisGet() for { + limit -= int64(len(globals)) keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_g_*", limit).Result() logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_g_*", limit, nextCursor, len(keys)) @@ -86,14 +88,15 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition s (condition.CreateTimeEnd.IsZero() || global.CreateTime.Before(condition.CreateTimeEnd)) { globals = append(globals, global) } - if len(globals) == int(limit) { + // redis.Scan may return more records than limit + if len(globals) >= int(limit) { break } } } lid = nextCursor - if len(globals) == int(limit) || nextCursor == 0 { + if len(globals) >= int(limit) || nextCursor == 0 { break } } @@ -373,12 +376,14 @@ func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVSt kvs := []storage.KVStore{} redis := redisGet() for { + limit -= int64(len(kvs)) keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit).Result() logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit, nextCursor, len(keys)) dtmimp.E2P(err) if len(keys) > 0 { values, err := redis.MGet(ctx, keys...).Result() dtmimp.E2P(err) + logger.Debugf("keys: %s values: %s", dtmimp.MustMarshalString(keys), dtmimp.MustMarshalString(values)) for _, v := range values { if v == nil { continue @@ -390,7 +395,8 @@ func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVSt } lid = nextCursor - if len(kvs) == int(limit) || nextCursor == 0 { + // for redis, `count` in `scan` command is only a hint, may return more than `count` items + if len(kvs) >= int(limit) || nextCursor == 0 { break } } diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 28f9c9c..cf8583c 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -4,6 +4,7 @@ * license that can be found in the LICENSE file. */ +// package boltdb implement the storage for sql database package sql import ( @@ -207,14 +208,10 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in // ResetTransGlobalCronTime reset nextCronTime of one global trans. func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error { now := getTimeStr(0) - where := map[string]string{ - dtmimp.DBTypeMysql: fmt.Sprintf(`gid = '%s'`, global.Gid), - }[conf.Store.Driver] - - sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE %s`, + sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE gid = '%s'`, now, now, - where) + global.Gid) _, err := dtmimp.DBExec(conf.Store.Driver, dbGet().ToSQLDB(), sql) return err } diff --git a/dtmsvr/storage/trans.go b/dtmsvr/storage/trans.go index e9551d5..856bd09 100644 --- a/dtmsvr/storage/trans.go +++ b/dtmsvr/storage/trans.go @@ -81,7 +81,7 @@ func (b *TransBranchStore) String() string { return dtmimp.MustMarshalString(*b) } -//KVStore defines Key-Value storage info +// KVStore defines Key-Value storage info type KVStore struct { dtmutil.ModelBase Cat string `json:"cat"` diff --git a/helper/golint.sh b/helper/golint.sh index 71f1817..d4a8642 100644 --- a/helper/golint.sh +++ b/helper/golint.sh @@ -1,4 +1,3 @@ set -x -curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.46.2 -$(go env GOPATH)/bin/golangci-lint run +go install github.com/mgechev/revive@latest && revive -config revive.toml ./... diff --git a/helper/test-cover.sh b/helper/test-cover.sh index fb7bba1..c5860ca 100755 --- a/helper/test-cover.sh +++ b/helper/test-cover.sh @@ -9,6 +9,8 @@ for store in redis boltdb mysql postgres; do echo > profile.out fi done +## for local unit test, you may use following command +# SKIP_MONGO=1 TEST_STORE=redis GOARCH=amd64 go test -v -failfast -count=1 -gcflags=all=-l ./... # go tool cover -html=coverage.txt diff --git a/main.go b/main.go index 645031f..75119dd 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ * license that can be found in the LICENSE file. */ +// package main is the entry of dtm server package main import ( diff --git a/revive.toml b/revive.toml new file mode 100644 index 0000000..255d8c8 --- /dev/null +++ b/revive.toml @@ -0,0 +1,27 @@ +ignoreGeneratedHeader = false +severity = "warning" +confidence = 0.8 +errorCode = 0 +warningCode = 0 + +[rule.blank-imports] +[rule.context-as-argument] +[rule.context-keys-type] +[rule.dot-imports] +[rule.error-return] +[rule.error-strings] +[rule.error-naming] +[rule.exported] +[rule.if-return] +[rule.increment-decrement] +[rule.var-naming] +[rule.var-declaration] +[rule.range] +[rule.receiver-naming] +[rule.time-naming] +[rule.unexported-return] +[rule.indent-error-flow] +[rule.errorf] +[rule.superfluous-else] +[rule.unreachable-code] +[rule.redefines-builtin-id] \ No newline at end of file diff --git a/test/api_test.go b/test/api_test.go index 9831754..9e75eb3 100644 --- a/test/api_test.go +++ b/test/api_test.go @@ -15,6 +15,7 @@ import ( "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmsvr/storage/registry" "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/test/busi" "github.com/stretchr/testify/assert" @@ -249,30 +250,28 @@ func TestAPIForceStoppedAbnormal(t *testing.T) { assert.Equal(t, resp.StatusCode(), http.StatusConflict) } -// func TestAPIResetNextCronTime(t *testing.T) { -// saga := genSaga(dtmimp.GetFuncName(), false, false) -// saga.Submit() -// waitTransProcessed(saga.Gid) -// assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) -// assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) -// gid := saga.Gid - -// s := registry.GetStore() -// g := s.FindTransGlobalStore(saga.Gid) - -// // reset -// resp, err := dtmcli.GetRestyClient().R().SetBody(map[string]string{ -// "gid": saga.Gid, -// }).Post(dtmutil.DefaultHTTPServer + "/resetNextCronTime") -// assert.Nil(t, err) -// assert.Equal(t, resp.StatusCode(), http.StatusOK) - -// // after reset assert -// g2 := s.FindTransGlobalStore(gid) -// assert.NotNil(t, g2) -// assert.Equal(t, gid, g2.Gid) -// assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime) -// assert.Equal(t, g2.UpdateTime, g2.NextCronTime) -// assert.NotEqual(t, g.UpdateTime, g2.UpdateTime) -// assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) -// } +func TestAPIResetNextCronTime(t *testing.T) { + saga := genSaga(dtmimp.GetFuncName(), false, false) + saga.Submit() + waitTransProcessed(saga.Gid) + assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) + gid := saga.Gid + + s := registry.GetStore() + g := s.FindTransGlobalStore(saga.Gid) + + // reset + resp, err := dtmcli.GetRestyClient().R().SetBody(map[string]string{ + "gid": saga.Gid, + }).Post(dtmutil.DefaultHTTPServer + "/resetNextCronTime") + assert.Nil(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode()) + + // after reset assert + g2 := s.FindTransGlobalStore(gid) + assert.NotNil(t, g2) + assert.Equal(t, gid, g2.Gid) + assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime) + assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) +} diff --git a/test/common_test.go b/test/common_test.go index 2fe2724..de01e63 100644 --- a/test/common_test.go +++ b/test/common_test.go @@ -1,6 +1,7 @@ package test import ( + "os" "testing" "github.com/dtm-labs/dtm/client/dtmcli" @@ -13,12 +14,12 @@ import ( func TestGeneralDB(t *testing.T) { if conf.Store.IsDB() { - testSql(t) + testSQL(t) testDbAlone(t) } } -func testSql(t *testing.T) { +func testSQL(t *testing.T) { conf := conf.Store.GetDBConf() conf.Host = "127.0.0.1" // use a new host to trigger SetDBConn called db := dtmutil.DbGet(conf, sql.SetDBConn) @@ -46,3 +47,9 @@ func TestMustGenGid(t *testing.T) { dtmgrpc.MustGenGid(dtmutil.DefaultGrpcServer) dtmcli.MustGenGid(dtmutil.DefaultHTTPServer) } + +func MaySkipMongo(t *testing.T) { + if os.Getenv("SKIP_MONGO") != "" { + t.Skip("skipping test with mongo") + } +} diff --git a/test/dtmsvr_test.go b/test/dtmsvr_test.go index 497a449..93e592d 100644 --- a/test/dtmsvr_test.go +++ b/test/dtmsvr_test.go @@ -43,11 +43,11 @@ func getBranchesStatus(gid string) []string { return status } -func isSqlStore() bool { +func isSQLStore() bool { return conf.Store.Driver == config.Mysql || conf.Store.Driver == config.Postgres } func TestUpdateBranchAsync(t *testing.T) { - if !isSqlStore() { + if !isSQLStore() { return } conf.UpdateBranchSync = 0 diff --git a/test/main_test.go b/test/main_test.go index 665eed5..b91e9dd 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -37,6 +37,7 @@ func TestMain(m *testing.M) { dtmdriver.Middlewares.Grpc = append(dtmdriver.Middlewares.Grpc, busi.SetGrpcHeaderForHeadersYes) tenv := dtmimp.OrString(os.Getenv("TEST_STORE"), config.Redis) + conf.ConfigUpdateInterval = 1 conf.Store.Host = "localhost" conf.Store.Driver = tenv if tenv == "boltdb" { @@ -74,7 +75,8 @@ func TestMain(m *testing.M) { subscribeTopic() subscribeGrpcTopic() - + dtmsvr.CronUpdateTopicsMapOnce() + logger.Debugf("unit main test inited") r := m.Run() if r != 0 { os.Exit(r) diff --git a/test/msg_barrier_mongo_test.go b/test/msg_barrier_mongo_test.go index 32bb0f0..50e60e8 100644 --- a/test/msg_barrier_mongo_test.go +++ b/test/msg_barrier_mongo_test.go @@ -12,6 +12,7 @@ import ( ) func TestMsgMongoDoSucceed(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() req := busi.GenReqHTTP(30, false, false) @@ -30,6 +31,7 @@ func TestMsgMongoDoSucceed(t *testing.T) { } func TestMsgMongoDoBusiFailed(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() req := busi.GenReqHTTP(30, false, false) @@ -43,6 +45,7 @@ func TestMsgMongoDoBusiFailed(t *testing.T) { } func TestMsgMongoDoBusiLater(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() req := busi.GenReqHTTP(30, false, false) @@ -68,6 +71,7 @@ func TestMsgMongoDoBusiLater(t *testing.T) { } func TestMsgMongoDoCommitFailed(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() req := busi.GenReqHTTP(30, false, false) @@ -85,6 +89,7 @@ func TestMsgMongoDoCommitFailed(t *testing.T) { } func TestMsgMongoDoCommitAfterFailed(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() req := busi.GenReqHTTP(30, false, false) diff --git a/test/msg_grpc_test.go b/test/msg_grpc_test.go index a65a88a..3c2fd1f 100644 --- a/test/msg_grpc_test.go +++ b/test/msg_grpc_test.go @@ -9,7 +9,6 @@ package test import ( "fmt" "testing" - "time" "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" @@ -70,7 +69,4 @@ func genGrpcMsg(gid string) *dtmgrpc.MsgGrpc { func subscribeGrpcTopic() { e2p(grpcSubscribe("grpc_trans", busi.BusiGrpc+"/busi.Busi/TransOut")) e2p(grpcSubscribe("grpc_trans", busi.BusiGrpc+"/busi.Busi/TransIn")) - - // wait for the topic configuration to take effect - time.Sleep(time.Second * time.Duration(conf.ConfigUpdateInterval+1)) } diff --git a/test/msg_test.go b/test/msg_test.go index e07ae13..7f86808 100644 --- a/test/msg_test.go +++ b/test/msg_test.go @@ -9,7 +9,6 @@ package test import ( "strings" "testing" - "time" "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" @@ -89,7 +88,4 @@ func genMsg(gid string) *dtmcli.Msg { func subscribeTopic() { e2p(httpSubscribe("http_trans", busi.Busi+"/TransOut")) e2p(httpSubscribe("http_trans", busi.Busi+"/TransIn")) - - // wait for the topic configuration to take effect - time.Sleep(time.Second * time.Duration(conf.ConfigUpdateInterval+1)) } diff --git a/test/saga_barrier_mongo_test.go b/test/saga_barrier_mongo_test.go index 0cf2b61..789dc35 100644 --- a/test/saga_barrier_mongo_test.go +++ b/test/saga_barrier_mongo_test.go @@ -16,6 +16,7 @@ import ( ) func TestSagaBarrierMongoNormal(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") saga := genSagaBarrierMongo(dtmimp.GetFuncName(), false) err := saga.Submit() @@ -27,6 +28,7 @@ func TestSagaBarrierMongoNormal(t *testing.T) { } func TestSagaBarrierMongoRollback(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") saga := genSagaBarrierMongo(dtmimp.GetFuncName(), true) err := saga.Submit() diff --git a/test/saga_grpc_test.go b/test/saga_grpc_test.go index e14d3ae..17806a3 100644 --- a/test/saga_grpc_test.go +++ b/test/saga_grpc_test.go @@ -92,7 +92,7 @@ func TestSagaGrpcEmptyUrl(t *testing.T) { assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) } -//nolint: unparam +// nolint: unparam func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc { saga := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gid) req := busi.GenReqGrpc(30, outFailed, inFailed) diff --git a/test/store_test.go b/test/store_test.go index 8f0f93f..5845390 100644 --- a/test/store_test.go +++ b/test/store_test.go @@ -179,22 +179,23 @@ func TestUpdateBranches(t *testing.T) { } } -// func TestResetTransGlobalCronTime(t *testing.T) { -// gid := dtmimp.GetFuncName() -// g, _ := initTransGlobal(gid) - -// s := registry.GetStore() -// g2 := s.FindTransGlobalStore(gid) -// assert.NotNil(t, g2) -// assert.Equal(t, gid, g2.Gid) - -// s.ResetTransGlobalCronTime(g2) - -// g2 = s.FindTransGlobalStore(gid) -// assert.NotNil(t, g2) -// assert.Equal(t, gid, g2.Gid) -// assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime) -// assert.Equal(t, g2.UpdateTime, g2.NextCronTime) -// assert.NotEqual(t, g.UpdateTime, g2.UpdateTime) -// assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) -// } +func TestResetTransGlobalCronTime(t *testing.T) { + gid := dtmimp.GetFuncName() + g, _ := initTransGlobal(gid) + + s := registry.GetStore() + g2 := s.FindTransGlobalStore(gid) + assert.NotNil(t, g2) + assert.Equal(t, gid, g2.Gid) + + s.ResetTransGlobalCronTime(g2) + + g2 = s.FindTransGlobalStore(gid) + assert.NotNil(t, g2) + assert.Equal(t, gid, g2.Gid) + assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime) + assert.Equal(t, g2.UpdateTime, g2.NextCronTime) + assert.NotEqual(t, g.UpdateTime, g2.UpdateTime) + assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) + s.ChangeGlobalStatus(g, "succeed", []string{}, true) +} diff --git a/test/tcc_barrier_test.go b/test/tcc_barrier_test.go index b298dde..dff1c82 100644 --- a/test/tcc_barrier_test.go +++ b/test/tcc_barrier_test.go @@ -63,6 +63,9 @@ func TestTccBarrierDisorderRedis(t *testing.T) { } func runTestTccBarrierDisorder(t *testing.T, store string) { + if store == "mongo" { + MaySkipMongo(t) + } before := getBeforeBalances(store) cancelFinishedChan := make(chan string, 2) cancelCanReturnChan := make(chan string, 2) diff --git a/test/workflow_http_test.go b/test/workflow_http_test.go index 991a315..8e740da 100644 --- a/test/workflow_http_test.go +++ b/test/workflow_http_test.go @@ -215,7 +215,7 @@ func TestWorkflowResumeSkip(t *testing.T) { workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { wf.NewBranch().Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) { logger.Infof("increase resume counter") - resumeCounter += 1 + resumeCounter++ return nil, nil }) var req busi.ReqHTTP