From 80437d04dbd4055da9bc5a2ab90f4e83f53b2470 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Thu, 23 Jun 2022 11:02:40 +0800 Subject: [PATCH] fix boltdb bug --- dtmsvr/storage/boltdb/boltdb.go | 31 +++++++++++++++---------------- dtmsvr/storage/trans.go | 4 ++++ 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index bccc401..eeedcaf 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -11,7 +11,6 @@ import ( "strings" "time" - "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmsvr/storage" @@ -383,34 +382,34 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval // LockOneGlobalTrans finds GlobalTrans func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { - var trans *storage.TransGlobalStore + var transo *storage.TransGlobalStore min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix()) - next := time.Now().Add(time.Duration(s.retryInterval) * time.Second) err := s.boltDb.Update(func(t *bolt.Tx) error { cursor := t.Bucket(bucketIndex).Cursor() toDelete := [][]byte{} - for trans == nil || trans.Status == dtmcli.StatusSucceed || trans.Status == dtmcli.StatusFailed { - k, v := cursor.First() - if k == nil || string(k) > min { - return storage.ErrNotFound - } - trans = tGetGlobal(t, string(v)) + for k, v := cursor.First(); k != nil && string(k) <= min; k, v = cursor.Next() { toDelete = append(toDelete, k) + trans := tGetGlobal(t, string(v)) + if trans != nil && !trans.IsFinished() { + transo = trans + break + } } for _, k := range toDelete { err := t.Bucket(bucketIndex).Delete(k) dtmimp.E2P(err) } - trans.NextCronTime = &next - tPutGlobal(t, trans) - tPutIndex(t, next.Unix(), trans.Gid) + if transo != nil { + next := time.Now().Add(time.Duration(s.retryInterval) * time.Second) + transo.NextCronTime = &next + tPutGlobal(t, transo) + tPutIndex(t, next.Unix(), transo.Gid) + + } return nil }) - if err == storage.ErrNotFound { - return nil - } dtmimp.E2P(err) - return trans + return transo } // ResetCronTime reset nextCronTime diff --git a/dtmsvr/storage/trans.go b/dtmsvr/storage/trans.go index 749eb1a..29bc6c4 100644 --- a/dtmsvr/storage/trans.go +++ b/dtmsvr/storage/trans.go @@ -52,6 +52,10 @@ func (g *TransGlobalStore) String() string { return dtmimp.MustMarshalString(g) } +func (g *TransGlobalStore) IsFinished() bool { + return g.Status == dtmcli.StatusFailed || g.Status == dtmcli.StatusSucceed +} + // TransBranchStore branch transaction type TransBranchStore struct { dtmutil.ModelBase