|
|
|
@ -430,11 +430,11 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval |
|
|
|
// LockOneGlobalTrans finds GlobalTrans
|
|
|
|
func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore { |
|
|
|
var trans *storage.TransGlobalStore |
|
|
|
min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix()) |
|
|
|
min1 := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix()) |
|
|
|
err := s.boltDb.Update(func(t *bolt.Tx) error { |
|
|
|
cursor := t.Bucket(bucketIndex).Cursor() |
|
|
|
toDelete := [][]byte{} |
|
|
|
for k, v := cursor.First(); k != nil && string(k) <= min && (trans == nil || trans.IsFinished()); k, v = cursor.Next() { |
|
|
|
for k, v := cursor.First(); k != nil && string(k) <= min1 && (trans == nil || trans.IsFinished()); k, v = cursor.Next() { |
|
|
|
trans = tGetGlobal(t, string(v)) |
|
|
|
toDelete = append(toDelete, k) |
|
|
|
} |
|
|
|
@ -460,11 +460,11 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS |
|
|
|
func (s *Store) ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) { |
|
|
|
next := time.Now() |
|
|
|
var trans *storage.TransGlobalStore |
|
|
|
min := fmt.Sprintf("%d", time.Now().Add(after).Unix()) |
|
|
|
min1 := fmt.Sprintf("%d", time.Now().Add(after).Unix()) |
|
|
|
err = s.boltDb.Update(func(t *bolt.Tx) error { |
|
|
|
cursor := t.Bucket(bucketIndex).Cursor() |
|
|
|
succeedCount = 0 |
|
|
|
for k, v := cursor.Seek([]byte(min)); k != nil && succeedCount <= limit; k, v = cursor.Next() { |
|
|
|
for k, v := cursor.Seek([]byte(min1)); k != nil && succeedCount <= limit; k, v = cursor.Next() { |
|
|
|
if succeedCount == limit { |
|
|
|
hasRemaining = true |
|
|
|
break |
|
|
|
|