|
|
|
@ -25,6 +25,8 @@ namespace Squidex.Infrastructure.CQRS.Events |
|
|
|
{ |
|
|
|
public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore |
|
|
|
{ |
|
|
|
private const long AnyVersion = long.MinValue; |
|
|
|
private const int MaxAttempts = 20; |
|
|
|
private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); |
|
|
|
private static readonly FieldDefinition<MongoEventCommit, BsonTimestamp> TimestampField = Fields.Build(x => x.Timestamp); |
|
|
|
private static readonly FieldDefinition<MongoEventCommit, long> EventsCountField = Fields.Build(x => x.EventsCount); |
|
|
|
@ -50,9 +52,10 @@ namespace Squidex.Infrastructure.CQRS.Events |
|
|
|
return new MongoCollectionSettings { ReadPreference = ReadPreference.Primary, WriteConcern = WriteConcern.WMajority }; |
|
|
|
} |
|
|
|
|
|
|
|
protected override Task SetupCollectionAsync(IMongoCollection<MongoEventCommit> collection) |
|
|
|
protected override async Task SetupCollectionAsync(IMongoCollection<MongoEventCommit> collection) |
|
|
|
{ |
|
|
|
return collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }); |
|
|
|
await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Timestamp).Ascending(x => x.EventStream)); |
|
|
|
await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStream).Descending(x => x.EventStreamOffset), new CreateIndexOptions { Unique = true }); |
|
|
|
} |
|
|
|
|
|
|
|
public IEventSubscription CreateSubscription(string streamFilter = null, string position = null) |
|
|
|
@ -107,49 +110,65 @@ namespace Squidex.Infrastructure.CQRS.Events |
|
|
|
}, cancellationToken); |
|
|
|
} |
|
|
|
|
|
|
|
public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events) |
|
|
|
public Task AppendEventsAsync(Guid commitId, string streamName, ICollection<EventData> events) |
|
|
|
{ |
|
|
|
return AppendEventsInternalAsync(commitId, streamName, AnyVersion, events); |
|
|
|
} |
|
|
|
|
|
|
|
public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events) |
|
|
|
{ |
|
|
|
Guard.GreaterEquals(expectedVersion, -1, nameof(expectedVersion)); |
|
|
|
|
|
|
|
return AppendEventsInternalAsync(commitId, streamName, expectedVersion, events); |
|
|
|
} |
|
|
|
|
|
|
|
private async Task AppendEventsInternalAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events) |
|
|
|
{ |
|
|
|
Guard.NotNullOrEmpty(streamName, nameof(streamName)); |
|
|
|
Guard.NotNull(events, nameof(events)); |
|
|
|
|
|
|
|
var eventsCount = events.Count; |
|
|
|
|
|
|
|
if (eventsCount > 0) |
|
|
|
if (events.Count == 0) |
|
|
|
{ |
|
|
|
var commitEvents = new MongoEvent[events.Count]; |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
var i = 0; |
|
|
|
var currentVersion = await GetEventStreamOffset(streamName); |
|
|
|
|
|
|
|
foreach (var e in events) |
|
|
|
{ |
|
|
|
var mongoEvent = new MongoEvent { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type }; |
|
|
|
if (expectedVersion != AnyVersion && expectedVersion != currentVersion) |
|
|
|
{ |
|
|
|
throw new WrongEventVersionException(currentVersion, expectedVersion); |
|
|
|
} |
|
|
|
|
|
|
|
commitEvents[i++] = mongoEvent; |
|
|
|
} |
|
|
|
var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events); |
|
|
|
|
|
|
|
for (var attempt = 0; attempt < MaxAttempts; attempt++) |
|
|
|
{ |
|
|
|
try |
|
|
|
{ |
|
|
|
var document = new MongoEventCommit |
|
|
|
{ |
|
|
|
Id = commitId, |
|
|
|
Events = commitEvents, |
|
|
|
EventsCount = eventsCount, |
|
|
|
EventStream = streamName, |
|
|
|
EventStreamOffset = expectedVersion, |
|
|
|
Timestamp = EmptyTimestamp |
|
|
|
}; |
|
|
|
|
|
|
|
await Collection.InsertOneAsync(document); |
|
|
|
await Collection.InsertOneAsync(commit); |
|
|
|
|
|
|
|
notifier.NotifyEventsStored(); |
|
|
|
|
|
|
|
return; |
|
|
|
} |
|
|
|
catch (MongoWriteException ex) |
|
|
|
{ |
|
|
|
if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey) |
|
|
|
{ |
|
|
|
var currentVersion = await GetEventStreamOffset(streamName); |
|
|
|
|
|
|
|
throw new WrongEventVersionException(currentVersion, expectedVersion); |
|
|
|
currentVersion = await GetEventStreamOffset(streamName); |
|
|
|
|
|
|
|
if (expectedVersion != AnyVersion) |
|
|
|
{ |
|
|
|
throw new WrongEventVersionException(currentVersion, expectedVersion); |
|
|
|
} |
|
|
|
else if (attempt < MaxAttempts) |
|
|
|
{ |
|
|
|
expectedVersion = currentVersion; |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
throw new TimeoutException("Could not acquire a free slot for the commit within the provided time."); |
|
|
|
} |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
@ -204,5 +223,31 @@ namespace Squidex.Infrastructure.CQRS.Events |
|
|
|
|
|
|
|
return Filter.And(filters); |
|
|
|
} |
|
|
|
|
|
|
|
private static MongoEventCommit BuildCommit(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events) |
|
|
|
{ |
|
|
|
var commitEvents = new MongoEvent[events.Count]; |
|
|
|
|
|
|
|
var i = 0; |
|
|
|
|
|
|
|
foreach (var e in events) |
|
|
|
{ |
|
|
|
var mongoEvent = new MongoEvent { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type }; |
|
|
|
|
|
|
|
commitEvents[i++] = mongoEvent; |
|
|
|
} |
|
|
|
|
|
|
|
var mongoCommit = new MongoEventCommit |
|
|
|
{ |
|
|
|
Id = commitId, |
|
|
|
Events = commitEvents, |
|
|
|
EventsCount = events.Count, |
|
|
|
EventStream = streamName, |
|
|
|
EventStreamOffset = expectedVersion, |
|
|
|
Timestamp = EmptyTimestamp |
|
|
|
}; |
|
|
|
|
|
|
|
return mongoCommit; |
|
|
|
} |
|
|
|
} |
|
|
|
} |