|
|
|
@ -15,10 +15,9 @@ using MongoDB.Bson; |
|
|
|
using MongoDB.Driver; |
|
|
|
using Squidex.Infrastructure.CQRS.Events; |
|
|
|
|
|
|
|
// ReSharper disable ConvertIfStatementToConditionalTernaryExpression
|
|
|
|
// ReSharper disable ClassNeverInstantiated.Local
|
|
|
|
// ReSharper disable UnusedMember.Local
|
|
|
|
// ReSharper disable InvertIf
|
|
|
|
// ReSharper disable ConvertIfStatementToConditionalTernaryExpression
|
|
|
|
// ReSharper disable TooWideLocalVariableScope
|
|
|
|
|
|
|
|
namespace Squidex.Infrastructure.MongoDb.EventStore |
|
|
|
{ |
|
|
|
@ -48,7 +47,7 @@ namespace Squidex.Infrastructure.MongoDb.EventStore |
|
|
|
|
|
|
|
protected override async Task SetupCollectionAsync(IMongoCollection<MongoEventCommit> collection) |
|
|
|
{ |
|
|
|
await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStream).Ascending(x => x.Timestamp)); |
|
|
|
await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Timestamp).Ascending(x => x.EventStream)); |
|
|
|
await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }); |
|
|
|
} |
|
|
|
|
|
|
|
@ -70,20 +69,36 @@ namespace Squidex.Infrastructure.MongoDb.EventStore |
|
|
|
Guard.NotNull(callback, nameof(callback)); |
|
|
|
|
|
|
|
var tokenTimestamp = EmptyTimestamp; |
|
|
|
var tokenEventStreamNumber = -1; |
|
|
|
var tokenCommitSize = -1; |
|
|
|
var tokenCommitOffset = -1; |
|
|
|
var isEndOfCommit = false; |
|
|
|
|
|
|
|
if (position != null) |
|
|
|
{ |
|
|
|
var token = ParsePosition(position); |
|
|
|
|
|
|
|
tokenTimestamp = token.Timestamp; |
|
|
|
tokenEventStreamNumber = token.EventStreamNumber; |
|
|
|
tokenCommitSize = token.CommitSize; |
|
|
|
tokenCommitOffset = token.CommitOffset; |
|
|
|
|
|
|
|
isEndOfCommit = tokenCommitOffset == tokenCommitSize - 1; |
|
|
|
|
|
|
|
if (isEndOfCommit) |
|
|
|
{ |
|
|
|
tokenCommitOffset = -1; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
var filters = new List<FilterDefinition<MongoEventCommit>>(); |
|
|
|
|
|
|
|
if (isEndOfCommit) |
|
|
|
{ |
|
|
|
filters.Add(Filter.Gt(x => x.Timestamp, tokenTimestamp)); |
|
|
|
} |
|
|
|
|
|
|
|
var filters = new List<FilterDefinition<MongoEventCommit>> |
|
|
|
else |
|
|
|
{ |
|
|
|
Filter.Gte(x => x.Timestamp, tokenTimestamp) |
|
|
|
}; |
|
|
|
filters.Add(Filter.Gte(x => x.Timestamp, tokenTimestamp)); |
|
|
|
} |
|
|
|
|
|
|
|
if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, "*", StringComparison.OrdinalIgnoreCase)) |
|
|
|
{ |
|
|
|
@ -110,20 +125,22 @@ namespace Squidex.Infrastructure.MongoDb.EventStore |
|
|
|
|
|
|
|
await Collection.Find(filter).SortBy(x => x.Timestamp).ForEachAsync(async commit => |
|
|
|
{ |
|
|
|
var isGreaterTimestamp = commit.Timestamp > tokenTimestamp; |
|
|
|
|
|
|
|
var eventStreamNumber = (int)commit.EventStreamOffset; |
|
|
|
|
|
|
|
var commitOffset = 0; |
|
|
|
|
|
|
|
foreach (var e in commit.Events) |
|
|
|
{ |
|
|
|
eventStreamNumber++; |
|
|
|
|
|
|
|
if (isGreaterTimestamp || eventStreamNumber > tokenEventStreamNumber) |
|
|
|
if (commitOffset > tokenCommitOffset) |
|
|
|
{ |
|
|
|
var eventData = new EventData { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type }; |
|
|
|
var eventToken = CreateToken(commit.Timestamp, eventStreamNumber); |
|
|
|
var eventToken = CreateToken(commit.Timestamp, commitOffset, commit.Events.Length); |
|
|
|
|
|
|
|
await callback(new StoredEvent(eventToken, eventStreamNumber, eventData)); |
|
|
|
|
|
|
|
commitOffset++; |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
@ -201,18 +218,18 @@ namespace Squidex.Infrastructure.MongoDb.EventStore |
|
|
|
return -1; |
|
|
|
} |
|
|
|
|
|
|
|
private static string CreateToken(BsonTimestamp timestamp, int eventStreamNumber) |
|
|
|
private static string CreateToken(BsonTimestamp timestamp, int commitOffset, int commitSize) |
|
|
|
{ |
|
|
|
var parts = new object[] { timestamp.Timestamp, timestamp.Increment, eventStreamNumber }; |
|
|
|
var parts = new object[] { timestamp.Timestamp, timestamp.Increment, commitOffset, commitSize }; |
|
|
|
|
|
|
|
return string.Join("-", parts); |
|
|
|
} |
|
|
|
|
|
|
|
private static (BsonTimestamp Timestamp, int EventStreamNumber) ParsePosition(string position) |
|
|
|
private static (BsonTimestamp Timestamp, int CommitOffset, int CommitSize) ParsePosition(string position) |
|
|
|
{ |
|
|
|
var parts = position.Split('-'); |
|
|
|
|
|
|
|
return (new BsonTimestamp(int.Parse(parts[0]), int.Parse(parts[1])), int.Parse(parts[2])); |
|
|
|
return (new BsonTimestamp(int.Parse(parts[0]), int.Parse(parts[1])), int.Parse(parts[2]), int.Parse(parts[3])); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|