|
|
|
@ -6,15 +6,22 @@ |
|
|
|
// All rights reserved.
|
|
|
|
// ==========================================================================
|
|
|
|
|
|
|
|
using MongoDB.Bson; |
|
|
|
using MongoDB.Driver; |
|
|
|
using Squidex.Infrastructure.CQRS.Events; |
|
|
|
using Squidex.Infrastructure.Timers; |
|
|
|
using System; |
|
|
|
using System.Collections.Concurrent; |
|
|
|
using System.Collections.Generic; |
|
|
|
using System.Linq; |
|
|
|
using System.Reactive.Linq; |
|
|
|
using System.Threading; |
|
|
|
using System.Threading.Tasks; |
|
|
|
using MongoDB.Bson; |
|
|
|
using MongoDB.Driver; |
|
|
|
using Squidex.Infrastructure.CQRS.Events; |
|
|
|
using Squidex.Infrastructure.Tasks; |
|
|
|
|
|
|
|
// ReSharper disable PossibleInvalidOperationException
|
|
|
|
// ReSharper disable EmptyGeneralCatchClause
|
|
|
|
// ReSharper disable AccessToModifiedClosure
|
|
|
|
// ReSharper disable RedundantAssignment
|
|
|
|
// ReSharper disable InvertIf
|
|
|
|
// ReSharper disable ConvertIfStatementToConditionalTernaryExpression
|
|
|
|
@ -22,17 +29,32 @@ using Squidex.Infrastructure.CQRS.Events; |
|
|
|
|
|
|
|
namespace Squidex.Infrastructure.MongoDb.EventStore |
|
|
|
{ |
|
|
|
public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore |
|
|
|
public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore, IDisposable |
|
|
|
{ |
|
|
|
private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); |
|
|
|
private static readonly FieldDefinition<MongoEventCommit, BsonTimestamp> TimestampField = Fields.Build(x => x.Timestamp); |
|
|
|
private static readonly FieldDefinition<MongoEventCommit, long> EventsCountField = Fields.Build(x => x.EventsCount); |
|
|
|
private static readonly FieldDefinition<MongoEventCommit, long> EventStreamOffsetField = Fields.Build(x => x.EventStreamOffset); |
|
|
|
private static readonly FieldDefinition<MongoEventCommit, string> EventStreamField = Fields.Build(x => x.EventStream); |
|
|
|
private readonly IEventNotifier notifier; |
|
|
|
private readonly CompletionTimer timer; |
|
|
|
private readonly ConcurrentQueue<(BsonDocument Document, TaskCompletionSource<bool> Completion)> pendingCommits = new ConcurrentQueue<(BsonDocument Document, TaskCompletionSource<bool> Completion)>(); |
|
|
|
private readonly Lazy<IMongoCollection<BsonDocument>> plainCollection; |
|
|
|
|
|
|
|
public MongoEventStore(IMongoDatabase database, IEventNotifier notifier) |
|
|
|
public MongoEventStore(IMongoDatabase database, IEventNotifier notifier) |
|
|
|
: base(database) |
|
|
|
{ |
|
|
|
Guard.NotNull(notifier, nameof(notifier)); |
|
|
|
|
|
|
|
this.notifier = notifier; |
|
|
|
|
|
|
|
timer = new CompletionTimer(50, ct => WriteAsync()); |
|
|
|
|
|
|
|
plainCollection = new Lazy<IMongoCollection<BsonDocument>>(() => Database.GetCollection<BsonDocument>(CollectionName())); |
|
|
|
} |
|
|
|
|
|
|
|
public void Dispose() |
|
|
|
{ |
|
|
|
timer.Dispose(); |
|
|
|
} |
|
|
|
|
|
|
|
protected override string CollectionName() |
|
|
|
@ -45,10 +67,9 @@ namespace Squidex.Infrastructure.MongoDb.EventStore |
|
|
|
return new MongoCollectionSettings { ReadPreference = ReadPreference.Primary, WriteConcern = WriteConcern.WMajority }; |
|
|
|
} |
|
|
|
|
|
|
|
protected override async Task SetupCollectionAsync(IMongoCollection<MongoEventCommit> collection) |
|
|
|
protected override Task SetupCollectionAsync(IMongoCollection<MongoEventCommit> collection) |
|
|
|
{ |
|
|
|
await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Timestamp).Ascending(x => x.EventStream)); |
|
|
|
await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }); |
|
|
|
return collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }); |
|
|
|
} |
|
|
|
|
|
|
|
public IObservable<StoredEvent> GetEventsAsync(string streamFilter = null, string position = null) |
|
|
|
@ -59,7 +80,7 @@ namespace Squidex.Infrastructure.MongoDb.EventStore |
|
|
|
{ |
|
|
|
observer.OnNext(storedEvent); |
|
|
|
|
|
|
|
return Tasks.TaskHelper.Done; |
|
|
|
return TaskHelper.Done; |
|
|
|
}, ct, streamFilter, position); |
|
|
|
}); |
|
|
|
} |
|
|
|
@ -68,54 +89,96 @@ namespace Squidex.Infrastructure.MongoDb.EventStore |
|
|
|
{ |
|
|
|
Guard.NotNull(callback, nameof(callback)); |
|
|
|
|
|
|
|
var tokenTimestamp = EmptyTimestamp; |
|
|
|
var tokenCommitSize = -1; |
|
|
|
var tokenCommitOffset = -1; |
|
|
|
var isEndOfCommit = false; |
|
|
|
|
|
|
|
if (position != null) |
|
|
|
{ |
|
|
|
var token = ParsePosition(position); |
|
|
|
|
|
|
|
tokenTimestamp = token.Timestamp; |
|
|
|
tokenCommitSize = token.CommitSize; |
|
|
|
tokenCommitOffset = token.CommitOffset; |
|
|
|
|
|
|
|
isEndOfCommit = tokenCommitOffset == tokenCommitSize - 1; |
|
|
|
StreamPosition lastPosition = position; |
|
|
|
|
|
|
|
if (isEndOfCommit) |
|
|
|
{ |
|
|
|
tokenCommitOffset = -1; |
|
|
|
} |
|
|
|
} |
|
|
|
var wasEndOfCommit = lastPosition.IsEndOfCommit; |
|
|
|
|
|
|
|
var filter = CreateFilter(streamFilter, isEndOfCommit, tokenTimestamp); |
|
|
|
var filter = CreateFilter(streamFilter, lastPosition); |
|
|
|
|
|
|
|
await Collection.Find(filter).SortBy(x => x.Timestamp).ForEachAsync(async commit => |
|
|
|
await Collection.Find(filter).Sort(Sort.Ascending(EventStreamField)).ForEachAsync(async commit => |
|
|
|
{ |
|
|
|
var eventStreamNumber = (int)commit.EventStreamOffset; |
|
|
|
var eventStreamOffset = (int)commit.EventStreamOffset; |
|
|
|
|
|
|
|
var commitTimestamp = commit.Timestamp; |
|
|
|
var commitOffset = 0; |
|
|
|
|
|
|
|
foreach (var e in commit.Events) |
|
|
|
{ |
|
|
|
eventStreamNumber++; |
|
|
|
eventStreamOffset++; |
|
|
|
|
|
|
|
if (commitOffset > tokenCommitOffset) |
|
|
|
if (commitOffset > lastPosition.CommitOffset || wasEndOfCommit) |
|
|
|
{ |
|
|
|
var eventData = new EventData { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type }; |
|
|
|
var eventToken = CreateToken(commit.Timestamp, commitOffset, commit.Events.Length); |
|
|
|
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); |
|
|
|
|
|
|
|
await callback(new StoredEvent(eventToken, eventStreamNumber, eventData)); |
|
|
|
await callback(new StoredEvent(eventToken, eventStreamOffset, eventData)); |
|
|
|
|
|
|
|
commitOffset++; |
|
|
|
} |
|
|
|
else |
|
|
|
} |
|
|
|
}, cancellationToken); |
|
|
|
} |
|
|
|
|
|
|
|
private async Task WriteAsync() |
|
|
|
{ |
|
|
|
while (true) |
|
|
|
{ |
|
|
|
var commitsToInsert = new List<(BsonDocument Document, TaskCompletionSource<bool> Completion)>(); |
|
|
|
|
|
|
|
while (pendingCommits.TryDequeue(out var commit)) |
|
|
|
{ |
|
|
|
commitsToInsert.Add(commit); |
|
|
|
} |
|
|
|
|
|
|
|
var numCommits = commitsToInsert.Count; |
|
|
|
|
|
|
|
if (numCommits == 0) |
|
|
|
{ |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
await plainCollection.Value.InsertManyAsync(commitsToInsert.Select(x => x.Document), new InsertManyOptions { IsOrdered = false }); |
|
|
|
|
|
|
|
notifier.NotifyEventsStored(); |
|
|
|
|
|
|
|
foreach (var commit in commitsToInsert) |
|
|
|
{ |
|
|
|
commit.Completion.SetResult(true); |
|
|
|
} |
|
|
|
} |
|
|
|
catch (MongoBulkWriteException ex) |
|
|
|
{ |
|
|
|
foreach (var error in ex.WriteErrors) |
|
|
|
{ |
|
|
|
break; |
|
|
|
var commit = commitsToInsert[error.Index]; |
|
|
|
|
|
|
|
if (error.Category == ServerErrorCategory.DuplicateKey) |
|
|
|
{ |
|
|
|
var streamName = commit.Document[nameof(MongoEventCommit.EventStream)].AsString; |
|
|
|
var streamOffset = commit.Document[nameof(MongoEventCommit.EventStreamOffset)].AsInt64; |
|
|
|
|
|
|
|
var currentVersion = await GetEventStreamOffset(streamName); |
|
|
|
|
|
|
|
var exception = new WrongEventVersionException(currentVersion, streamOffset); |
|
|
|
|
|
|
|
commit.Completion.SetException(exception); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
commit.Completion.SetException(new MongoWriteException(ex.ConnectionId, error, ex.WriteConcernError, ex)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}, cancellationToken); |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
foreach (var commit in commitsToInsert) |
|
|
|
{ |
|
|
|
commit.Completion.SetException(ex); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events) |
|
|
|
@ -138,76 +201,65 @@ namespace Squidex.Infrastructure.MongoDb.EventStore |
|
|
|
commitEvents[i++] = mongoEvent; |
|
|
|
} |
|
|
|
|
|
|
|
var commit = new MongoEventCommit |
|
|
|
var cts = new TaskCompletionSource<bool>(); |
|
|
|
|
|
|
|
var document = new MongoEventCommit |
|
|
|
{ |
|
|
|
Id = commitId, |
|
|
|
Events = commitEvents, |
|
|
|
EventsCount = eventsCount, |
|
|
|
EventStream = streamName, |
|
|
|
EventStreamOffset = expectedVersion, |
|
|
|
Timestamp = EmptyTimestamp |
|
|
|
}; |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
await Collection.InsertOneAsync(commit); |
|
|
|
EventStreamOffset = expectedVersion |
|
|
|
}.ToBsonDocument(); |
|
|
|
|
|
|
|
notifier.NotifyEventsStored(); |
|
|
|
} |
|
|
|
catch (MongoWriteException ex) |
|
|
|
{ |
|
|
|
if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey) |
|
|
|
{ |
|
|
|
var currentVersion = await GetEventStreamOffset(streamName); |
|
|
|
pendingCommits.Enqueue((document, cts)); |
|
|
|
|
|
|
|
throw new WrongEventVersionException(currentVersion, expectedVersion); |
|
|
|
} |
|
|
|
timer.Trigger(); |
|
|
|
|
|
|
|
throw; |
|
|
|
} |
|
|
|
await cts.Task; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private async Task<long> GetEventStreamOffset(string streamName) |
|
|
|
{ |
|
|
|
var document = |
|
|
|
await Collection.Find(x => x.EventStream == streamName) |
|
|
|
await Collection.Find(Filter.Eq(EventStreamField, streamName)) |
|
|
|
.Project<BsonDocument>(Project |
|
|
|
.Include(x => x.EventStreamOffset) |
|
|
|
.Include(x => x.EventsCount)) |
|
|
|
.SortByDescending(x => x.EventStreamOffset).Limit(1) |
|
|
|
.Include(EventStreamOffsetField) |
|
|
|
.Include(EventsCountField)) |
|
|
|
.Sort(Sort.Descending(EventStreamOffsetField)).Limit(1) |
|
|
|
.FirstOrDefaultAsync(); |
|
|
|
|
|
|
|
if (document != null) |
|
|
|
{ |
|
|
|
return document["EventStreamOffset"].ToInt64() + document["EventsCount"].ToInt64(); |
|
|
|
return document[nameof(MongoEventCommit.EventStreamOffset)].ToInt64() + document[nameof(MongoEventCommit.EventsCount)].ToInt64(); |
|
|
|
} |
|
|
|
|
|
|
|
return -1; |
|
|
|
} |
|
|
|
|
|
|
|
private static FilterDefinition<MongoEventCommit> CreateFilter(string streamFilter, bool isEndOfCommit, BsonTimestamp tokenTimestamp) |
|
|
|
private static FilterDefinition<MongoEventCommit> CreateFilter(string streamFilter, StreamPosition streamPosition) |
|
|
|
{ |
|
|
|
var filters = new List<FilterDefinition<MongoEventCommit>>(); |
|
|
|
|
|
|
|
if (isEndOfCommit) |
|
|
|
if (streamPosition.IsEndOfCommit) |
|
|
|
{ |
|
|
|
filters.Add(Filter.Gt(x => x.Timestamp, tokenTimestamp)); |
|
|
|
filters.Add(Filter.Gt(TimestampField, streamPosition.Timestamp)); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
filters.Add(Filter.Gte(x => x.Timestamp, tokenTimestamp)); |
|
|
|
filters.Add(Filter.Gte(TimestampField, streamPosition.Timestamp)); |
|
|
|
} |
|
|
|
|
|
|
|
if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, "*", StringComparison.OrdinalIgnoreCase)) |
|
|
|
{ |
|
|
|
if (streamFilter.Contains("^")) |
|
|
|
{ |
|
|
|
filters.Add(Filter.Regex(x => x.EventStream, streamFilter)); |
|
|
|
filters.Add(Filter.Regex(EventStreamField, streamFilter)); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
filters.Add(Filter.Eq(x => x.EventStream, streamFilter)); |
|
|
|
filters.Add(Filter.Eq(EventStreamField, streamFilter)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -224,19 +276,5 @@ namespace Squidex.Infrastructure.MongoDb.EventStore |
|
|
|
|
|
|
|
return filter; |
|
|
|
} |
|
|
|
|
|
|
|
private static string CreateToken(BsonTimestamp timestamp, int commitOffset, int commitSize) |
|
|
|
{ |
|
|
|
var parts = new object[] { timestamp.Timestamp, timestamp.Increment, commitOffset, commitSize }; |
|
|
|
|
|
|
|
return string.Join("-", parts); |
|
|
|
} |
|
|
|
|
|
|
|
private static (BsonTimestamp Timestamp, int CommitOffset, int CommitSize) ParsePosition(string position) |
|
|
|
{ |
|
|
|
var parts = position.Split('-'); |
|
|
|
|
|
|
|
return (new BsonTimestamp(int.Parse(parts[0]), int.Parse(parts[1])), int.Parse(parts[2]), int.Parse(parts[3])); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |