From 24428e80821922780bedc0aa2a2d6b56fe8d7385 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Sat, 13 Jan 2018 23:51:58 +0100 Subject: [PATCH] Started filter support. --- .../Rules/MongoRuleEventRepository.cs | 4 +- .../Repositories/IRuleEventRepository.cs | 2 +- .../Rules/RuleDequeuer.cs | 4 +- .../EventSourcing/Formatter.cs | 12 +- .../EventSourcing/GetEventStore.cs | 66 +++-- .../GetEventStoreSubscription.cs | 27 +-- .../EventSourcing/ProjectionClient.cs | 155 ++++++++++++ .../EventSourcing/ProjectionHelper.cs | 97 -------- .../EventSourcing/MongoEvent.cs | 9 +- .../EventSourcing/MongoEventStore.cs | 228 +----------------- .../EventSourcing/MongoEventStore_Reader.cs | 173 +++++++++++++ .../EventSourcing/MongoEventStore_Writer.cs | 135 +++++++++++ .../MongoDb/BsonJsonConvention.cs | 13 + .../MongoDb/JTokenSerializer.cs | 32 +++ ...matter.cs => DefaultEventDataFormatter.cs} | 39 ++- .../EventSourcing/EventData.cs | 8 +- .../EventSourcing/IEventStore.cs | 12 +- .../EventSourcing/PollingSubscription.cs | 2 +- .../EventSourcing/StoredEvent.cs | 26 +- .../Persistence{TOwner,TSnapshot,TKey}.cs | 4 +- .../Config/Domain/InfrastructureServices.cs | 2 +- .../EventSourcing/EventDataFormatterTests.cs | 4 +- .../EventSourcing/PollingSubscriptionTests.cs | 12 +- .../States/PersistenceEventSourcingTests.cs | 14 +- .../Migration00_ConvertEventStore.cs | 53 ++++ tools/Migrate_01/Rebuilder.cs | 12 +- 26 files changed, 679 insertions(+), 466 deletions(-) create mode 100644 src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs delete mode 100644 src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs create mode 100644 src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs create mode 100644 src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs create mode 100644 src/Squidex.Infrastructure.MongoDb/MongoDb/JTokenSerializer.cs rename src/Squidex.Infrastructure/EventSourcing/{JsonEventDataFormatter.cs => DefaultEventDataFormatter.cs} (51%) create mode 100644 tools/Migrate_01/Migration00_ConvertEventStore.cs diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Rules/MongoRuleEventRepository.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Rules/MongoRuleEventRepository.cs index d54da9c2d..468d8d831 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Rules/MongoRuleEventRepository.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/Rules/MongoRuleEventRepository.cs @@ -39,9 +39,9 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Rules await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Expires), new CreateIndexOptions { ExpireAfter = TimeSpan.Zero }); } - public Task QueryPendingAsync(Instant now, Func callback, CancellationToken cancellationToken = default(CancellationToken)) + public Task QueryPendingAsync(Instant now, Func callback, CancellationToken ct = default(CancellationToken)) { - return Collection.Find(x => x.NextAttempt < now).ForEachAsync(callback, cancellationToken); + return Collection.Find(x => x.NextAttempt < now).ForEachAsync(callback, ct); } public async Task> QueryByAppAsync(Guid appId, int skip = 0, int take = 20) diff --git a/src/Squidex.Domain.Apps.Entities/Rules/Repositories/IRuleEventRepository.cs b/src/Squidex.Domain.Apps.Entities/Rules/Repositories/IRuleEventRepository.cs index 8edc62f37..31c09131b 100644 --- a/src/Squidex.Domain.Apps.Entities/Rules/Repositories/IRuleEventRepository.cs +++ b/src/Squidex.Domain.Apps.Entities/Rules/Repositories/IRuleEventRepository.cs @@ -23,7 +23,7 @@ namespace Squidex.Domain.Apps.Entities.Rules.Repositories Task MarkSentAsync(Guid jobId, string dump, RuleResult result, RuleJobResult jobResult, TimeSpan elapsed, Instant? nextCall); - Task QueryPendingAsync(Instant now, Func callback, CancellationToken cancellationToken = default(CancellationToken)); + Task QueryPendingAsync(Instant now, Func callback, CancellationToken ct = default(CancellationToken)); Task CountByAppAsync(Guid appId); diff --git a/src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuer.cs b/src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuer.cs index 8150f7013..6c0926902 100644 --- a/src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuer.cs +++ b/src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuer.cs @@ -71,13 +71,13 @@ namespace Squidex.Domain.Apps.Entities.Rules timer.SkipCurrentDelay(); } - private async Task QueryAsync(CancellationToken cancellationToken) + private async Task QueryAsync(CancellationToken ct) { try { var now = clock.GetCurrentInstant(); - await ruleEventRepository.QueryPendingAsync(now, requestBlock.SendAsync, cancellationToken); + await ruleEventRepository.QueryPendingAsync(now, requestBlock.SendAsync, ct); } catch (Exception ex) { diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs index f0721a586..aefd883b2 100644 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs @@ -5,6 +5,7 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System; using System.Text; using EventStore.ClientAPI; using EventStoreData = EventStore.ClientAPI.EventData; @@ -20,7 +21,7 @@ namespace Squidex.Infrastructure.EventSourcing var body = Encoding.UTF8.GetString(@event.Data); var meta = Encoding.UTF8.GetString(@event.Metadata); - var eventData = new EventData { Type = @event.EventType, EventId = @event.EventId, Payload = body, Metadata = meta }; + var eventData = new EventData { Type = @event.EventType, Payload = body, Metadata = meta }; return new StoredEvent( resolvedEvent.OriginalEventNumber.ToString(), @@ -30,13 +31,10 @@ namespace Squidex.Infrastructure.EventSourcing public static EventStoreData Write(EventData eventData) { - var body = Encoding.UTF8.GetBytes(eventData.Payload); - var meta = Encoding.UTF8.GetBytes(eventData.Metadata); + var body = Encoding.UTF8.GetBytes(eventData.Payload.ToString()); + var meta = Encoding.UTF8.GetBytes(eventData.Metadata.ToString()); - return new EventStoreData( - eventData.EventId, - eventData.Type, - true, body, meta); + return new EventStoreData(Guid.NewGuid(), eventData.Type, true, body, meta); } } } diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs index 2023d0f73..9e4d1a4d8 100644 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs @@ -11,7 +11,6 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using EventStore.ClientAPI; -using EventStore.ClientAPI.Projections; namespace Squidex.Infrastructure.EventSourcing { @@ -20,18 +19,18 @@ namespace Squidex.Infrastructure.EventSourcing private const int WritePageSize = 500; private const int ReadPageSize = 500; private readonly IEventStoreConnection connection; - private readonly string projectionHost; private readonly string prefix; - private ProjectionsManager projectionsManager; + private ProjectionClient projectionClient; public GetEventStore(IEventStoreConnection connection, string prefix, string projectionHost) { Guard.NotNull(connection, nameof(connection)); this.connection = connection; - this.projectionHost = projectionHost; this.prefix = prefix?.Trim(' ', '-').WithFallback("squidex"); + + projectionClient = new ProjectionClient(connection, prefix, projectionHost); } public void Initialize() @@ -45,50 +44,43 @@ namespace Squidex.Infrastructure.EventSourcing throw new ConfigurationException("Cannot connect to event store.", ex); } - try - { - projectionsManager = connection.GetProjectionsManagerAsync(projectionHost).Result; - - projectionsManager.ListAllAsync(connection.Settings.DefaultUserCredentials).Wait(); - } - catch (Exception ex) - { - throw new ConfigurationException($"Cannot connect to event store projections: {projectionHost}.", ex); - } + projectionClient.ConnectAsync().Wait(); } public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null) { - return new GetEventStoreSubscription(connection, subscriber, projectionsManager, prefix, position, streamFilter); + return new GetEventStoreSubscription(connection, subscriber, projectionClient, prefix, position, streamFilter); } - public async Task GetEventsAsync(Func callback, string streamFilter = null, string position = null, CancellationToken cancellationToken = default(CancellationToken)) + public Task CreateIndexAsync(string property) { - var streamName = await connection.CreateProjectionAsync(projectionsManager, prefix, streamFilter); + return projectionClient.CreateProjectionAsync(property, string.Empty); + } - var sliceStart = ProjectionHelper.ParsePosition(position); + public async Task QueryAsync(Func callback, string property, object value, string position = null, CancellationToken ct = default(CancellationToken)) + { + var streamName = await projectionClient.CreateProjectionAsync(property, value); - StreamEventsSlice currentSlice; - do - { - currentSlice = await connection.ReadStreamEventsForwardAsync(streamName, sliceStart, ReadPageSize, true); + var sliceStart = projectionClient.ParsePosition(position); - if (currentSlice.Status == SliceReadStatus.Success) - { - sliceStart = currentSlice.NextEventNumber; + await QueryAsync(callback, streamName, sliceStart, ct); + } - foreach (var resolved in currentSlice.Events) - { - var storedEvent = Formatter.Read(resolved); + public async Task QueryAsync(Func callback, string streamFilter = null, string position = null, CancellationToken ct = default(CancellationToken)) + { + var streamName = await projectionClient.CreateProjectionAsync(streamFilter); - await callback(storedEvent); - } - } - } - while (!currentSlice.IsEndOfStream && !cancellationToken.IsCancellationRequested); + var sliceStart = projectionClient.ParsePosition(position); + + await QueryAsync(callback, streamName, sliceStart, ct); + } + + private Task QueryAsync(Func callback, string streamName, long sliceStart, CancellationToken ct) + { + return QueryAsync(callback, GetStreamName(streamName), sliceStart, ct); } - public async Task> GetEventsAsync(string streamName, long streamPosition = 0) + public async Task> QueryAsync(string streamName, long streamPosition = 0) { var result = new List(); @@ -97,7 +89,7 @@ namespace Squidex.Infrastructure.EventSourcing StreamEventsSlice currentSlice; do { - currentSlice = await connection.ReadStreamEventsForwardAsync(GetStreamName(streamName), sliceStart, ReadPageSize, false); + currentSlice = await connection.ReadStreamEventsForwardAsync(streamName, sliceStart, ReadPageSize, false); if (currentSlice.Status == SliceReadStatus.Success) { @@ -116,12 +108,12 @@ namespace Squidex.Infrastructure.EventSourcing return result; } - public Task AppendEventsAsync(Guid commitId, string streamName, ICollection events) + public Task AppendAsync(Guid commitId, string streamName, ICollection events) { return AppendEventsInternalAsync(streamName, EtagVersion.Any, events); } - public Task AppendEventsAsync(Guid commitId, string streamName, long expectedVersion, ICollection events) + public Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection events) { Guard.GreaterEquals(expectedVersion, -1, nameof(expectedVersion)); diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs index 3fbffbe37..cbf1559f5 100644 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs @@ -8,33 +8,32 @@ using System.Threading.Tasks; using EventStore.ClientAPI; using EventStore.ClientAPI.Exceptions; -using EventStore.ClientAPI.Projections; using Squidex.Infrastructure.Tasks; namespace Squidex.Infrastructure.EventSourcing { internal sealed class GetEventStoreSubscription : IEventSubscription { - private readonly IEventStoreConnection eventStoreConnection; - private readonly IEventSubscriber eventSubscriber; + private readonly IEventStoreConnection connection; + private readonly IEventSubscriber subscriber; private readonly EventStoreCatchUpSubscription subscription; private readonly long? position; public GetEventStoreSubscription( - IEventStoreConnection eventStoreConnection, - IEventSubscriber eventSubscriber, - ProjectionsManager projectionsManager, + IEventStoreConnection connection, + IEventSubscriber subscriber, + ProjectionClient projectionClient, string prefix, string position, string streamFilter) { - Guard.NotNull(eventSubscriber, nameof(eventSubscriber)); + Guard.NotNull(subscriber, nameof(subscriber)); - this.eventStoreConnection = eventStoreConnection; - this.eventSubscriber = eventSubscriber; - this.position = ProjectionHelper.ParsePositionOrNull(position); + this.connection = connection; + this.position = projectionClient.ParsePositionOrNull(position); + this.subscriber = subscriber; - var streamName = eventStoreConnection.CreateProjectionAsync(projectionsManager, prefix, streamFilter).Result; + var streamName = projectionClient.CreateProjectionAsync(streamFilter).Result; subscription = SubscribeToStream(streamName); } @@ -50,12 +49,12 @@ namespace Squidex.Infrastructure.EventSourcing { var settings = CatchUpSubscriptionSettings.Default; - return eventStoreConnection.SubscribeToStreamFrom(streamName, position, settings, + return connection.SubscribeToStreamFrom(streamName, position, settings, (s, e) => { var storedEvent = Formatter.Read(e); - eventSubscriber.OnEventAsync(this, storedEvent).Wait(); + subscriber.OnEventAsync(this, storedEvent).Wait(); }, null, (s, reason, ex) => { @@ -64,7 +63,7 @@ namespace Squidex.Infrastructure.EventSourcing { ex = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}."); - eventSubscriber.OnErrorAsync(this, ex); + subscriber.OnErrorAsync(this, ex); } }); } diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs new file mode 100644 index 000000000..13dbc1558 --- /dev/null +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs @@ -0,0 +1,155 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Concurrent; +using System.Globalization; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; +using EventStore.ClientAPI; +using EventStore.ClientAPI.Exceptions; +using EventStore.ClientAPI.Projections; + +namespace Squidex.Infrastructure.EventSourcing +{ + public sealed class ProjectionClient + { + private const string StreamByFilter = "by-{0}-{1}"; + private const string StreamByProperty = "by-{0}-{1}-property"; + private readonly ConcurrentDictionary projections = new ConcurrentDictionary(); + private readonly IEventStoreConnection connection; + private readonly string prefix; + private readonly string projectionHost; + private ProjectionsManager projectionsManager; + + public ProjectionClient(IEventStoreConnection connection, string prefix, string projectionHost) + { + this.connection = connection; + + this.prefix = prefix; + this.projectionHost = projectionHost; + } + + private string CreateFilterStreamName(string filter) + { + return string.Format(CultureInfo.InvariantCulture, StreamByFilter, prefix.Simplify(), filter.Simplify()); + } + + private string CreatePropertyStreamName(string property) + { + return string.Format(CultureInfo.InvariantCulture, StreamByFilter, prefix.Simplify(), property.Simplify()); + } + + public async Task CreateProjectionAsync(string property, object value) + { + var streamName = CreatePropertyStreamName(property); + + if (projections.TryAdd(streamName, true)) + { + var projectionConfig = + $@"fromAll() + .when({{ + $any: function (s, e) {{ + if (e.streamId.indexOf('{prefix}') === 0 && e.data.{property}) {{ + linkTo('{streamName}-' + e.data.{property}, e); + }} + }} + }});"; + + try + { + var credentials = connection.Settings.DefaultUserCredentials; + + await projectionsManager.CreateContinuousAsync($"{streamName}", projectionConfig, credentials); + } + catch (Exception ex) + { + if (!ex.Is()) + { + throw; + } + } + } + + return streamName + "-" + value; + } + + public async Task CreateProjectionAsync(string streamFilter = null) + { + streamFilter = streamFilter ?? ".*"; + + var streamName = CreateFilterStreamName(streamFilter); + + if (projections.TryAdd(streamName, true)) + { + var projectionConfig = + $@"fromAll() + .when({{ + $any: function (s, e) {{ + if (e.streamId.indexOf('{prefix}') === 0 && /{streamFilter}/.test(e.streamId.substring({prefix.Length + 1}))) {{ + linkTo('{streamName}', e); + }} + }} + }});"; + + try + { + var credentials = connection.Settings.DefaultUserCredentials; + + await projectionsManager.CreateContinuousAsync($"{streamName}", projectionConfig, credentials); + } + catch (Exception ex) + { + if (!ex.Is()) + { + throw; + } + } + } + + return streamName; + } + + public async Task ConnectAsync() + { + var addressParts = projectionHost.Split(':'); + + if (addressParts.Length < 2 || !int.TryParse(addressParts[1], out var port)) + { + port = 2113; + } + + var endpoints = await Dns.GetHostAddressesAsync(addressParts[0]); + var endpoint = new IPEndPoint(endpoints.First(x => x.AddressFamily == AddressFamily.InterNetwork), port); + + projectionsManager = + new ProjectionsManager( + connection.Settings.Log, endpoint, + connection.Settings.OperationTimeout); + try + { + await projectionsManager.ListAllAsync(connection.Settings.DefaultUserCredentials); + } + catch (Exception ex) + { + throw new ConfigurationException($"Cannot connect to event store projections: {projectionHost}.", ex); + } + } + + public long? ParsePositionOrNull(string position) + { + return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null; + } + + public long ParsePosition(string position) + { + return long.TryParse(position, out var parsedPosition) ? parsedPosition : 0; + } + } +} diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs deleted file mode 100644 index 3baccf1a2..000000000 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs +++ /dev/null @@ -1,97 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschränkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -using System; -using System.Collections.Concurrent; -using System.Globalization; -using System.Linq; -using System.Net; -using System.Net.Sockets; -using System.Threading.Tasks; -using EventStore.ClientAPI; -using EventStore.ClientAPI.Exceptions; -using EventStore.ClientAPI.Projections; - -namespace Squidex.Infrastructure.EventSourcing -{ - public static class ProjectionHelper - { - private const string ProjectionName = "by-{0}-{1}"; - private static readonly ConcurrentDictionary SubscriptionsCreated = new ConcurrentDictionary(); - - private static string ParseFilter(string prefix, string filter) - { - return string.Format(CultureInfo.InvariantCulture, ProjectionName, prefix.Simplify(), filter.Simplify()); - } - - public static async Task CreateProjectionAsync(this IEventStoreConnection connection, ProjectionsManager projectionsManager, string prefix, string streamFilter = null) - { - streamFilter = streamFilter ?? ".*"; - - var streamName = ParseFilter(prefix, streamFilter); - - if (SubscriptionsCreated.TryAdd(streamName, true)) - { - var projectionConfig = - $@"fromAll() - .when({{ - $any: function (s, e) {{ - if (e.streamId.indexOf('{prefix}') === 0 && /{streamFilter}/.test(e.streamId.substring({prefix.Length + 1}))) {{ - linkTo('{streamName}', e); - }} - }} - }});"; - - try - { - var credentials = connection.Settings.DefaultUserCredentials; - - await projectionsManager.CreateContinuousAsync($"${streamName}", projectionConfig, credentials); - } - catch (Exception ex) - { - if (!ex.Is()) - { - throw; - } - } - } - - return streamName; - } - - public static async Task GetProjectionsManagerAsync(this IEventStoreConnection connection, string projectionHost) - { - var addressParts = projectionHost.Split(':'); - - if (addressParts.Length < 2 || !int.TryParse(addressParts[1], out var port)) - { - port = 2113; - } - - var endpoints = await Dns.GetHostAddressesAsync(addressParts[0]); - var endpoint = new IPEndPoint(endpoints.First(x => x.AddressFamily == AddressFamily.InterNetwork), port); - - var projectionsManager = - new ProjectionsManager( - connection.Settings.Log, endpoint, - connection.Settings.OperationTimeout); - - return projectionsManager; - } - - public static long? ParsePositionOrNull(string position) - { - return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null; - } - - public static long ParsePosition(string position) - { - return long.TryParse(position, out var parsedPosition) ? parsedPosition : 0; - } - } -} diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEvent.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEvent.cs index 703fb5223..81649a439 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEvent.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEvent.cs @@ -7,6 +7,7 @@ using System; using MongoDB.Bson.Serialization.Attributes; +using Newtonsoft.Json.Linq; using Squidex.Infrastructure.Reflection; namespace Squidex.Infrastructure.EventSourcing @@ -15,15 +16,11 @@ namespace Squidex.Infrastructure.EventSourcing { [BsonElement] [BsonRequired] - public Guid EventId { get; set; } + public JToken Payload { get; set; } [BsonElement] [BsonRequired] - public string Payload { get; set; } - - [BsonElement] - [BsonRequired] - public string Metadata { get; set; } + public JToken Metadata { get; set; } [BsonElement] [BsonRequired] diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs index 2b4c072a3..e5779e42e 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs @@ -5,10 +5,6 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System; -using System.Collections.Generic; -using System.Reactive.Linq; -using System.Threading; using System.Threading.Tasks; using MongoDB.Bson; using MongoDB.Driver; @@ -16,16 +12,19 @@ using Squidex.Infrastructure.MongoDb; namespace Squidex.Infrastructure.EventSourcing { - public class MongoEventStore : MongoRepositoryBase, IEventStore + public partial class MongoEventStore : MongoRepositoryBase, IEventStore { - private const int MaxAttempts = 20; - private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); private static readonly FieldDefinition TimestampField = Fields.Build(x => x.Timestamp); private static readonly FieldDefinition EventsCountField = Fields.Build(x => x.EventsCount); private static readonly FieldDefinition EventStreamOffsetField = Fields.Build(x => x.EventStreamOffset); private static readonly FieldDefinition EventStreamField = Fields.Build(x => x.EventStream); private readonly IEventNotifier notifier; + public IMongoCollection RawCollection + { + get { return Database.GetCollection(CollectionName()); } + } + public MongoEventStore(IMongoDatabase database, IEventNotifier notifier) : base(database) { @@ -50,220 +49,5 @@ namespace Squidex.Infrastructure.EventSourcing collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Timestamp).Ascending(x => x.EventStream)), collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStream).Descending(x => x.EventStreamOffset), new CreateIndexOptions { Unique = true })); } - - public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null) - { - Guard.NotNull(subscriber, nameof(subscriber)); - Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); - - return new PollingSubscription(this, notifier, subscriber, streamFilter, position); - } - - public async Task> GetEventsAsync(string streamName, long streamPosition = 0) - { - var commits = - await Collection.Find( - Filter.And( - Filter.Eq(EventStreamField, streamName), - Filter.Gte(EventStreamOffsetField, streamPosition - 1))) - .Sort(Sort.Ascending(TimestampField)).ToListAsync(); - - var result = new List(); - - foreach (var commit in commits) - { - var eventStreamOffset = (int)commit.EventStreamOffset; - - var commitTimestamp = commit.Timestamp; - var commitOffset = 0; - - foreach (var e in commit.Events) - { - eventStreamOffset++; - - if (eventStreamOffset >= streamPosition) - { - var eventData = e.ToEventData(); - var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); - - result.Add(new StoredEvent(eventToken, eventStreamOffset, eventData)); - } - } - } - - return result; - } - - public async Task GetEventsAsync(Func callback, string streamFilter = null, string position = null, CancellationToken cancellationToken = default(CancellationToken)) - { - Guard.NotNull(callback, nameof(callback)); - - StreamPosition lastPosition = position; - - var filter = CreateFilter(streamFilter, lastPosition); - - await Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachAsync(async commit => - { - var eventStreamOffset = (int)commit.EventStreamOffset; - - var commitTimestamp = commit.Timestamp; - var commitOffset = 0; - - foreach (var e in commit.Events) - { - eventStreamOffset++; - - if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp) - { - var eventData = e.ToEventData(); - var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); - - await callback(new StoredEvent(eventToken, eventStreamOffset, eventData)); - - commitOffset++; - } - } - }, cancellationToken); - } - - public Task AppendEventsAsync(Guid commitId, string streamName, ICollection events) - { - return AppendEventsInternalAsync(commitId, streamName, EtagVersion.Any, events); - } - - public Task AppendEventsAsync(Guid commitId, string streamName, long expectedVersion, ICollection events) - { - Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion)); - - return AppendEventsInternalAsync(commitId, streamName, expectedVersion, events); - } - - private async Task AppendEventsInternalAsync(Guid commitId, string streamName, long expectedVersion, ICollection events) - { - Guard.NotNullOrEmpty(streamName, nameof(streamName)); - Guard.NotNull(events, nameof(events)); - - if (events.Count == 0) - { - return; - } - - var currentVersion = await GetEventStreamOffset(streamName); - - if (expectedVersion != EtagVersion.Any && expectedVersion != currentVersion) - { - throw new WrongEventVersionException(currentVersion, expectedVersion); - } - - var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events); - - for (var attempt = 0; attempt < MaxAttempts; attempt++) - { - try - { - await Collection.InsertOneAsync(commit); - - notifier.NotifyEventsStored(streamName); - - return; - } - catch (MongoWriteException ex) - { - if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey) - { - currentVersion = await GetEventStreamOffset(streamName); - - if (expectedVersion != EtagVersion.Any) - { - throw new WrongEventVersionException(currentVersion, expectedVersion); - } - else if (attempt < MaxAttempts) - { - expectedVersion = currentVersion; - } - else - { - throw new TimeoutException("Could not acquire a free slot for the commit within the provided time."); - } - } - else - { - throw; - } - } - } - } - - private async Task GetEventStreamOffset(string streamName) - { - var document = - await Collection.Find(Filter.Eq(EventStreamField, streamName)) - .Project(Projection - .Include(EventStreamOffsetField) - .Include(EventsCountField)) - .Sort(Sort.Descending(EventStreamOffsetField)).Limit(1) - .FirstOrDefaultAsync(); - - if (document != null) - { - return document[nameof(MongoEventCommit.EventStreamOffset)].ToInt64() + document[nameof(MongoEventCommit.EventsCount)].ToInt64(); - } - - return EtagVersion.Empty; - } - - private static FilterDefinition CreateFilter(string streamFilter, StreamPosition streamPosition) - { - var filters = new List>(); - - if (streamPosition.IsEndOfCommit) - { - filters.Add(Filter.Gt(TimestampField, streamPosition.Timestamp)); - } - else - { - filters.Add(Filter.Gte(TimestampField, streamPosition.Timestamp)); - } - - if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, ".*", StringComparison.OrdinalIgnoreCase)) - { - if (streamFilter.Contains("^")) - { - filters.Add(Filter.Regex(EventStreamField, streamFilter)); - } - else - { - filters.Add(Filter.Eq(EventStreamField, streamFilter)); - } - } - - return Filter.And(filters); - } - - private static MongoEventCommit BuildCommit(Guid commitId, string streamName, long expectedVersion, ICollection events) - { - var commitEvents = new MongoEvent[events.Count]; - - var i = 0; - - foreach (var e in events) - { - var mongoEvent = new MongoEvent(e); - - commitEvents[i++] = mongoEvent; - } - - var mongoCommit = new MongoEventCommit - { - Id = commitId, - Events = commitEvents, - EventsCount = events.Count, - EventStream = streamName, - EventStreamOffset = expectedVersion, - Timestamp = EmptyTimestamp - }; - - return mongoCommit; - } } } \ No newline at end of file diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs new file mode 100644 index 000000000..ccc7737e9 --- /dev/null +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs @@ -0,0 +1,173 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Reactive.Linq; +using System.Threading; +using System.Threading.Tasks; +using MongoDB.Driver; +using Squidex.Infrastructure.MongoDb; + +namespace Squidex.Infrastructure.EventSourcing +{ + public partial class MongoEventStore : MongoRepositoryBase, IEventStore + { + public Task CreateIndexAsync(string property) + { + return Collection.Indexes.CreateOneAsync(Index.Ascending(CreateIndexPath(property))); + } + + public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null) + { + Guard.NotNull(subscriber, nameof(subscriber)); + Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); + + return new PollingSubscription(this, notifier, subscriber, streamFilter, position); + } + + public async Task> QueryAsync(string streamName, long streamPosition = 0) + { + var commits = + await Collection.Find( + Filter.And( + Filter.Eq(EventStreamField, streamName), + Filter.Gte(EventStreamOffsetField, streamPosition - 1))) + .Sort(Sort.Ascending(TimestampField)).ToListAsync(); + + var result = new List(); + + foreach (var commit in commits) + { + var eventStreamOffset = (int)commit.EventStreamOffset; + + var commitTimestamp = commit.Timestamp; + var commitOffset = 0; + + foreach (var e in commit.Events) + { + eventStreamOffset++; + + if (eventStreamOffset >= streamPosition) + { + var eventData = e.ToEventData(); + var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); + + result.Add(new StoredEvent(eventToken, eventStreamOffset, eventData)); + } + } + } + + return result; + } + + public Task QueryAsync(Func callback, string property, object value, string position = null, CancellationToken ct = default(CancellationToken)) + { + Guard.NotNull(callback, nameof(callback)); + + StreamPosition lastPosition = position; + + var filter = CreateFilter(property, value, lastPosition); + + return QueryAsync(callback, lastPosition, filter, ct); + } + + public Task QueryAsync(Func callback, string streamFilter = null, string position = null, CancellationToken ct = default(CancellationToken)) + { + Guard.NotNull(callback, nameof(callback)); + + StreamPosition lastPosition = position; + + var filter = CreateFilter(streamFilter, lastPosition); + + return QueryAsync(callback, lastPosition, filter, ct); + } + + private async Task QueryAsync(Func callback, StreamPosition lastPosition, FilterDefinition filter, CancellationToken ct) + { + await Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachAsync(async commit => + { + var eventStreamOffset = (int)commit.EventStreamOffset; + + var commitTimestamp = commit.Timestamp; + var commitOffset = 0; + + foreach (var e in commit.Events) + { + eventStreamOffset++; + + if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp) + { + var eventData = e.ToEventData(); + var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); + + await callback(new StoredEvent(eventToken, eventStreamOffset, eventData)); + + commitOffset++; + } + } + }, ct); + } + + private static FilterDefinition CreateFilter(string property, object value, StreamPosition streamPosition) + { + var filters = new List>(); + + AddPositionFilter(streamPosition, filters); + AddPropertyFitler(property, value, filters); + + return Filter.And(filters); + } + + private static FilterDefinition CreateFilter(string streamFilter, StreamPosition streamPosition) + { + var filters = new List>(); + + AddPositionFilter(streamPosition, filters); + AddStreamFilter(streamFilter, filters); + + return Filter.And(filters); + } + + private static void AddPropertyFitler(string property, object value, List> filters) + { + filters.Add(Filter.Eq(CreateIndexPath(property), value)); + } + + private static void AddStreamFilter(string streamFilter, List> filters) + { + if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, ".*", StringComparison.OrdinalIgnoreCase)) + { + if (streamFilter.Contains("^")) + { + filters.Add(Filter.Regex(EventStreamField, streamFilter)); + } + else + { + filters.Add(Filter.Eq(EventStreamField, streamFilter)); + } + } + } + + private static void AddPositionFilter(StreamPosition streamPosition, List> filters) + { + if (streamPosition.IsEndOfCommit) + { + filters.Add(Filter.Gt(TimestampField, streamPosition.Timestamp)); + } + else + { + filters.Add(Filter.Gte(TimestampField, streamPosition.Timestamp)); + } + } + + private static string CreateIndexPath(string property) + { + return $"Events.Payload.{property}"; + } + } +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs new file mode 100644 index 000000000..9c0e3f877 --- /dev/null +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs @@ -0,0 +1,135 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Reactive.Linq; +using System.Threading.Tasks; +using MongoDB.Bson; +using MongoDB.Driver; + +namespace Squidex.Infrastructure.EventSourcing +{ + public partial class MongoEventStore + { + private const int MaxWriteAttempts = 20; + private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); + + public Task AppendAsync(Guid commitId, string streamName, ICollection events) + { + return AppendEventsInternalAsync(commitId, streamName, EtagVersion.Any, events); + } + + public Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection events) + { + Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion)); + + return AppendEventsInternalAsync(commitId, streamName, expectedVersion, events); + } + + private async Task AppendEventsInternalAsync(Guid commitId, string streamName, long expectedVersion, ICollection events) + { + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + Guard.NotNull(events, nameof(events)); + + if (events.Count == 0) + { + return; + } + + var currentVersion = await GetEventStreamOffset(streamName); + + if (expectedVersion != EtagVersion.Any && expectedVersion != currentVersion) + { + throw new WrongEventVersionException(currentVersion, expectedVersion); + } + + var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events); + + for (var attempt = 0; attempt < MaxWriteAttempts; attempt++) + { + try + { + await Collection.InsertOneAsync(commit); + + notifier.NotifyEventsStored(streamName); + + return; + } + catch (MongoWriteException ex) + { + if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey) + { + currentVersion = await GetEventStreamOffset(streamName); + + if (expectedVersion != EtagVersion.Any) + { + throw new WrongEventVersionException(currentVersion, expectedVersion); + } + + if (attempt < MaxWriteAttempts) + { + expectedVersion = currentVersion; + } + else + { + throw new TimeoutException("Could not acquire a free slot for the commit within the provided time."); + } + } + else + { + throw; + } + } + } + } + + private async Task GetEventStreamOffset(string streamName) + { + var document = + await Collection.Find(Filter.Eq(EventStreamField, streamName)) + .Project(Projection + .Include(EventStreamOffsetField) + .Include(EventsCountField)) + .Sort(Sort.Descending(EventStreamOffsetField)).Limit(1) + .FirstOrDefaultAsync(); + + if (document != null) + { + return document[nameof(MongoEventCommit.EventStreamOffset)].ToInt64() + document[nameof(MongoEventCommit.EventsCount)].ToInt64(); + } + + return EtagVersion.Empty; + } + + private static MongoEventCommit BuildCommit(Guid commitId, string streamName, long expectedVersion, ICollection events) + { + var commitEvents = new MongoEvent[events.Count]; + + var i = 0; + + foreach (var e in events) + { + var mongoEvent = new MongoEvent(e); + + commitEvents[i++] = mongoEvent; + } + + var mongoCommit = new MongoEventCommit + { + Id = commitId, + Events = commitEvents, + EventsCount = events.Count, + EventStream = streamName, + EventStreamOffset = expectedVersion, + Timestamp = EmptyTimestamp + }; + + return mongoCommit; + } + } +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure.MongoDb/MongoDb/BsonJsonConvention.cs b/src/Squidex.Infrastructure.MongoDb/MongoDb/BsonJsonConvention.cs index ab9f13d11..2a8d6e572 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoDb/BsonJsonConvention.cs +++ b/src/Squidex.Infrastructure.MongoDb/MongoDb/BsonJsonConvention.cs @@ -11,6 +11,7 @@ using System.Reflection; using MongoDB.Bson.Serialization; using MongoDB.Bson.Serialization.Conventions; using Newtonsoft.Json; +using Newtonsoft.Json.Linq; namespace Squidex.Infrastructure.MongoDb { @@ -31,6 +32,18 @@ namespace Squidex.Infrastructure.MongoDb memberMap.SetSerializer((IBsonSerializer)bsonSerializer); } + else if (memberMap.MemberType == typeof(JToken)) + { + memberMap.SetSerializer(JTokenSerializer.Instance); + } + else if (memberMap.MemberType == typeof(JObject)) + { + memberMap.SetSerializer(JTokenSerializer.Instance); + } + else if (memberMap.MemberType == typeof(JValue)) + { + memberMap.SetSerializer(JTokenSerializer.Instance); + } }); ConventionRegistry.Register("json", pack, t => true); diff --git a/src/Squidex.Infrastructure.MongoDb/MongoDb/JTokenSerializer.cs b/src/Squidex.Infrastructure.MongoDb/MongoDb/JTokenSerializer.cs new file mode 100644 index 000000000..fbb1039e0 --- /dev/null +++ b/src/Squidex.Infrastructure.MongoDb/MongoDb/JTokenSerializer.cs @@ -0,0 +1,32 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using MongoDB.Bson.Serialization; +using MongoDB.Bson.Serialization.Serializers; +using Newtonsoft.Json.Linq; + +namespace Squidex.Infrastructure.MongoDb +{ + public sealed class JTokenSerializer : ClassSerializerBase where T : JToken + { + public static readonly JTokenSerializer Instance = new JTokenSerializer(); + + protected override T DeserializeValue(BsonDeserializationContext context, BsonDeserializationArgs args) + { + var jsonReader = new BsonJsonReader(context.Reader); + + return (T)JToken.ReadFrom(jsonReader); + } + + protected override void SerializeValue(BsonSerializationContext context, BsonSerializationArgs args, T value) + { + var jsonWriter = new BsonJsonWriter(context.Writer); + + value.WriteTo(jsonWriter); + } + } +} diff --git a/src/Squidex.Infrastructure/EventSourcing/JsonEventDataFormatter.cs b/src/Squidex.Infrastructure/EventSourcing/DefaultEventDataFormatter.cs similarity index 51% rename from src/Squidex.Infrastructure/EventSourcing/JsonEventDataFormatter.cs rename to src/Squidex.Infrastructure/EventSourcing/DefaultEventDataFormatter.cs index 9af9e3a68..83ce9ed5a 100644 --- a/src/Squidex.Infrastructure/EventSourcing/JsonEventDataFormatter.cs +++ b/src/Squidex.Infrastructure/EventSourcing/DefaultEventDataFormatter.cs @@ -7,38 +7,37 @@ using System; using Newtonsoft.Json; +using Newtonsoft.Json.Linq; namespace Squidex.Infrastructure.EventSourcing { - public class JsonEventDataFormatter : IEventDataFormatter + public class DefaultEventDataFormatter : IEventDataFormatter { - private readonly JsonSerializerSettings serializerSettings; + private readonly JsonSerializer serializer; private readonly TypeNameRegistry typeNameRegistry; - public JsonEventDataFormatter(TypeNameRegistry typeNameRegistry, JsonSerializerSettings serializerSettings = null) + public DefaultEventDataFormatter(TypeNameRegistry typeNameRegistry, JsonSerializer serializer = null) { Guard.NotNull(typeNameRegistry, nameof(typeNameRegistry)); this.typeNameRegistry = typeNameRegistry; - this.serializerSettings = serializerSettings ?? new JsonSerializerSettings(); + this.serializer = serializer ?? JsonSerializer.CreateDefault(); } public Envelope Parse(EventData eventData, bool migrate = true) { - var headers = ReadJson(eventData.Metadata); - var eventType = typeNameRegistry.GetType(eventData.Type); - var eventPayload = ReadJson(eventData.Payload, eventType); - if (migrate && eventPayload is IMigratedEvent migratedEvent) + var headers = eventData.Metadata.ToObject(); + var content = eventData.Metadata.ToObject(eventType, serializer) as IEvent; + + if (migrate && content is IMigratedEvent migratedEvent) { - eventPayload = migratedEvent.Migrate(); + content = migratedEvent.Migrate(); } - var envelope = new Envelope(eventPayload, headers); - - envelope.SetEventId(eventData.EventId); + var envelope = new Envelope(content, headers); return envelope; } @@ -56,20 +55,10 @@ namespace Squidex.Infrastructure.EventSourcing envelope.SetCommitId(commitId); - var headers = WriteJson(envelope.Headers); - var content = WriteJson(envelope.Payload); + var headers = JToken.FromObject(envelope.Headers, serializer); + var content = JToken.FromObject(envelope.Payload, serializer); - return new EventData { EventId = envelope.Headers.EventId(), Type = eventType, Payload = content, Metadata = headers }; - } - - private T ReadJson(string data, Type type = null) - { - return (T)JsonConvert.DeserializeObject(data, type ?? typeof(T), serializerSettings); - } - - private string WriteJson(object value) - { - return JsonConvert.SerializeObject(value, serializerSettings); + return new EventData { Type = eventType, Payload = content, Metadata = headers }; } } } diff --git a/src/Squidex.Infrastructure/EventSourcing/EventData.cs b/src/Squidex.Infrastructure/EventSourcing/EventData.cs index 9ca13635d..739ea8068 100644 --- a/src/Squidex.Infrastructure/EventSourcing/EventData.cs +++ b/src/Squidex.Infrastructure/EventSourcing/EventData.cs @@ -5,17 +5,15 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System; +using Newtonsoft.Json.Linq; namespace Squidex.Infrastructure.EventSourcing { public class EventData { - public Guid EventId { get; set; } + public JToken Payload { get; set; } - public string Payload { get; set; } - - public string Metadata { get; set; } + public JToken Metadata { get; set; } public string Type { get; set; } } diff --git a/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs b/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs index 2993ef86b..c33d86e5a 100644 --- a/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs +++ b/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs @@ -14,13 +14,17 @@ namespace Squidex.Infrastructure.EventSourcing { public interface IEventStore { - Task> GetEventsAsync(string streamName, long streamPosition = 0); + Task CreateIndexAsync(string property); - Task GetEventsAsync(Func callback, string streamFilter = null, string position = null, CancellationToken cancellationToken = default(CancellationToken)); + Task> QueryAsync(string streamName, long streamPosition = 0); - Task AppendEventsAsync(Guid commitId, string streamName, ICollection events); + Task QueryAsync(Func callback, string streamFilter = null, string position = null, CancellationToken ct = default(CancellationToken)); - Task AppendEventsAsync(Guid commitId, string streamName, long expectedVersion, ICollection events); + Task QueryAsync(Func callback, string property, object value, string position = null, CancellationToken ct = default(CancellationToken)); + + Task AppendAsync(Guid commitId, string streamName, ICollection events); + + Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection events); IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null); } diff --git a/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs b/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs index dd5fc072b..7cbb556b9 100644 --- a/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs +++ b/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs @@ -46,7 +46,7 @@ namespace Squidex.Infrastructure.EventSourcing { try { - await eventStore.GetEventsAsync(async storedEvent => + await eventStore.QueryAsync(async storedEvent => { await eventSubscriber.OnEventAsync(this, storedEvent); diff --git a/src/Squidex.Infrastructure/EventSourcing/StoredEvent.cs b/src/Squidex.Infrastructure/EventSourcing/StoredEvent.cs index 747dce5bc..3c93e21a4 100644 --- a/src/Squidex.Infrastructure/EventSourcing/StoredEvent.cs +++ b/src/Squidex.Infrastructure/EventSourcing/StoredEvent.cs @@ -9,33 +9,21 @@ namespace Squidex.Infrastructure.EventSourcing { public sealed class StoredEvent { - private readonly string eventPosition; - private readonly long eventStreamNumber; - private readonly EventData data; + public string EventPosition { get; } - public string EventPosition - { - get { return eventPosition; } - } - - public long EventStreamNumber - { - get { return eventStreamNumber; } - } + public long EventStreamNumber { get; } - public EventData Data - { - get { return data; } - } + public EventData Data { get; } public StoredEvent(string eventPosition, long eventStreamNumber, EventData data) { Guard.NotNullOrEmpty(eventPosition, nameof(eventPosition)); Guard.NotNull(data, nameof(data)); - this.data = data; - this.eventPosition = eventPosition; - this.eventStreamNumber = eventStreamNumber; + Data = data; + + EventPosition = eventPosition; + EventStreamNumber = eventStreamNumber; } } } diff --git a/src/Squidex.Infrastructure/States/Persistence{TOwner,TSnapshot,TKey}.cs b/src/Squidex.Infrastructure/States/Persistence{TOwner,TSnapshot,TKey}.cs index 024a05119..cb0900aed 100644 --- a/src/Squidex.Infrastructure/States/Persistence{TOwner,TSnapshot,TKey}.cs +++ b/src/Squidex.Infrastructure/States/Persistence{TOwner,TSnapshot,TKey}.cs @@ -101,7 +101,7 @@ namespace Squidex.Infrastructure.States { if (UseEventSourcing()) { - var events = await eventStore.GetEventsAsync(GetStreamName(), versionEvents + 1); + var events = await eventStore.QueryAsync(GetStreamName(), versionEvents + 1); foreach (var @event in events) { @@ -160,7 +160,7 @@ namespace Squidex.Infrastructure.States try { - await eventStore.AppendEventsAsync(commitId, GetStreamName(), expectedVersion, eventData); + await eventStore.AppendAsync(commitId, GetStreamName(), expectedVersion, eventData); } catch (WrongEventVersionException ex) { diff --git a/src/Squidex/Config/Domain/InfrastructureServices.cs b/src/Squidex/Config/Domain/InfrastructureServices.cs index 6781b36e7..9a182bf87 100644 --- a/src/Squidex/Config/Domain/InfrastructureServices.cs +++ b/src/Squidex/Config/Domain/InfrastructureServices.cs @@ -89,7 +89,7 @@ namespace Squidex.Config.Domain services.AddSingletonAs() .As(); - services.AddSingletonAs() + services.AddSingletonAs() .As(); services.AddSingletonAs() diff --git a/tests/Squidex.Infrastructure.Tests/EventSourcing/EventDataFormatterTests.cs b/tests/Squidex.Infrastructure.Tests/EventSourcing/EventDataFormatterTests.cs index c0272f796..6d9406d95 100644 --- a/tests/Squidex.Infrastructure.Tests/EventSourcing/EventDataFormatterTests.cs +++ b/tests/Squidex.Infrastructure.Tests/EventSourcing/EventDataFormatterTests.cs @@ -29,7 +29,7 @@ namespace Squidex.Infrastructure.EventSourcing private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings(); private readonly TypeNameRegistry typeNameRegistry = new TypeNameRegistry(); - private readonly JsonEventDataFormatter sut; + private readonly DefaultEventDataFormatter sut; public EventDataFormatterTests() { @@ -38,7 +38,7 @@ namespace Squidex.Infrastructure.EventSourcing typeNameRegistry.Map(typeof(MyEvent), "Event"); typeNameRegistry.Map(typeof(MyOldEvent), "OldEvent"); - sut = new JsonEventDataFormatter(typeNameRegistry, serializerSettings); + sut = new DefaultEventDataFormatter(typeNameRegistry, JsonSerializer.Create(serializerSettings)); } [Fact] diff --git a/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs b/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs index 4c58d2655..19baac3de 100644 --- a/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs +++ b/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs @@ -27,7 +27,7 @@ namespace Squidex.Infrastructure.EventSourcing await WaitAndStopAsync(sut); - A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, "^my-stream", position, A.Ignored)) + A.CallTo(() => eventStore.QueryAsync(A>.Ignored, "^my-stream", position, A.Ignored)) .MustHaveHappened(Repeated.Exactly.Once); } @@ -36,7 +36,7 @@ namespace Squidex.Infrastructure.EventSourcing { var ex = new InvalidOperationException(); - A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, "^my-stream", position, A.Ignored)) + A.CallTo(() => eventStore.QueryAsync(A>.Ignored, "^my-stream", position, A.Ignored)) .Throws(ex); var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); @@ -52,7 +52,7 @@ namespace Squidex.Infrastructure.EventSourcing { var ex = new OperationCanceledException(); - A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, "^my-stream", position, A.Ignored)) + A.CallTo(() => eventStore.QueryAsync(A>.Ignored, "^my-stream", position, A.Ignored)) .Throws(ex); var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); @@ -68,7 +68,7 @@ namespace Squidex.Infrastructure.EventSourcing { var ex = new AggregateException(new OperationCanceledException()); - A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, "^my-stream", position, A.Ignored)) + A.CallTo(() => eventStore.QueryAsync(A>.Ignored, "^my-stream", position, A.Ignored)) .Throws(ex); var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); @@ -88,7 +88,7 @@ namespace Squidex.Infrastructure.EventSourcing await WaitAndStopAsync(sut); - A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, "^my-stream", position, A.Ignored)) + A.CallTo(() => eventStore.QueryAsync(A>.Ignored, "^my-stream", position, A.Ignored)) .MustHaveHappened(Repeated.Exactly.Once); } @@ -101,7 +101,7 @@ namespace Squidex.Infrastructure.EventSourcing await WaitAndStopAsync(sut); - A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, "^my-stream", position, A.Ignored)) + A.CallTo(() => eventStore.QueryAsync(A>.Ignored, "^my-stream", position, A.Ignored)) .MustHaveHappened(Repeated.Exactly.Twice); } diff --git a/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs b/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs index 865d2d0b5..9eed300b9 100644 --- a/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs +++ b/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs @@ -118,7 +118,7 @@ namespace Squidex.Infrastructure.States await sut.GetSingleAsync(key); - A.CallTo(() => eventStore.GetEventsAsync(key, 3)) + A.CallTo(() => eventStore.QueryAsync(key, 3)) .MustHaveHappened(); } @@ -199,9 +199,9 @@ namespace Squidex.Infrastructure.States await statefulObject.WriteEventsAsync(new MyEvent(), new MyEvent()); await statefulObject.WriteEventsAsync(new MyEvent(), new MyEvent()); - A.CallTo(() => eventStore.AppendEventsAsync(A.Ignored, key, 2, A>.That.Matches(x => x.Count == 2))) + A.CallTo(() => eventStore.AppendAsync(A.Ignored, key, 2, A>.That.Matches(x => x.Count == 2))) .MustHaveHappened(); - A.CallTo(() => eventStore.AppendEventsAsync(A.Ignored, key, 4, A>.That.Matches(x => x.Count == 2))) + A.CallTo(() => eventStore.AppendAsync(A.Ignored, key, 4, A>.That.Matches(x => x.Count == 2))) .MustHaveHappened(); } @@ -212,7 +212,7 @@ namespace Squidex.Infrastructure.States var actualObject = await sut.GetSingleAsync(key); - A.CallTo(() => eventStore.AppendEventsAsync(A.Ignored, key, 2, A>.That.Matches(x => x.Count == 2))) + A.CallTo(() => eventStore.AppendAsync(A.Ignored, key, 2, A>.That.Matches(x => x.Count == 2))) .Throws(new WrongEventVersionException(1, 1)); await Assert.ThrowsAsync(() => statefulObject.WriteEventsAsync(new MyEvent(), new MyEvent())); @@ -221,7 +221,7 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_not_remove_from_cache_when_write_failed() { - A.CallTo(() => eventStore.AppendEventsAsync(A.Ignored, A.Ignored, A.Ignored, A>.Ignored)) + A.CallTo(() => eventStore.AppendAsync(A.Ignored, A.Ignored, A.Ignored, A>.Ignored)) .Throws(new InvalidOperationException()); var actualObject = await sut.GetSingleAsync(key); @@ -251,7 +251,7 @@ namespace Squidex.Infrastructure.States Assert.Same(retrievedStates[0], retrievedState); } - A.CallTo(() => eventStore.GetEventsAsync(key, 0)) + A.CallTo(() => eventStore.QueryAsync(key, 0)) .MustHaveHappened(Repeated.Exactly.Once); } @@ -284,7 +284,7 @@ namespace Squidex.Infrastructure.States i++; } - A.CallTo(() => eventStore.GetEventsAsync(key, readPosition)) + A.CallTo(() => eventStore.QueryAsync(key, readPosition)) .Returns(eventsStored); } } diff --git a/tools/Migrate_01/Migration00_ConvertEventStore.cs b/tools/Migrate_01/Migration00_ConvertEventStore.cs new file mode 100644 index 000000000..4cac5ed83 --- /dev/null +++ b/tools/Migrate_01/Migration00_ConvertEventStore.cs @@ -0,0 +1,53 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System.Collections.Generic; +using System.Threading.Tasks; +using MongoDB.Bson; +using MongoDB.Driver; +using Squidex.Infrastructure.EventSourcing; +using Squidex.Infrastructure.Migrations; + +namespace Migrate_01 +{ + public sealed class Migration00_ConvertEventStore : IMigration + { + private readonly IEventStore eventStore; + + public int FromVersion { get; } = 0; + + public int ToVersion { get; } = 1; + + public Migration00_ConvertEventStore(IEventStore eventStore) + { + this.eventStore = eventStore; + } + + public async Task UpdateAsync(IEnumerable previousMigrations) + { + if (eventStore is MongoEventStore mongoEventStore) + { + var collection = mongoEventStore.RawCollection; + + var filter = Builders.Filter; + + await collection.Find(new BsonDocument()).ForEachAsync(async commit => + { + foreach (BsonDocument @event in commit["Events"].AsBsonArray) + { + @event.Remove("EventId"); + + @event["Payload"] = BsonDocument.Parse(@event["Payload"].AsString); + @event["Metadata"] = BsonDocument.Parse(@event["Metadata"].AsString); + } + + await collection.ReplaceOneAsync(filter.Eq("_id", commit["_id"].AsString), commit); + }); + } + } + } +} diff --git a/tools/Migrate_01/Rebuilder.cs b/tools/Migrate_01/Rebuilder.cs index a401c39d7..884664cdb 100644 --- a/tools/Migrate_01/Rebuilder.cs +++ b/tools/Migrate_01/Rebuilder.cs @@ -53,7 +53,7 @@ namespace Migrate_01 var handledIds = new HashSet(); - return eventStore.GetEventsAsync(async storedEvent => + return eventStore.QueryAsync(async storedEvent => { var @event = ParseKnownEvent(storedEvent); @@ -68,7 +68,7 @@ namespace Migrate_01 await asset.WriteSnapshotAsync(); } } - }, filter, cancellationToken: CancellationToken.None); + }, filter, ct: CancellationToken.None); } public Task RebuildConfigAsync() @@ -77,7 +77,7 @@ namespace Migrate_01 var handledIds = new HashSet(); - return eventStore.GetEventsAsync(async storedEvent => + return eventStore.QueryAsync(async storedEvent => { var @event = ParseKnownEvent(storedEvent); @@ -102,7 +102,7 @@ namespace Migrate_01 await app.WriteSnapshotAsync(); } } - }, filter, cancellationToken: CancellationToken.None); + }, filter, ct: CancellationToken.None); } public async Task RebuildContentAsync() @@ -113,7 +113,7 @@ namespace Migrate_01 await snapshotContentStore.ClearAsync(); - await eventStore.GetEventsAsync(async storedEvent => + await eventStore.QueryAsync(async storedEvent => { var @event = ParseKnownEvent(storedEvent); @@ -139,7 +139,7 @@ namespace Migrate_01 // Schema has been deleted. } } - }, filter, cancellationToken: CancellationToken.None); + }, filter, ct: CancellationToken.None); } private Envelope ParseKnownEvent(StoredEvent storedEvent)