Browse Source

EventStore simplified.

pull/65/merge
Sebastian Stehle 9 years ago
parent
commit
642128dfed
  1. 121
      src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs

121
src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs

@ -9,9 +9,7 @@
using MongoDB.Bson;
using MongoDB.Driver;
using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.Timers;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
@ -19,12 +17,13 @@ using System.Threading;
using System.Threading.Tasks;
using Squidex.Infrastructure.Tasks;
// ReSharper disable RedundantIfElseBlock
// ReSharper disable InvertIf
// ReSharper disable ConvertIfStatementToConditionalTernaryExpression
namespace Squidex.Infrastructure.MongoDb.EventStore
{
public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore, IDisposable
public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore
{
private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0);
private static readonly FieldDefinition<MongoEventCommit, BsonTimestamp> TimestampField = Fields.Build(x => x.Timestamp);
@ -32,9 +31,6 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
private static readonly FieldDefinition<MongoEventCommit, long> EventStreamOffsetField = Fields.Build(x => x.EventStreamOffset);
private static readonly FieldDefinition<MongoEventCommit, string> EventStreamField = Fields.Build(x => x.EventStream);
private readonly IEventNotifier notifier;
private readonly CompletionTimer timer;
private readonly ConcurrentQueue<(BsonDocument Document, TaskCompletionSource<bool> Completion)> pendingCommits = new ConcurrentQueue<(BsonDocument Document, TaskCompletionSource<bool> Completion)>();
private readonly Lazy<IMongoCollection<BsonDocument>> plainCollection;
public MongoEventStore(IMongoDatabase database, IEventNotifier notifier)
: base(database)
@ -42,15 +38,6 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
Guard.NotNull(notifier, nameof(notifier));
this.notifier = notifier;
timer = new CompletionTimer(50, ct => WriteAsync());
plainCollection = new Lazy<IMongoCollection<BsonDocument>>(() => Database.GetCollection<BsonDocument>(CollectionName()));
}
public void Dispose()
{
timer.Dispose();
}
protected override string CollectionName()
@ -120,70 +107,6 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
}, cancellationToken);
}
private async Task WriteAsync()
{
while (true)
{
var commitsToInsert = new List<(BsonDocument Document, TaskCompletionSource<bool> Completion)>();
while (pendingCommits.TryDequeue(out var commit))
{
commitsToInsert.Add(commit);
}
var numCommits = commitsToInsert.Count;
if (numCommits == 0)
{
return;
}
try
{
await plainCollection.Value.InsertManyAsync(commitsToInsert.Select(x => x.Document), new InsertManyOptions { IsOrdered = false });
notifier.NotifyEventsStored();
}
catch (MongoBulkWriteException ex)
{
foreach (var error in ex.WriteErrors)
{
var commit = commitsToInsert[error.Index];
if (error.Category == ServerErrorCategory.DuplicateKey)
{
var streamName = commit.Document[nameof(MongoEventCommit.EventStream)].AsString;
var streamOffset = commit.Document[nameof(MongoEventCommit.EventStreamOffset)].AsInt64;
var currentVersion = await GetEventStreamOffset(streamName);
var exception = new WrongEventVersionException(currentVersion, streamOffset);
commit.Completion.SetException(exception);
}
else
{
commit.Completion.SetException(new MongoWriteException(ex.ConnectionId, error, ex.WriteConcernError, ex));
}
}
}
catch (Exception ex)
{
foreach (var commit in commitsToInsert)
{
commit.Completion.SetException(ex);
}
}
finally
{
foreach (var commit in commitsToInsert)
{
commit.Completion.TrySetResult(true);
}
}
}
}
public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events)
{
Guard.NotNullOrEmpty(streamName, nameof(streamName));
@ -204,23 +127,33 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
commitEvents[i++] = mongoEvent;
}
var cts = new TaskCompletionSource<bool>();
var document = new MongoEventCommit
try
{
Id = commitId,
Events = commitEvents,
EventsCount = eventsCount,
EventStream = streamName,
EventStreamOffset = expectedVersion,
Timestamp = EmptyTimestamp
}.ToBsonDocument();
pendingCommits.Enqueue((document, cts));
timer.Wakeup();
var document = new MongoEventCommit
{
Id = commitId,
Events = commitEvents,
EventsCount = eventsCount,
EventStream = streamName,
EventStreamOffset = expectedVersion,
Timestamp = EmptyTimestamp
};
await Collection.InsertOneAsync(document);
}
catch (MongoWriteException ex)
{
if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
{
var currentVersion = await GetEventStreamOffset(streamName);
await cts.Task;
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
else
{
throw;
}
}
}
}

Loading…
Cancel
Save