Browse Source

Mongo Event store improved.

pull/95/head
Sebastian Stehle 8 years ago
parent
commit
f18b030583
  1. 19
      src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs

19
src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs

@ -26,7 +26,7 @@ namespace Squidex.Infrastructure.CQRS.Events
public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore
{
private const long AnyVersion = long.MinValue;
private const int AppendTimeoutMs = 5000;
private const int MaxAttempts = 20;
private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0);
private static readonly FieldDefinition<MongoEventCommit, BsonTimestamp> TimestampField = Fields.Build(x => x.Timestamp);
private static readonly FieldDefinition<MongoEventCommit, long> EventsCountField = Fields.Build(x => x.EventsCount);
@ -52,9 +52,10 @@ namespace Squidex.Infrastructure.CQRS.Events
return new MongoCollectionSettings { ReadPreference = ReadPreference.Primary, WriteConcern = WriteConcern.WMajority };
}
protected override Task SetupCollectionAsync(IMongoCollection<MongoEventCommit> collection)
protected override async Task SetupCollectionAsync(IMongoCollection<MongoEventCommit> collection)
{
return collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true });
await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Timestamp).Ascending(x => x.EventStream));
await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStream).Descending(x => x.EventStreamOffset), new CreateIndexOptions { Unique = true });
}
public IEventSubscription CreateSubscription(string streamFilter = null, string position = null)
@ -138,15 +139,13 @@ namespace Squidex.Infrastructure.CQRS.Events
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
var cts = new CancellationTokenSource(AppendTimeoutMs);
var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events);
while (!cts.IsCancellationRequested)
for (var attempt = 0; attempt < MaxAttempts; attempt++)
{
try
{
await Collection.InsertOneAsync(commit, new InsertOneOptions(), cts.Token);
await Collection.InsertOneAsync(commit);
notifier.NotifyEventsStored();
@ -162,13 +161,13 @@ namespace Squidex.Infrastructure.CQRS.Events
{
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
else if (!cts.IsCancellationRequested)
else if (attempt < MaxAttempts)
{
commit.EventStreamOffset = currentVersion;
expectedVersion = currentVersion;
}
else
{
throw new TaskCanceledException("Could not acquire a free slot for the commit within the provided time.");
throw new TimeoutException("Could not acquire a free slot for the commit within the provided time.");
}
}
else

Loading…
Cancel
Save