From 8cac60ead240dec784e008a74ed7fe97568ccdb9 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 08:58:44 +0800 Subject: [PATCH 01/14] update test --- dtmsvr/cron.go | 4 ++-- dtmsvr/storage/boltdb/boltdb.go | 4 ++-- dtmsvr/storage/redis/redis.go | 1 - dtmsvr/storage/sql/sql.go | 2 +- test/common_test.go | 7 +++++++ test/main_test.go | 4 +++- test/msg_barrier_mongo_test.go | 5 +++++ test/msg_grpc_test.go | 4 ---- test/msg_test.go | 4 ---- test/saga_barrier_mongo_test.go | 2 ++ 10 files changed, 22 insertions(+), 15 deletions(-) diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 671df25..4f7179e 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -53,11 +53,11 @@ func CronExpiredTrans(num int) { func CronUpdateTopicsMap() { for { time.Sleep(time.Duration(conf.ConfigUpdateInterval) * time.Second) - cronUpdateTopicsMapOnce() + CronUpdateTopicsMapOnce() } } -func cronUpdateTopicsMapOnce() { +func CronUpdateTopicsMapOnce() { defer handlePanic(nil) updateTopicsMap() } diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index 631d73d..eea3a5d 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -68,7 +68,8 @@ func initializeBuckets(db *bolt.DB) error { } // cleanupExpiredData will clean the expired data in boltdb, the -// expired time is configurable. +// +// expired time is configurable. func cleanupExpiredData(expire time.Duration, db *bolt.DB) error { if expire <= 0 { return nil @@ -482,7 +483,6 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in return } -// ResetTransGlobalCronTime reset nextCronTime of one global trans. func (s *Store) ResetTransGlobalCronTime(g *storage.TransGlobalStore) error { old := g.UpdateTime err := s.boltDb.Update(func(t *bolt.Tx) error { diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 090ab59..7a91682 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -330,7 +330,6 @@ return tostring(i) return } -// ResetTransGlobalCronTime reset nextCronTime of one global trans. func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error { now := dtmutil.GetNextTime(0) global.NextCronTime = now diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 28f9c9c..98a4501 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -204,7 +204,7 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in return affected, affected == limit, err } -// ResetTransGlobalCronTime reset nextCronTime of one global trans. +// reset nextCronTime of one global trans. func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error { now := getTimeStr(0) where := map[string]string{ diff --git a/test/common_test.go b/test/common_test.go index 2fe2724..4974268 100644 --- a/test/common_test.go +++ b/test/common_test.go @@ -1,6 +1,7 @@ package test import ( + "os" "testing" "github.com/dtm-labs/dtm/client/dtmcli" @@ -46,3 +47,9 @@ func TestMustGenGid(t *testing.T) { dtmgrpc.MustGenGid(dtmutil.DefaultGrpcServer) dtmcli.MustGenGid(dtmutil.DefaultHTTPServer) } + +func MaySkipMongo(t *testing.T) { + if os.Getenv("SKIP_MONGO") != "" { + t.Skip("skipping test with mongo") + } +} diff --git a/test/main_test.go b/test/main_test.go index 665eed5..b91e9dd 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -37,6 +37,7 @@ func TestMain(m *testing.M) { dtmdriver.Middlewares.Grpc = append(dtmdriver.Middlewares.Grpc, busi.SetGrpcHeaderForHeadersYes) tenv := dtmimp.OrString(os.Getenv("TEST_STORE"), config.Redis) + conf.ConfigUpdateInterval = 1 conf.Store.Host = "localhost" conf.Store.Driver = tenv if tenv == "boltdb" { @@ -74,7 +75,8 @@ func TestMain(m *testing.M) { subscribeTopic() subscribeGrpcTopic() - + dtmsvr.CronUpdateTopicsMapOnce() + logger.Debugf("unit main test inited") r := m.Run() if r != 0 { os.Exit(r) diff --git a/test/msg_barrier_mongo_test.go b/test/msg_barrier_mongo_test.go index 32bb0f0..50e60e8 100644 --- a/test/msg_barrier_mongo_test.go +++ b/test/msg_barrier_mongo_test.go @@ -12,6 +12,7 @@ import ( ) func TestMsgMongoDoSucceed(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() req := busi.GenReqHTTP(30, false, false) @@ -30,6 +31,7 @@ func TestMsgMongoDoSucceed(t *testing.T) { } func TestMsgMongoDoBusiFailed(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() req := busi.GenReqHTTP(30, false, false) @@ -43,6 +45,7 @@ func TestMsgMongoDoBusiFailed(t *testing.T) { } func TestMsgMongoDoBusiLater(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() req := busi.GenReqHTTP(30, false, false) @@ -68,6 +71,7 @@ func TestMsgMongoDoBusiLater(t *testing.T) { } func TestMsgMongoDoCommitFailed(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() req := busi.GenReqHTTP(30, false, false) @@ -85,6 +89,7 @@ func TestMsgMongoDoCommitFailed(t *testing.T) { } func TestMsgMongoDoCommitAfterFailed(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") gid := dtmimp.GetFuncName() req := busi.GenReqHTTP(30, false, false) diff --git a/test/msg_grpc_test.go b/test/msg_grpc_test.go index a65a88a..3c2fd1f 100644 --- a/test/msg_grpc_test.go +++ b/test/msg_grpc_test.go @@ -9,7 +9,6 @@ package test import ( "fmt" "testing" - "time" "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" @@ -70,7 +69,4 @@ func genGrpcMsg(gid string) *dtmgrpc.MsgGrpc { func subscribeGrpcTopic() { e2p(grpcSubscribe("grpc_trans", busi.BusiGrpc+"/busi.Busi/TransOut")) e2p(grpcSubscribe("grpc_trans", busi.BusiGrpc+"/busi.Busi/TransIn")) - - // wait for the topic configuration to take effect - time.Sleep(time.Second * time.Duration(conf.ConfigUpdateInterval+1)) } diff --git a/test/msg_test.go b/test/msg_test.go index e07ae13..7f86808 100644 --- a/test/msg_test.go +++ b/test/msg_test.go @@ -9,7 +9,6 @@ package test import ( "strings" "testing" - "time" "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" @@ -89,7 +88,4 @@ func genMsg(gid string) *dtmcli.Msg { func subscribeTopic() { e2p(httpSubscribe("http_trans", busi.Busi+"/TransOut")) e2p(httpSubscribe("http_trans", busi.Busi+"/TransIn")) - - // wait for the topic configuration to take effect - time.Sleep(time.Second * time.Duration(conf.ConfigUpdateInterval+1)) } diff --git a/test/saga_barrier_mongo_test.go b/test/saga_barrier_mongo_test.go index 0cf2b61..789dc35 100644 --- a/test/saga_barrier_mongo_test.go +++ b/test/saga_barrier_mongo_test.go @@ -16,6 +16,7 @@ import ( ) func TestSagaBarrierMongoNormal(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") saga := genSagaBarrierMongo(dtmimp.GetFuncName(), false) err := saga.Submit() @@ -27,6 +28,7 @@ func TestSagaBarrierMongoNormal(t *testing.T) { } func TestSagaBarrierMongoRollback(t *testing.T) { + MaySkipMongo(t) before := getBeforeBalances("mongo") saga := genSagaBarrierMongo(dtmimp.GetFuncName(), true) err := saga.Submit() From 25cab75ec8c8b111b0c02ac74752547a6b8e6909 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 09:05:38 +0800 Subject: [PATCH 02/14] update unit test --- test/tcc_barrier_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/tcc_barrier_test.go b/test/tcc_barrier_test.go index b298dde..dff1c82 100644 --- a/test/tcc_barrier_test.go +++ b/test/tcc_barrier_test.go @@ -63,6 +63,9 @@ func TestTccBarrierDisorderRedis(t *testing.T) { } func runTestTccBarrierDisorder(t *testing.T, store string) { + if store == "mongo" { + MaySkipMongo(t) + } before := getBeforeBalances(store) cancelFinishedChan := make(chan string, 2) cancelCanReturnChan := make(chan string, 2) From a1a63c9e1ee0a6b18cd2d6a873563ff510f626e0 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 09:25:47 +0800 Subject: [PATCH 03/14] add test case for reset cron time back --- helper/test-cover.sh | 2 ++ test/api_test.go | 55 ++++++++++++++++++++++---------------------- test/store_test.go | 39 ++++++++++++++++--------------- 3 files changed, 50 insertions(+), 46 deletions(-) diff --git a/helper/test-cover.sh b/helper/test-cover.sh index fb7bba1..c5860ca 100755 --- a/helper/test-cover.sh +++ b/helper/test-cover.sh @@ -9,6 +9,8 @@ for store in redis boltdb mysql postgres; do echo > profile.out fi done +## for local unit test, you may use following command +# SKIP_MONGO=1 TEST_STORE=redis GOARCH=amd64 go test -v -failfast -count=1 -gcflags=all=-l ./... # go tool cover -html=coverage.txt diff --git a/test/api_test.go b/test/api_test.go index 9831754..7753236 100644 --- a/test/api_test.go +++ b/test/api_test.go @@ -15,6 +15,7 @@ import ( "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmsvr/storage/registry" "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/test/busi" "github.com/stretchr/testify/assert" @@ -249,30 +250,30 @@ func TestAPIForceStoppedAbnormal(t *testing.T) { assert.Equal(t, resp.StatusCode(), http.StatusConflict) } -// func TestAPIResetNextCronTime(t *testing.T) { -// saga := genSaga(dtmimp.GetFuncName(), false, false) -// saga.Submit() -// waitTransProcessed(saga.Gid) -// assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) -// assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) -// gid := saga.Gid - -// s := registry.GetStore() -// g := s.FindTransGlobalStore(saga.Gid) - -// // reset -// resp, err := dtmcli.GetRestyClient().R().SetBody(map[string]string{ -// "gid": saga.Gid, -// }).Post(dtmutil.DefaultHTTPServer + "/resetNextCronTime") -// assert.Nil(t, err) -// assert.Equal(t, resp.StatusCode(), http.StatusOK) - -// // after reset assert -// g2 := s.FindTransGlobalStore(gid) -// assert.NotNil(t, g2) -// assert.Equal(t, gid, g2.Gid) -// assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime) -// assert.Equal(t, g2.UpdateTime, g2.NextCronTime) -// assert.NotEqual(t, g.UpdateTime, g2.UpdateTime) -// assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) -// } +func TestAPIResetNextCronTime(t *testing.T) { + saga := genSaga(dtmimp.GetFuncName(), false, false) + saga.Submit() + waitTransProcessed(saga.Gid) + assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) + gid := saga.Gid + + s := registry.GetStore() + g := s.FindTransGlobalStore(saga.Gid) + + // reset + resp, err := dtmcli.GetRestyClient().R().SetBody(map[string]string{ + "gid": saga.Gid, + }).Post(dtmutil.DefaultHTTPServer + "/resetNextCronTime") + assert.Nil(t, err) + assert.Equal(t, resp.StatusCode(), http.StatusOK) + + // after reset assert + g2 := s.FindTransGlobalStore(gid) + assert.NotNil(t, g2) + assert.Equal(t, gid, g2.Gid) + assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime) + assert.Equal(t, g2.UpdateTime, g2.NextCronTime) + assert.NotEqual(t, g.UpdateTime, g2.UpdateTime) + assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) +} diff --git a/test/store_test.go b/test/store_test.go index 8f0f93f..5845390 100644 --- a/test/store_test.go +++ b/test/store_test.go @@ -179,22 +179,23 @@ func TestUpdateBranches(t *testing.T) { } } -// func TestResetTransGlobalCronTime(t *testing.T) { -// gid := dtmimp.GetFuncName() -// g, _ := initTransGlobal(gid) - -// s := registry.GetStore() -// g2 := s.FindTransGlobalStore(gid) -// assert.NotNil(t, g2) -// assert.Equal(t, gid, g2.Gid) - -// s.ResetTransGlobalCronTime(g2) - -// g2 = s.FindTransGlobalStore(gid) -// assert.NotNil(t, g2) -// assert.Equal(t, gid, g2.Gid) -// assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime) -// assert.Equal(t, g2.UpdateTime, g2.NextCronTime) -// assert.NotEqual(t, g.UpdateTime, g2.UpdateTime) -// assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) -// } +func TestResetTransGlobalCronTime(t *testing.T) { + gid := dtmimp.GetFuncName() + g, _ := initTransGlobal(gid) + + s := registry.GetStore() + g2 := s.FindTransGlobalStore(gid) + assert.NotNil(t, g2) + assert.Equal(t, gid, g2.Gid) + + s.ResetTransGlobalCronTime(g2) + + g2 = s.FindTransGlobalStore(gid) + assert.NotNil(t, g2) + assert.Equal(t, gid, g2.Gid) + assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime) + assert.Equal(t, g2.UpdateTime, g2.NextCronTime) + assert.NotEqual(t, g.UpdateTime, g2.UpdateTime) + assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) + s.ChangeGlobalStatus(g, "succeed", []string{}, true) +} From fef6436759f5f9e1c472df34c8ef16eb5fe87221 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 09:34:01 +0800 Subject: [PATCH 04/14] fix comments --- dtmsvr/cron.go | 1 + dtmsvr/storage/boltdb/boltdb.go | 1 + dtmsvr/storage/redis/redis.go | 1 + dtmsvr/storage/sql/sql.go | 2 +- 4 files changed, 4 insertions(+), 1 deletion(-) diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 4f7179e..21e22df 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -57,6 +57,7 @@ func CronUpdateTopicsMap() { } } +// CronUpdateTopicsMap cron updates topics map once func CronUpdateTopicsMapOnce() { defer handlePanic(nil) updateTopicsMap() diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index eea3a5d..e6b6b51 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -483,6 +483,7 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in return } +// ResetTransGlobalCronTime reset nextCronTime of one global trans. func (s *Store) ResetTransGlobalCronTime(g *storage.TransGlobalStore) error { old := g.UpdateTime err := s.boltDb.Update(func(t *bolt.Tx) error { diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 7a91682..090ab59 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -330,6 +330,7 @@ return tostring(i) return } +// ResetTransGlobalCronTime reset nextCronTime of one global trans. func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error { now := dtmutil.GetNextTime(0) global.NextCronTime = now diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 98a4501..28f9c9c 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -204,7 +204,7 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in return affected, affected == limit, err } -// reset nextCronTime of one global trans. +// ResetTransGlobalCronTime reset nextCronTime of one global trans. func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error { now := getTimeStr(0) where := map[string]string{ From 237a5b01d7a0b27e46002d0060064579bd499abd Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 09:46:03 +0800 Subject: [PATCH 05/14] update some comments --- dtmsvr/storage/boltdb/boltdb.go | 1 + dtmsvr/storage/redis/redis.go | 1 + dtmsvr/storage/sql/sql.go | 1 + helper/golint.sh | 2 +- main.go | 1 + 5 files changed, 5 insertions(+), 1 deletion(-) diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index e6b6b51..e744b6e 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -4,6 +4,7 @@ * license that can be found in the LICENSE file. */ +// package boltdb implement the storage for boltdb package boltdb import ( diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 090ab59..6f4d732 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -4,6 +4,7 @@ * license that can be found in the LICENSE file. */ +// package reds implement the storage for reds package redis import ( diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 28f9c9c..3f94896 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -4,6 +4,7 @@ * license that can be found in the LICENSE file. */ +// package boltdb implement the storage for sql database package sql import ( diff --git a/helper/golint.sh b/helper/golint.sh index 71f1817..78fd29f 100644 --- a/helper/golint.sh +++ b/helper/golint.sh @@ -1,4 +1,4 @@ set -x -curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.46.2 +curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.53.3 $(go env GOPATH)/bin/golangci-lint run diff --git a/main.go b/main.go index 645031f..75119dd 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ * license that can be found in the LICENSE file. */ +// package main is the entry of dtm server package main import ( From cce7270e7abed908b2eae1a88c8bbd1408ee27f2 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 09:47:44 +0800 Subject: [PATCH 06/14] fix format --- dtmsvr/storage/trans.go | 2 +- test/saga_grpc_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dtmsvr/storage/trans.go b/dtmsvr/storage/trans.go index e9551d5..856bd09 100644 --- a/dtmsvr/storage/trans.go +++ b/dtmsvr/storage/trans.go @@ -81,7 +81,7 @@ func (b *TransBranchStore) String() string { return dtmimp.MustMarshalString(*b) } -//KVStore defines Key-Value storage info +// KVStore defines Key-Value storage info type KVStore struct { dtmutil.ModelBase Cat string `json:"cat"` diff --git a/test/saga_grpc_test.go b/test/saga_grpc_test.go index e14d3ae..17806a3 100644 --- a/test/saga_grpc_test.go +++ b/test/saga_grpc_test.go @@ -92,7 +92,7 @@ func TestSagaGrpcEmptyUrl(t *testing.T) { assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) } -//nolint: unparam +// nolint: unparam func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc { saga := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gid) req := busi.GenReqGrpc(30, outFailed, inFailed) From 3d639e098968fe23d547d57da4bafccf73a8e1ba Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 10:49:29 +0800 Subject: [PATCH 07/14] fix lint --- .golangci.yml | 27 --------------------------- dtmsvr/cron.go | 2 +- helper/Makefile | 26 -------------------------- helper/golint.sh | 3 +-- test/common_test.go | 4 ++-- test/dtmsvr_test.go | 4 ++-- test/workflow_http_test.go | 2 +- 7 files changed, 7 insertions(+), 61 deletions(-) delete mode 100644 .golangci.yml delete mode 100644 helper/Makefile diff --git a/.golangci.yml b/.golangci.yml deleted file mode 100644 index 31cf246..0000000 --- a/.golangci.yml +++ /dev/null @@ -1,27 +0,0 @@ -run: - deadline: 5m - skip-dirs: -# - test -# - bench - -linter-settings: - goconst: - min-len: 2 - min-occurrences: 2 - -linters: - enable: - - revive - - goconst - - gofmt - - goimports - - misspell - - unparam - -issues: - exclude-use-default: false - exclude-rules: - - path: _test.go - linters: - - errcheck - - revive diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 21e22df..82ef344 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -57,7 +57,7 @@ func CronUpdateTopicsMap() { } } -// CronUpdateTopicsMap cron updates topics map once +// CronUpdateTopicsMapOnce cron updates topics map once func CronUpdateTopicsMapOnce() { defer handlePanic(nil) updateTopicsMap() diff --git a/helper/Makefile b/helper/Makefile deleted file mode 100644 index f9d7353..0000000 --- a/helper/Makefile +++ /dev/null @@ -1,26 +0,0 @@ -# dev env https://www.dtm.pub/other/develop.html -all: fmt lint test_redis -.PHONY: all - -fmt: - @gofmt -s -w ./ - -lint: - @golangci-lint run - -.PHONY: test -test: - @go test ./... - -test_redis: - TEST_STORE=redis go test ./... - -test_all: - TEST_STORE=redis go test ./... - TEST_STORE=boltdb go test ./... - TEST_STORE=mysql go test ./... - TEST_STORE=postgres go test ./... - -cover_test: - ./helper/test-cover.sh - diff --git a/helper/golint.sh b/helper/golint.sh index 78fd29f..d4a8642 100644 --- a/helper/golint.sh +++ b/helper/golint.sh @@ -1,4 +1,3 @@ set -x -curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.53.3 -$(go env GOPATH)/bin/golangci-lint run +go install github.com/mgechev/revive@latest && revive -config revive.toml ./... diff --git a/test/common_test.go b/test/common_test.go index 4974268..de01e63 100644 --- a/test/common_test.go +++ b/test/common_test.go @@ -14,12 +14,12 @@ import ( func TestGeneralDB(t *testing.T) { if conf.Store.IsDB() { - testSql(t) + testSQL(t) testDbAlone(t) } } -func testSql(t *testing.T) { +func testSQL(t *testing.T) { conf := conf.Store.GetDBConf() conf.Host = "127.0.0.1" // use a new host to trigger SetDBConn called db := dtmutil.DbGet(conf, sql.SetDBConn) diff --git a/test/dtmsvr_test.go b/test/dtmsvr_test.go index 497a449..93e592d 100644 --- a/test/dtmsvr_test.go +++ b/test/dtmsvr_test.go @@ -43,11 +43,11 @@ func getBranchesStatus(gid string) []string { return status } -func isSqlStore() bool { +func isSQLStore() bool { return conf.Store.Driver == config.Mysql || conf.Store.Driver == config.Postgres } func TestUpdateBranchAsync(t *testing.T) { - if !isSqlStore() { + if !isSQLStore() { return } conf.UpdateBranchSync = 0 diff --git a/test/workflow_http_test.go b/test/workflow_http_test.go index 991a315..8e740da 100644 --- a/test/workflow_http_test.go +++ b/test/workflow_http_test.go @@ -215,7 +215,7 @@ func TestWorkflowResumeSkip(t *testing.T) { workflow.Register(gid, func(wf *workflow.Workflow, data []byte) error { wf.NewBranch().Do(func(bb *dtmcli.BranchBarrier) ([]byte, error) { logger.Infof("increase resume counter") - resumeCounter += 1 + resumeCounter++ return nil, nil }) var req busi.ReqHTTP From 018517326282f126a715ece1ad724f6be85fddfc Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 10:56:46 +0800 Subject: [PATCH 08/14] update CI --- Makefile | 26 ++++++++++++++++++++++++++ revive.toml | 27 +++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) create mode 100644 Makefile create mode 100644 revive.toml diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..9564206 --- /dev/null +++ b/Makefile @@ -0,0 +1,26 @@ +# dev env https://www.dtm.pub/other/develop.html +all: fmt lint test_redis +.PHONY: all + +fmt: + @gofmt -s -w ./ + +lint: + revive -config revive.toml ./... + +.PHONY: test +test: + @go test ./... + +test_redis: + TEST_STORE=redis go test ./... + +test_all: + TEST_STORE=redis go test ./... + TEST_STORE=boltdb go test ./... + TEST_STORE=mysql go test ./... + TEST_STORE=postgres go test ./... + +cover_test: + ./helper/test-cover.sh + diff --git a/revive.toml b/revive.toml new file mode 100644 index 0000000..255d8c8 --- /dev/null +++ b/revive.toml @@ -0,0 +1,27 @@ +ignoreGeneratedHeader = false +severity = "warning" +confidence = 0.8 +errorCode = 0 +warningCode = 0 + +[rule.blank-imports] +[rule.context-as-argument] +[rule.context-keys-type] +[rule.dot-imports] +[rule.error-return] +[rule.error-strings] +[rule.error-naming] +[rule.exported] +[rule.if-return] +[rule.increment-decrement] +[rule.var-naming] +[rule.var-declaration] +[rule.range] +[rule.receiver-naming] +[rule.time-naming] +[rule.unexported-return] +[rule.indent-error-flow] +[rule.errorf] +[rule.superfluous-else] +[rule.unreachable-code] +[rule.redefines-builtin-id] \ No newline at end of file From 9d6ec4642bbd29d3851da01dbe1d36195763de89 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 11:09:42 +0800 Subject: [PATCH 09/14] fix boltdb test failure --- dtmsvr/storage/boltdb/boltdb.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index e744b6e..bab761c 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -486,10 +486,9 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in // ResetTransGlobalCronTime reset nextCronTime of one global trans. func (s *Store) ResetTransGlobalCronTime(g *storage.TransGlobalStore) error { - old := g.UpdateTime err := s.boltDb.Update(func(t *bolt.Tx) error { g := tGetGlobal(t, g.Gid) - if g == nil || g.UpdateTime == old { + if g == nil { return storage.ErrNotFound } now := dtmutil.GetNextTime(0) From c9a6e1bef206ca726633f11df5c0eb8d75688bbb Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 17:44:04 +0800 Subject: [PATCH 10/14] fix scan return more records --- dtmsvr/storage/redis/redis.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 6f4d732..624f6a0 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -70,6 +70,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition s globals := []storage.TransGlobalStore{} redis := redisGet() for { + limit -= int64(len(globals)) keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_g_*", limit).Result() logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_g_*", limit, nextCursor, len(keys)) @@ -87,14 +88,15 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition s (condition.CreateTimeEnd.IsZero() || global.CreateTime.Before(condition.CreateTimeEnd)) { globals = append(globals, global) } - if len(globals) == int(limit) { + // redis.Scan may return more records than limit + if len(globals) >= int(limit) { break } } } lid = nextCursor - if len(globals) == int(limit) || nextCursor == 0 { + if len(globals) >= int(limit) || nextCursor == 0 { break } } @@ -374,12 +376,14 @@ func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVSt kvs := []storage.KVStore{} redis := redisGet() for { + limit -= int64(len(kvs)) keys, nextCursor, err := redis.Scan(ctx, lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit).Result() logger.Debugf("calling redis scan: SCAN %d MATCH %s COUNT %d ,scan result: nextCursor:%d keys_len:%d", lid, conf.Store.RedisPrefix+"_kv_"+cat+"_*", limit, nextCursor, len(keys)) dtmimp.E2P(err) if len(keys) > 0 { values, err := redis.MGet(ctx, keys...).Result() dtmimp.E2P(err) + logger.Debugf("keys: %s values: %s", dtmimp.MustMarshalString(keys), dtmimp.MustMarshalString(values)) for _, v := range values { if v == nil { continue @@ -391,7 +395,8 @@ func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVSt } lid = nextCursor - if len(kvs) == int(limit) || nextCursor == 0 { + // for redis, `count` in `scan` command is only a hint, may return more than `count` items + if len(kvs) >= int(limit) || nextCursor == 0 { break } } From aca6caafff580c757f05f8496fa8dba756c8d794 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 17:49:10 +0800 Subject: [PATCH 11/14] update readme --- README.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/README.md b/README.md index 42c8171..af74b2a 100644 --- a/README.md +++ b/README.md @@ -117,10 +117,6 @@ If you want more quick start examples, please refer to [dtm-labs/quick-start-sam The above example mainly demonstrates the flow of a distributed transaction. More on this, including practical examples of how to interact with an actual database, how to do compensation, how to do rollback, etc. please refer to [dtm-examples](https://github.com/dtm-labs/dtm-examples) for more examples. -## Chat Group - -Join the chat via [https://discord.gg/dV9jS5Rb33](https://discord.gg/dV9jS5Rb33). - ## Give a star! ⭐ If you think this project is interesting, or helpful to you, please give a star! From 8092b726faa50f2e0b82ce7103583abafc310dcb Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 18:08:08 +0800 Subject: [PATCH 12/14] fix sql --- dtmsvr/storage/sql/sql.go | 8 ++------ test/api_test.go | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 3f94896..cf8583c 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -208,14 +208,10 @@ func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount in // ResetTransGlobalCronTime reset nextCronTime of one global trans. func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error { now := getTimeStr(0) - where := map[string]string{ - dtmimp.DBTypeMysql: fmt.Sprintf(`gid = '%s'`, global.Gid), - }[conf.Store.Driver] - - sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE %s`, + sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE gid = '%s'`, now, now, - where) + global.Gid) _, err := dtmimp.DBExec(conf.Store.Driver, dbGet().ToSQLDB(), sql) return err } diff --git a/test/api_test.go b/test/api_test.go index 7753236..e5bb3db 100644 --- a/test/api_test.go +++ b/test/api_test.go @@ -266,7 +266,7 @@ func TestAPIResetNextCronTime(t *testing.T) { "gid": saga.Gid, }).Post(dtmutil.DefaultHTTPServer + "/resetNextCronTime") assert.Nil(t, err) - assert.Equal(t, resp.StatusCode(), http.StatusOK) + assert.Equal(t, http.StatusOK, resp.StatusCode()) // after reset assert g2 := s.FindTransGlobalStore(gid) From 6b8ef8fcecc71450d8da263a44357f8b4c592362 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 20:18:24 +0800 Subject: [PATCH 13/14] fix test --- test/api_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/api_test.go b/test/api_test.go index e5bb3db..cde6e55 100644 --- a/test/api_test.go +++ b/test/api_test.go @@ -274,6 +274,5 @@ func TestAPIResetNextCronTime(t *testing.T) { assert.Equal(t, gid, g2.Gid) assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime) assert.Equal(t, g2.UpdateTime, g2.NextCronTime) - assert.NotEqual(t, g.UpdateTime, g2.UpdateTime) assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) } From 051dbe4ef213c564049c4bc9af9b5b46d8ecb59a Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 1 Jul 2023 20:19:44 +0800 Subject: [PATCH 14/14] update test --- test/api_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/api_test.go b/test/api_test.go index cde6e55..9e75eb3 100644 --- a/test/api_test.go +++ b/test/api_test.go @@ -273,6 +273,5 @@ func TestAPIResetNextCronTime(t *testing.T) { assert.NotNil(t, g2) assert.Equal(t, gid, g2.Gid) assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime) - assert.Equal(t, g2.UpdateTime, g2.NextCronTime) assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) }