diff --git a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs index 9cb91e7cb..23710d3c5 100644 --- a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs @@ -26,7 +26,7 @@ namespace Squidex.Infrastructure.CQRS.Events public class MongoEventStore : MongoRepositoryBase, IEventStore { private const long AnyVersion = long.MinValue; - private const int AppendTimeoutMs = 5000; + private const int MaxAttempts = 20; private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); private static readonly FieldDefinition TimestampField = Fields.Build(x => x.Timestamp); private static readonly FieldDefinition EventsCountField = Fields.Build(x => x.EventsCount); @@ -52,9 +52,10 @@ namespace Squidex.Infrastructure.CQRS.Events return new MongoCollectionSettings { ReadPreference = ReadPreference.Primary, WriteConcern = WriteConcern.WMajority }; } - protected override Task SetupCollectionAsync(IMongoCollection collection) + protected override async Task SetupCollectionAsync(IMongoCollection collection) { - return collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }); + await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Timestamp).Ascending(x => x.EventStream)); + await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStream).Descending(x => x.EventStreamOffset), new CreateIndexOptions { Unique = true }); } public IEventSubscription CreateSubscription(string streamFilter = null, string position = null) @@ -138,15 +139,13 @@ namespace Squidex.Infrastructure.CQRS.Events throw new WrongEventVersionException(currentVersion, expectedVersion); } - var cts = new CancellationTokenSource(AppendTimeoutMs); - var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events); - while (!cts.IsCancellationRequested) + for (var attempt = 0; attempt < MaxAttempts; attempt++) { try { - await Collection.InsertOneAsync(commit, new InsertOneOptions(), cts.Token); + await Collection.InsertOneAsync(commit); notifier.NotifyEventsStored(); @@ -162,13 +161,13 @@ namespace Squidex.Infrastructure.CQRS.Events { throw new WrongEventVersionException(currentVersion, expectedVersion); } - else if (!cts.IsCancellationRequested) + else if (attempt < MaxAttempts) { - commit.EventStreamOffset = currentVersion; + expectedVersion = currentVersion; } else { - throw new TaskCanceledException("Could not acquire a free slot for the commit within the provided time."); + throw new TimeoutException("Could not acquire a free slot for the commit within the provided time."); } } else