|
|
|
@ -12,6 +12,7 @@ using System.Runtime.CompilerServices; |
|
|
|
using System.Threading; |
|
|
|
using System.Threading.Tasks; |
|
|
|
using MongoDB.Driver; |
|
|
|
using NodaTime; |
|
|
|
using Squidex.Infrastructure.MongoDb; |
|
|
|
using Squidex.Log; |
|
|
|
using EventFilter = MongoDB.Driver.FilterDefinition<Squidex.Infrastructure.EventSourcing.MongoEventCommit>; |
|
|
|
@ -103,7 +104,7 @@ namespace Squidex.Infrastructure.EventSourcing |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public async IAsyncEnumerable<StoredEvent> QueryAllReverseAsync(string? streamFilter = null, string? position = null, long take = long.MaxValue, |
|
|
|
public async IAsyncEnumerable<StoredEvent> QueryAllReverseAsync(string? streamFilter = null, Instant timestamp = default, long take = long.MaxValue, |
|
|
|
[EnumeratorCancellation] CancellationToken ct = default) |
|
|
|
{ |
|
|
|
if (take <= 0) |
|
|
|
@ -111,7 +112,7 @@ namespace Squidex.Infrastructure.EventSourcing |
|
|
|
yield break; |
|
|
|
} |
|
|
|
|
|
|
|
StreamPosition lastPosition = position; |
|
|
|
StreamPosition lastPosition = timestamp; |
|
|
|
|
|
|
|
var filterDefinition = CreateFilter(streamFilter, lastPosition); |
|
|
|
|
|
|
|
@ -127,7 +128,7 @@ namespace Squidex.Infrastructure.EventSourcing |
|
|
|
{ |
|
|
|
foreach (var current in cursor.Current) |
|
|
|
{ |
|
|
|
foreach (var @event in current.Filtered(position).Reverse()) |
|
|
|
foreach (var @event in current.Filtered(lastPosition).Reverse()) |
|
|
|
{ |
|
|
|
yield return @event; |
|
|
|
|
|
|
|
@ -167,7 +168,7 @@ namespace Squidex.Infrastructure.EventSourcing |
|
|
|
{ |
|
|
|
foreach (var current in cursor.Current) |
|
|
|
{ |
|
|
|
foreach (var @event in current.Filtered(position)) |
|
|
|
foreach (var @event in current.Filtered(lastPosition)) |
|
|
|
{ |
|
|
|
yield return @event; |
|
|
|
|
|
|
|
|