|
|
|
@ -110,20 +110,25 @@ namespace Squidex.Infrastructure.MongoDb.EventStore |
|
|
|
|
|
|
|
await Collection.Find(filter).SortBy(x => x.Timestamp).ForEachAsync(async commit => |
|
|
|
{ |
|
|
|
var isGreaterTimestamp = commit.Timestamp > tokenTimestamp; |
|
|
|
|
|
|
|
var eventStreamNumber = (int)commit.EventStreamOffset; |
|
|
|
|
|
|
|
foreach (var e in commit.Events) |
|
|
|
{ |
|
|
|
eventStreamNumber++; |
|
|
|
|
|
|
|
if (eventStreamNumber > tokenEventStreamNumber) |
|
|
|
if (isGreaterTimestamp || eventStreamNumber > tokenEventStreamNumber) |
|
|
|
{ |
|
|
|
var eventData = new EventData { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type }; |
|
|
|
var eventToken = CreateToken(commit.Timestamp, eventStreamNumber); |
|
|
|
|
|
|
|
await callback(new StoredEvent(eventToken, eventStreamNumber, eventData)); |
|
|
|
} |
|
|
|
|
|
|
|
else |
|
|
|
{ |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
}, cancellationToken); |
|
|
|
} |
|
|
|
@ -200,12 +205,12 @@ namespace Squidex.Infrastructure.MongoDb.EventStore |
|
|
|
{ |
|
|
|
var parts = new object[] { timestamp.Timestamp, timestamp.Increment, eventStreamNumber }; |
|
|
|
|
|
|
|
return string.Join("$", parts); |
|
|
|
return string.Join("-", parts); |
|
|
|
} |
|
|
|
|
|
|
|
private static (BsonTimestamp Timestamp, int EventStreamNumber) ParsePosition(string position) |
|
|
|
{ |
|
|
|
var parts = position.Split('$'); |
|
|
|
var parts = position.Split('-'); |
|
|
|
|
|
|
|
return (new BsonTimestamp(int.Parse(parts[0]), int.Parse(parts[1])), int.Parse(parts[2])); |
|
|
|
} |
|
|
|
|