|
|
@ -11,7 +11,6 @@ import ( |
|
|
"strings" |
|
|
"strings" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
"github.com/dtm-labs/dtm/dtmcli" |
|
|
|
|
|
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|
|
"github.com/dtm-labs/dtm/dtmcli/dtmimp" |
|
|
"github.com/dtm-labs/dtm/dtmcli/logger" |
|
|
"github.com/dtm-labs/dtm/dtmcli/logger" |
|
|
"github.com/dtm-labs/dtm/dtmsvr/storage" |
|
|
"github.com/dtm-labs/dtm/dtmsvr/storage" |
|
|
@ -383,34 +382,34 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval |
|
|
|
|
|
|
|
|
// LockOneGlobalTrans finds GlobalTrans
|
|
|
// LockOneGlobalTrans finds GlobalTrans
|
|
|
func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { |
|
|
func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { |
|
|
var trans *storage.TransGlobalStore |
|
|
var transo *storage.TransGlobalStore |
|
|
min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix()) |
|
|
min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix()) |
|
|
next := time.Now().Add(time.Duration(s.retryInterval) * time.Second) |
|
|
|
|
|
err := s.boltDb.Update(func(t *bolt.Tx) error { |
|
|
err := s.boltDb.Update(func(t *bolt.Tx) error { |
|
|
cursor := t.Bucket(bucketIndex).Cursor() |
|
|
cursor := t.Bucket(bucketIndex).Cursor() |
|
|
toDelete := [][]byte{} |
|
|
toDelete := [][]byte{} |
|
|
for trans == nil || trans.Status == dtmcli.StatusSucceed || trans.Status == dtmcli.StatusFailed { |
|
|
for k, v := cursor.First(); k != nil && string(k) <= min; k, v = cursor.Next() { |
|
|
k, v := cursor.First() |
|
|
|
|
|
if k == nil || string(k) > min { |
|
|
|
|
|
return storage.ErrNotFound |
|
|
|
|
|
} |
|
|
|
|
|
trans = tGetGlobal(t, string(v)) |
|
|
|
|
|
toDelete = append(toDelete, k) |
|
|
toDelete = append(toDelete, k) |
|
|
|
|
|
trans := tGetGlobal(t, string(v)) |
|
|
|
|
|
if trans != nil && !trans.IsFinished() { |
|
|
|
|
|
transo = trans |
|
|
|
|
|
break |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
for _, k := range toDelete { |
|
|
for _, k := range toDelete { |
|
|
err := t.Bucket(bucketIndex).Delete(k) |
|
|
err := t.Bucket(bucketIndex).Delete(k) |
|
|
dtmimp.E2P(err) |
|
|
dtmimp.E2P(err) |
|
|
} |
|
|
} |
|
|
trans.NextCronTime = &next |
|
|
if transo != nil { |
|
|
tPutGlobal(t, trans) |
|
|
next := time.Now().Add(time.Duration(s.retryInterval) * time.Second) |
|
|
tPutIndex(t, next.Unix(), trans.Gid) |
|
|
transo.NextCronTime = &next |
|
|
|
|
|
tPutGlobal(t, transo) |
|
|
|
|
|
tPutIndex(t, next.Unix(), transo.Gid) |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
return nil |
|
|
return nil |
|
|
}) |
|
|
}) |
|
|
if err == storage.ErrNotFound { |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
dtmimp.E2P(err) |
|
|
dtmimp.E2P(err) |
|
|
return trans |
|
|
return transo |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// ResetCronTime reset nextCronTime
|
|
|
// ResetCronTime reset nextCronTime
|
|
|
|