From fe6e43fcfe8ba20a7cda29d017623478675abc4f Mon Sep 17 00:00:00 2001 From: Sebastian Date: Fri, 10 Feb 2017 19:53:58 +0100 Subject: [PATCH] Refactored denormlizer --- Squidex.sln | 1 - .../EventStore/MongoEventCommit.cs | 10 +- .../EventStore/MongoEventStore.cs | 200 +++++++++++------- .../EventStore/MongoStreamsRepository.cs | 11 + .../Commands/DefaultDomainObjectRepository.cs | 15 +- .../CQRS/EnvelopeExtensions.cs | 4 +- .../CQRS/Events/EventReceiver.cs | 88 ++++---- ...ventConsumer.cs => IEventCatchConsumer.cs} | 14 +- .../{IEventStream.cs => IEventNotifier.cs} | 8 +- .../CQRS/Events/IEventStore.cs | 4 +- .../CQRS/Events/ILiveEventConsumer.cs | 13 -- .../CQRS/Events/InMemoryEventBus.cs | 32 --- .../{IEventPublisher.cs => StoredEvent.cs} | 18 +- .../CQRS/Events/WrongEventVersionException.cs | 12 +- .../CQRS/Replay/IReplayableStore.cs | 17 -- .../CQRS/Replay/ReplayGenerator.cs | 107 ---------- src/Squidex.Infrastructure/ICliCommand.cs | 17 -- .../Timers/CompletionTimer.cs | 68 ++++++ .../Apps/MongoAppEntity.cs | 15 +- .../Apps/MongoAppRepository.cs | 100 +-------- .../Apps/MongoAppRepository_EventHandling.cs | 111 ++++++++++ .../Contents/MongoContentRepository.cs | 129 +---------- .../MongoContentRepository_EventHandling.cs | 148 +++++++++++++ .../History/MongoHistoryEventRepository.cs | 3 +- .../Schemas/MongoSchemaEntity.cs | 21 +- .../Schemas/MongoSchemaRepository.cs | 119 +---------- .../MongoSchemaRepository_EventHandling.cs | 112 ++++++++++ .../Utils/MongoDbConsumerWrapper.cs | 74 +++++++ .../Apps/Services/IAppProvider.cs | 2 + .../Implementations/CachingAppProvider.cs | 41 +--- .../Schemas/Services/ISchemaProvider.cs | 4 +- .../Implementations/CachingSchemaProvider.cs | 57 ++--- src/Squidex/Config/Domain/ReadModule.cs | 4 - .../Config/Domain/StoreMongoDbModule.cs | 13 +- .../CQRS/Events/EventReceiverTests.cs | 4 +- 35 files changed, 825 insertions(+), 771 deletions(-) create mode 100644 src/Squidex.Infrastructure.MongoDb/EventStore/MongoStreamsRepository.cs rename src/Squidex.Infrastructure/CQRS/Events/{ICatchEventConsumer.cs => IEventCatchConsumer.cs} (50%) rename src/Squidex.Infrastructure/CQRS/Events/{IEventStream.cs => IEventNotifier.cs} (73%) delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/ILiveEventConsumer.cs delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/InMemoryEventBus.cs rename src/Squidex.Infrastructure/CQRS/Events/{IEventPublisher.cs => StoredEvent.cs} (53%) delete mode 100644 src/Squidex.Infrastructure/CQRS/Replay/IReplayableStore.cs delete mode 100644 src/Squidex.Infrastructure/CQRS/Replay/ReplayGenerator.cs delete mode 100644 src/Squidex.Infrastructure/ICliCommand.cs create mode 100644 src/Squidex.Infrastructure/Timers/CompletionTimer.cs create mode 100644 src/Squidex.Read.MongoDb/Apps/MongoAppRepository_EventHandling.cs create mode 100644 src/Squidex.Read.MongoDb/Contents/MongoContentRepository_EventHandling.cs create mode 100644 src/Squidex.Read.MongoDb/Schemas/MongoSchemaRepository_EventHandling.cs create mode 100644 src/Squidex.Read.MongoDb/Utils/MongoDbConsumerWrapper.cs diff --git a/Squidex.sln b/Squidex.sln index a98a613c0..3816dc81f 100644 --- a/Squidex.sln +++ b/Squidex.sln @@ -103,7 +103,6 @@ Global HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution - {61F6BBCE-A080-4400-B194-70E2F5D2096E} = {24A3171D-2905-49C9-8A49-A473799014E8} {47F3C27E-698B-4EDF-A7E8-D7F4232AFBB0} = {4C6B06C2-6D77-4E0E-AE32-D7050236433A} {BD1C30A8-8FFA-4A92-A9BD-B67B1CDDD84C} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF} {25F66C64-058A-4D44-BC0C-F12A054F9A91} = {4C6B06C2-6D77-4E0E-AE32-D7050236433A} diff --git a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs index d499ae7ed..c61935136 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs @@ -30,14 +30,18 @@ namespace Squidex.Infrastructure.MongoDb.EventStore [BsonElement] [BsonRequired] - public string EventStream { get; set; } + public long EventsOffset { get; set; } [BsonElement] [BsonRequired] - public int EventsVersion { get; set; } + public long EventStreamOffset { get; set; } + + [BsonElement] + [BsonRequired] + public string EventStream { get; set; } [BsonElement] [BsonRequired] - public int EventCount { get; set; } + public long EventsCount { get; set; } } } diff --git a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs index 4ac5fcec6..fc54bfedb 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs @@ -12,7 +12,6 @@ using System.Linq; using System.Reactive.Linq; using System.Threading.Tasks; using MongoDB.Bson; -using MongoDB.Bson.Serialization.Attributes; using MongoDB.Driver; using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.Reflection; @@ -25,21 +24,16 @@ namespace Squidex.Infrastructure.MongoDb.EventStore { public class MongoEventStore : MongoRepositoryBase, IEventStore, IExternalSystem { - private sealed class EventCountEntity - { - [BsonId] - [BsonElement] - [BsonRepresentation(BsonType.String)] - public Guid Id { get; set; } - - [BsonElement] - [BsonRequired] - public int EventCount { get; set; } - } + private const int Retries = 500; + private readonly IEventNotifier notifier; + private string eventsOffsetIndex; - public MongoEventStore(IMongoDatabase database) + public MongoEventStore(IMongoDatabase database, IEventNotifier notifier) : base(database) { + Guard.NotNull(notifier, nameof(notifier)); + + this.notifier = notifier; } protected override string CollectionName() @@ -47,9 +41,19 @@ namespace Squidex.Infrastructure.MongoDb.EventStore return "Events"; } - protected override Task SetupCollectionAsync(IMongoCollection collection) + protected override MongoCollectionSettings CollectionSettings() + { + return new MongoCollectionSettings { WriteConcern = WriteConcern.WMajority }; + } + + protected override async Task SetupCollectionAsync(IMongoCollection collection) { - return collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.EventStream).Ascending(x => x.EventsVersion), new CreateIndexOptions { Unique = true }); + var indexNames = + await Task.WhenAll( + collection.Indexes.CreateOneAsync(IndexKeys.Descending(x => x.EventsOffset), new CreateIndexOptions { Unique = true }), + collection.Indexes.CreateOneAsync(IndexKeys.Descending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true })); + + eventsOffsetIndex = indexNames[0]; } public void CheckConnection() @@ -64,61 +68,57 @@ namespace Squidex.Infrastructure.MongoDb.EventStore } } - public IObservable GetEventsAsync(string streamName) + public IObservable GetEventsAsync(string streamName) { Guard.NotNullOrEmpty(streamName, nameof(streamName)); - return Observable.Create(async (observer, ct) => + return Observable.Create(async (observer, ct) => { - try + await Collection.Find(x => x.EventStream == streamName).ForEachAsync(commit => { - await Collection.Find(x => x.EventStream == streamName).ForEachAsync(commit => + var position = commit.EventStreamOffset; + + foreach (var @event in commit.Events) { - foreach (var @event in commit.Events) - { - var eventData = SimpleMapper.Map(@event, new EventData()); + var eventData = SimpleMapper.Map(@event, new EventData()); - observer.OnNext(eventData); - } - }, ct); + observer.OnNext(new StoredEvent(position, eventData)); - observer.OnCompleted(); - } - catch (Exception e) - { - observer.OnError(e); - } + position++; + } + }, ct); }); } - public IObservable GetEventsAsync() + public IObservable GetEventsAsync(long lastReceivedPosition = -1) { - return Observable.Create(async (observer, ct) => + return Observable.Create(async (observer, ct) => { - try + var position = await GetPreviousOffset(lastReceivedPosition); + + await Collection.Find(new BsonDocument()).ForEachAsync(commit => { - await Collection.Find(new BsonDocument()).ForEachAsync(commit => + foreach (var @event in commit.Events) { - foreach (var @event in commit.Events) + if (position >= lastReceivedPosition) { var eventData = SimpleMapper.Map(@event, new EventData()); - observer.OnNext(eventData); + observer.OnNext(new StoredEvent(position, eventData)); } - }, ct); - observer.OnCompleted(); - } - catch (Exception e) - { - observer.OnError(e); - } + position++; + } + }, ct); }); } public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable events) { - var currentVersion = await GetEventVersionAsync(streamName); + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + Guard.NotNull(events, nameof(events)); + + var currentVersion = await GetEventStreamOffset(streamName); if (currentVersion != expectedVersion) { @@ -127,50 +127,106 @@ namespace Squidex.Infrastructure.MongoDb.EventStore var now = DateTime.UtcNow; - var commit = new MongoEventCommit - { - Id = commitId, - Events = events.Select(x => SimpleMapper.Map(x, new MongoEvent())).ToList(), - EventStream = streamName, - EventsVersion = expectedVersion, - Timestamp = now - }; - - if (commit.Events.Any()) + var commitEvents = events.Select(x => SimpleMapper.Map(x, new MongoEvent())).ToList(); + + if (commitEvents.Any()) { - commit.EventCount = commit.Events.Count; + var offset = await GetEventOffset(); - try + var commit = new MongoEventCommit { - await Collection.InsertOneAsync(commit); - } - catch (MongoWriteException e) + Id = commitId, + Events = commitEvents, + EventsOffset = offset, + EventsCount = commitEvents.Count, + EventStream = streamName, + EventStreamOffset = expectedVersion, + Timestamp = now + }; + + for (var retry = 0; retry < Retries; retry++) { - if (e.WriteError?.Category == ServerErrorCategory.DuplicateKey) + try { - currentVersion = await GetEventVersionAsync(streamName); + await Collection.InsertOneAsync(commit); + + notifier.NotifyEventsStored(); - if (currentVersion != expectedVersion) + return; + } + catch (MongoWriteException e) + { + if (e.Message.IndexOf(eventsOffsetIndex, StringComparison.OrdinalIgnoreCase) >= 0) + { + commit.EventsOffset = await GetEventOffset(); + } + else if (e.WriteError?.Category == ServerErrorCategory.DuplicateKey) { + currentVersion = await GetEventStreamOffset(streamName); + throw new WrongEventVersionException(currentVersion, expectedVersion); } + else + { + throw; + } } - - throw; } } } - private async Task GetEventVersionAsync(string streamName) + private async Task GetPreviousOffset(long startPosition) { - var allCommits = - await Collection.Find(c => c.EventStream == streamName) - .Project(Projection.Include(x => x.EventCount)) - .ToListAsync(); + var document = + await Collection.Find(x => x.EventsOffset <= startPosition) + .Project(Projection + .Include(x => x.EventStreamOffset) + .Include(x => x.EventsCount)) + .SortByDescending(x => x.EventsOffset).Limit(1) + .FirstOrDefaultAsync(); + + if (document != null) + { + return document["EventsOffset"].ToInt64(); + } - var currentVersion = allCommits.Sum(x => x["EventCount"].ToInt32()) - 1; + return -1; + } + + private async Task GetEventOffset() + { + var document = + await Collection.Find(new BsonDocument()) + .Project(Projection + .Include(x => x.EventsOffset) + .Include(x => x.EventsCount)) + .SortByDescending(x => x.EventsOffset).Limit(1) + .FirstOrDefaultAsync(); + + if (document != null) + { + return document["EventsOffset"].ToInt64() + document["EventsCount"].ToInt64(); + } + + return -1; + } + + private async Task GetEventStreamOffset(string streamName) + { + var document = + await Collection.Find(x => x.EventStream == streamName) + .Project(Projection + .Include(x => x.EventStreamOffset) + .Include(x => x.EventsCount)) + .SortByDescending(x => x.EventsOffset).Limit(1) + .FirstOrDefaultAsync(); + + if (document != null) + { + return document["EventStreamOffset"].ToInt64() + document["EventsCount"].ToInt64(); + } - return currentVersion; + return -1; } } } diff --git a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoStreamsRepository.cs b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoStreamsRepository.cs new file mode 100644 index 000000000..b948e0e4e --- /dev/null +++ b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoStreamsRepository.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Squidex.Infrastructure.MongoDb.EventStore +{ + public class MongoStreamsRepository + { + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Commands/DefaultDomainObjectRepository.cs b/src/Squidex.Infrastructure/CQRS/Commands/DefaultDomainObjectRepository.cs index d5a3202bf..7c657d2e6 100644 --- a/src/Squidex.Infrastructure/CQRS/Commands/DefaultDomainObjectRepository.cs +++ b/src/Squidex.Infrastructure/CQRS/Commands/DefaultDomainObjectRepository.cs @@ -20,26 +20,22 @@ namespace Squidex.Infrastructure.CQRS.Commands private readonly IStreamNameResolver nameResolver; private readonly IDomainObjectFactory factory; private readonly IEventStore eventStore; - private readonly IEventPublisher eventPublisher; private readonly EventDataFormatter formatter; public DefaultDomainObjectRepository( IDomainObjectFactory factory, IEventStore eventStore, - IEventPublisher eventPublisher, IStreamNameResolver nameResolver, EventDataFormatter formatter) { Guard.NotNull(factory, nameof(factory)); Guard.NotNull(formatter, nameof(formatter)); Guard.NotNull(eventStore, nameof(eventStore)); - Guard.NotNull(eventPublisher, nameof(eventPublisher)); Guard.NotNull(nameResolver, nameof(nameResolver)); this.factory = factory; - this.eventStore = eventStore; this.formatter = formatter; - this.eventPublisher = eventPublisher; + this.eventStore = eventStore; this.nameResolver = nameResolver; } @@ -58,9 +54,9 @@ namespace Squidex.Infrastructure.CQRS.Commands var domainObject = (TDomainObject)factory.CreateNew(typeof(TDomainObject), id); - foreach (var eventData in events) + foreach (var storedEvent in events) { - var envelope = formatter.Parse(eventData); + var envelope = formatter.Parse(storedEvent.Data); domainObject.ApplyEvent(envelope); } @@ -93,11 +89,6 @@ namespace Squidex.Infrastructure.CQRS.Commands { throw new DomainObjectVersionException(domainObject.Id.ToString(), domainObject.GetType(), versionCurrent, versionExpected); } - - foreach (var eventData in eventsToSave) - { - eventPublisher.Publish(eventData); - } } } } diff --git a/src/Squidex.Infrastructure/CQRS/EnvelopeExtensions.cs b/src/Squidex.Infrastructure/CQRS/EnvelopeExtensions.cs index a82550571..3022bc98c 100644 --- a/src/Squidex.Infrastructure/CQRS/EnvelopeExtensions.cs +++ b/src/Squidex.Infrastructure/CQRS/EnvelopeExtensions.cs @@ -14,12 +14,12 @@ namespace Squidex.Infrastructure.CQRS { public static class EnvelopeExtensions { - public static int EventNumber(this EnvelopeHeaders headers) + public static long EventNumber(this EnvelopeHeaders headers) { return headers[CommonHeaders.EventNumber].ToInt32(CultureInfo.InvariantCulture); } - public static Envelope SetEventNumber(this Envelope envelope, int value) where T : class + public static Envelope SetEventNumber(this Envelope envelope, long value) where T : class { envelope.Headers.Set(CommonHeaders.EventNumber, value); diff --git a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs index 0ee52987e..eabd950fe 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs @@ -7,93 +7,93 @@ // ========================================================================== using System; -using System.Collections.Generic; -using System.Linq; +using System.Reactive.Linq; using System.Threading.Tasks; using Microsoft.Extensions.Logging; +using Squidex.Infrastructure.Timers; // ReSharper disable ConvertIfStatementToConditionalTernaryExpression // ReSharper disable InvertIf namespace Squidex.Infrastructure.CQRS.Events { - public sealed class EventReceiver + public sealed class EventReceiver : DisposableObject { private readonly EventDataFormatter formatter; - private readonly bool canCatch; - private readonly IEnumerable liveConsumers; - private readonly IEnumerable catchConsumers; - private readonly IEventStream eventStream; + private readonly IEventStore eventStore; + private readonly IEventNotifier eventNotifier; + private readonly IEventCatchConsumer eventConsumer; private readonly ILogger logger; - private bool isSubscribed; + private CompletionTimer timer; public EventReceiver( - ILogger logger, - IEventStream eventStream, - IEnumerable liveConsumers, - IEnumerable catchConsumers, EventDataFormatter formatter, - bool canCatch = true) + IEventStore eventStore, + IEventNotifier eventNotifier, + IEventCatchConsumer eventConsumer, + ILogger logger) { Guard.NotNull(logger, nameof(logger)); Guard.NotNull(formatter, nameof(formatter)); - Guard.NotNull(eventStream, nameof(eventStream)); - Guard.NotNull(liveConsumers, nameof(liveConsumers)); - Guard.NotNull(catchConsumers, nameof(catchConsumers)); + Guard.NotNull(eventStore, nameof(eventStore)); + Guard.NotNull(eventNotifier, nameof(eventNotifier)); + Guard.NotNull(eventConsumer, nameof(eventConsumer)); this.logger = logger; this.formatter = formatter; - this.canCatch = canCatch; - this.eventStream = eventStream; - this.liveConsumers = liveConsumers; - this.catchConsumers = catchConsumers; + this.eventStore = eventStore; + this.eventNotifier = eventNotifier; + this.eventConsumer = eventConsumer; } - public void Subscribe() + protected override void DisposeObject(bool disposing) { - if (isSubscribed) + if (disposing) { - return; + timer?.Dispose(); } + } - eventStream.Connect("squidex", eventData => + public void Subscribe(int delay = 5000) + { + if (timer != null) { - var @event = ParseEvent(eventData); + return; + } - if (@event == null) + var lastReceivedPosition = long.MinValue; + + timer = new CompletionTimer(delay, async ct => + { + if (lastReceivedPosition == long.MinValue) { - return; + lastReceivedPosition = await eventConsumer.GetLastHandledEventNumber(); } - if (canCatch) - { - DispatchConsumers(catchConsumers, @event); - } - else + await eventStore.GetEventsAsync(lastReceivedPosition).ForEachAsync(async storedEvent => { - DispatchConsumers(liveConsumers, @event); - } + var @event = ParseEvent(storedEvent.Data); - logger.LogDebug("Event {0} handled", @event.Payload.GetType().Name); - }); + @event.SetEventNumber(storedEvent.EventNumber); - isSubscribed = true; - } + await DispatchConsumer(@event, eventConsumer, storedEvent.EventNumber); + }, ct); + }); - private void DispatchConsumers(IEnumerable consumers, Envelope @event) - { - Task.WaitAll(consumers.Select(c => DispatchConsumer(@event, c)).ToArray()); + eventNotifier.Subscribe(timer.Trigger); } - private async Task DispatchConsumer(Envelope @event, IEventConsumer consumer) + private async Task DispatchConsumer(Envelope @event, IEventCatchConsumer consumer, long eventNumber) { try { - await consumer.On(@event); + await consumer.On(@event, eventNumber); } catch (Exception ex) { logger.LogError(InfrastructureErrors.EventHandlingFailed, ex, "[{0}]: Failed to handle event {1} ({2})", consumer, @event.Payload, @event.Headers.EventId()); + + throw; } } @@ -109,7 +109,7 @@ namespace Squidex.Infrastructure.CQRS.Events { logger.LogError(InfrastructureErrors.EventDeserializationFailed, ex, "Failed to parse event {0}", eventData.EventId); - return null; + throw; } } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/ICatchEventConsumer.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventCatchConsumer.cs similarity index 50% rename from src/Squidex.Infrastructure/CQRS/Events/ICatchEventConsumer.cs rename to src/Squidex.Infrastructure/CQRS/Events/IEventCatchConsumer.cs index 9b2ab2434..c0403b60e 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/ICatchEventConsumer.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventCatchConsumer.cs @@ -1,13 +1,19 @@ -// ========================================================================== -// ICatchEventConsumer.cs +// ========================================================================== +// IEventCatchConsumer.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group // All rights reserved. // ========================================================================== + +using System.Threading.Tasks; + namespace Squidex.Infrastructure.CQRS.Events { - public interface ICatchEventConsumer : IEventConsumer + public interface IEventCatchConsumer { + Task GetLastHandledEventNumber(); + + Task On(Envelope @event, long eventNumber); } -} +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventStream.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventNotifier.cs similarity index 73% rename from src/Squidex.Infrastructure/CQRS/Events/IEventStream.cs rename to src/Squidex.Infrastructure/CQRS/Events/IEventNotifier.cs index 5b311bfea..290ca5d53 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventStream.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventNotifier.cs @@ -1,5 +1,5 @@ // ========================================================================== -// IEventStream.cs +// IEventsPushedNotifier.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group @@ -10,8 +10,10 @@ using System; namespace Squidex.Infrastructure.CQRS.Events { - public interface IEventStream : IDisposable + public interface IEventNotifier { - void Connect(string queuePrefix, Action received); + void NotifyEventsStored(); + + void Subscribe(Action handler); } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs index 38b0f5fad..76f73ca91 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs @@ -14,9 +14,9 @@ namespace Squidex.Infrastructure.CQRS.Events { public interface IEventStore { - IObservable GetEventsAsync(); + IObservable GetEventsAsync(long lastReceivedPosition = -1); - IObservable GetEventsAsync(string streamName); + IObservable GetEventsAsync(string streamName); Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable events); } diff --git a/src/Squidex.Infrastructure/CQRS/Events/ILiveEventConsumer.cs b/src/Squidex.Infrastructure/CQRS/Events/ILiveEventConsumer.cs deleted file mode 100644 index 9bbe7b922..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/ILiveEventConsumer.cs +++ /dev/null @@ -1,13 +0,0 @@ -// ========================================================================== -// ILiveEventConsumer.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== -namespace Squidex.Infrastructure.CQRS.Events -{ - public interface ILiveEventConsumer : IEventConsumer - { - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/InMemoryEventBus.cs b/src/Squidex.Infrastructure/CQRS/Events/InMemoryEventBus.cs deleted file mode 100644 index 0eb7063f1..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/InMemoryEventBus.cs +++ /dev/null @@ -1,32 +0,0 @@ -// ========================================================================== -// InMemoryEventBus.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Reactive.Subjects; - -namespace Squidex.Infrastructure.CQRS.Events -{ - public class InMemoryEventBus : IEventPublisher, IEventStream - { - private readonly Subject subject = new Subject(); - - public void Dispose() - { - } - - public void Publish(EventData eventData) - { - subject.OnNext(eventData); - } - - public void Connect(string queuePrefix, Action received) - { - subject.Subscribe(received); - } - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventPublisher.cs b/src/Squidex.Infrastructure/CQRS/Events/StoredEvent.cs similarity index 53% rename from src/Squidex.Infrastructure/CQRS/Events/IEventPublisher.cs rename to src/Squidex.Infrastructure/CQRS/Events/StoredEvent.cs index e13f69798..478081e8c 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventPublisher.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/StoredEvent.cs @@ -1,15 +1,25 @@ // ========================================================================== -// IEventPublisher.cs +// StoredEvent.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group // All rights reserved. // ========================================================================== - namespace Squidex.Infrastructure.CQRS.Events { - public interface IEventPublisher + public sealed class StoredEvent { - void Publish(EventData eventData); + public long EventNumber { get; } + + public EventData Data { get; } + + public StoredEvent(long eventNumber, EventData data) + { + Guard.NotNull(data, nameof(data)); + + EventNumber = eventNumber; + + Data = data; + } } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/WrongEventVersionException.cs b/src/Squidex.Infrastructure/CQRS/Events/WrongEventVersionException.cs index ccdc0dab8..030197c3f 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/WrongEventVersionException.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/WrongEventVersionException.cs @@ -12,20 +12,20 @@ namespace Squidex.Infrastructure.CQRS.Events { public class WrongEventVersionException : Exception { - private readonly int currentVersion; - private readonly int expectedVersion; + private readonly long currentVersion; + private readonly long expectedVersion; - public int CurrentVersion + public long CurrentVersion { get { return currentVersion; } } - public int ExpectedVersion + public long ExpectedVersion { get { return expectedVersion; } } - public WrongEventVersionException(int currentVersion, int expectedVersion) + public WrongEventVersionException(long currentVersion, long expectedVersion) : base(FormatMessage(currentVersion, expectedVersion)) { this.currentVersion = currentVersion; @@ -33,7 +33,7 @@ namespace Squidex.Infrastructure.CQRS.Events this.expectedVersion = expectedVersion; } - private static string FormatMessage(int currentVersion, int expectedVersion) + private static string FormatMessage(long currentVersion, long expectedVersion) { return $"Requested version {expectedVersion}, but found {currentVersion}."; } diff --git a/src/Squidex.Infrastructure/CQRS/Replay/IReplayableStore.cs b/src/Squidex.Infrastructure/CQRS/Replay/IReplayableStore.cs deleted file mode 100644 index a643c372c..000000000 --- a/src/Squidex.Infrastructure/CQRS/Replay/IReplayableStore.cs +++ /dev/null @@ -1,17 +0,0 @@ -// ========================================================================== -// IReplayableStore.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System.Threading.Tasks; - -namespace Squidex.Infrastructure.CQRS.Replay -{ - public interface IReplayableStore - { - Task ClearAsync(); - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Replay/ReplayGenerator.cs b/src/Squidex.Infrastructure/CQRS/Replay/ReplayGenerator.cs deleted file mode 100644 index ca0a47602..000000000 --- a/src/Squidex.Infrastructure/CQRS/Replay/ReplayGenerator.cs +++ /dev/null @@ -1,107 +0,0 @@ -// ========================================================================== -// ReplayGenerator.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Collections.Generic; -using System.Reactive.Linq; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; -using Squidex.Infrastructure.CQRS.Events; - -namespace Squidex.Infrastructure.CQRS.Replay -{ - public sealed class ReplayGenerator : ICliCommand - { - private readonly ILogger logger; - private readonly IEventStore eventStore; - private readonly IEventPublisher eventPublisher; - private readonly IEnumerable stores; - - public string Name { get; } = "replay"; - - public ReplayGenerator( - ILogger logger, - IEventStore eventStore, - IEventPublisher eventPublisher, - IEnumerable stores) - { - Guard.NotNull(logger, nameof(logger)); - Guard.NotNull(eventStore, nameof(eventStore)); - Guard.NotNull(eventPublisher, nameof(eventPublisher)); - Guard.NotNull(stores, nameof(stores)); - - this.stores = stores; - this.logger = logger; - this.eventStore = eventStore; - this.eventPublisher = eventPublisher; - } - - public void Execute(string[] args) - { - ReplayAllAsync().Wait(); - } - - public async Task ReplayAllAsync() - { - logger.LogDebug("Starting to replay all events"); - - if (!await ClearAsync()) - { - return; - } - - await ReplayEventsAsync(); - - logger.LogDebug("Finished to replay all events"); - } - - private async Task ReplayEventsAsync() - { - try - { - logger.LogDebug("Replaying all messages"); - - await eventStore.GetEventsAsync().ForEachAsync(eventData => - { - eventPublisher.Publish(eventData); - }); - - logger.LogDebug("Replayed all messages"); - } - catch (Exception e) - { - logger.LogCritical(InfrastructureErrors.ReplayPublishingFailed, e, "Failed to publish events to {0}", eventPublisher); - } - } - - private async Task ClearAsync() - { - logger.LogDebug("Clearing replayable stores"); - - foreach (var store in stores) - { - try - { - await store.ClearAsync(); - - logger.LogDebug("Cleared store {0}", store); - } - catch (Exception e) - { - logger.LogCritical(InfrastructureErrors.ReplayClearingFailed, e, "Failed to clear store {0}", store); - - return false; - } - } - - logger.LogDebug("Cleared replayable stores"); - - return true; - } - } -} diff --git a/src/Squidex.Infrastructure/ICliCommand.cs b/src/Squidex.Infrastructure/ICliCommand.cs deleted file mode 100644 index 5cbf9a2e6..000000000 --- a/src/Squidex.Infrastructure/ICliCommand.cs +++ /dev/null @@ -1,17 +0,0 @@ -// ========================================================================== -// ICommand.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -namespace Squidex.Infrastructure -{ - public interface ICliCommand - { - string Name { get; } - - void Execute(string[] args); - } -} diff --git a/src/Squidex.Infrastructure/Timers/CompletionTimer.cs b/src/Squidex.Infrastructure/Timers/CompletionTimer.cs new file mode 100644 index 000000000..e430353da --- /dev/null +++ b/src/Squidex.Infrastructure/Timers/CompletionTimer.cs @@ -0,0 +1,68 @@ +// ========================================================================== +// CompletionTimer.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading; +using System.Threading.Tasks; + +// ReSharper disable InvertIf + +namespace Squidex.Infrastructure.Timers +{ + public sealed class CompletionTimer : DisposableObject + { + private readonly CancellationTokenSource disposeCancellationTokenSource = new CancellationTokenSource(); + private readonly Task runTask; + private CancellationTokenSource delayCancellationSource; + + public CompletionTimer(int delay, Func callback) + { + Guard.NotNull(callback, nameof(callback)); + Guard.GreaterThan(delay, 0, nameof(delay)); + + runTask = RunInternal(delay, callback); + } + + private async Task RunInternal(int delay, Func callback) + { + while (!disposeCancellationTokenSource.IsCancellationRequested) + { + try + { + await callback(disposeCancellationTokenSource.Token).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + Console.WriteLine("Task in TriggerTimer has been cancelled."); + } + + delayCancellationSource = new CancellationTokenSource(); + + await Task.Delay(delay, delayCancellationSource.Token).ConfigureAwait(false); + } + } + + protected override void DisposeObject(bool disposing) + { + if (disposing) + { + delayCancellationSource?.Cancel(); + disposeCancellationTokenSource.Cancel(); + + runTask.Wait(); + } + } + + public void Trigger() + { + ThrowIfDisposed(); + + delayCancellationSource?.Cancel(); + } + } +} diff --git a/src/Squidex.Read.MongoDb/Apps/MongoAppEntity.cs b/src/Squidex.Read.MongoDb/Apps/MongoAppEntity.cs index fbfee8bce..0400a395d 100644 --- a/src/Squidex.Read.MongoDb/Apps/MongoAppEntity.cs +++ b/src/Squidex.Read.MongoDb/Apps/MongoAppEntity.cs @@ -27,15 +27,15 @@ namespace Squidex.Read.MongoDb.Apps [BsonRequired] [BsonElement] - public HashSet Languages { get; set; } + public HashSet Languages { get; } = new HashSet(); [BsonRequired] [BsonElement] - public Dictionary Clients { get; set; } + public Dictionary Clients { get; } = new Dictionary(); [BsonRequired] [BsonElement] - public Dictionary Contributors { get; set; } + public Dictionary Contributors { get; } = new Dictionary(); IReadOnlyCollection IAppEntity.Clients { @@ -56,14 +56,5 @@ namespace Squidex.Read.MongoDb.Apps { get { return Language.GetLanguage(MasterLanguage); } } - - public MongoAppEntity() - { - Contributors = new Dictionary(); - - Clients = new Dictionary(); - - Languages = new HashSet(); - } } } diff --git a/src/Squidex.Read.MongoDb/Apps/MongoAppRepository.cs b/src/Squidex.Read.MongoDb/Apps/MongoAppRepository.cs index 6ae50d818..62fe7f90e 100644 --- a/src/Squidex.Read.MongoDb/Apps/MongoAppRepository.cs +++ b/src/Squidex.Read.MongoDb/Apps/MongoAppRepository.cs @@ -10,25 +10,25 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; using MongoDB.Driver; -using Squidex.Events.Apps; using Squidex.Infrastructure; -using Squidex.Infrastructure.CQRS; using Squidex.Infrastructure.CQRS.Events; -using Squidex.Infrastructure.CQRS.Replay; -using Squidex.Infrastructure.Dispatching; using Squidex.Infrastructure.MongoDb; -using Squidex.Infrastructure.Reflection; using Squidex.Read.Apps; using Squidex.Read.Apps.Repositories; -using Squidex.Read.MongoDb.Utils; +using Squidex.Read.Apps.Services; namespace Squidex.Read.MongoDb.Apps { - public class MongoAppRepository : MongoRepositoryBase, IAppRepository, ICatchEventConsumer, IReplayableStore + public partial class MongoAppRepository : MongoRepositoryBase, IAppRepository, IEventConsumer { - public MongoAppRepository(IMongoDatabase database) + private readonly IAppProvider appProvider; + + public MongoAppRepository(IMongoDatabase database, IAppProvider appProvider) : base(database) { + Guard.NotNull(appProvider, nameof(appProvider)); + + this.appProvider = appProvider; } protected override string CollectionName() @@ -41,11 +41,6 @@ namespace Squidex.Read.MongoDb.Apps return collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.Name)); } - public Task ClearAsync() - { - return TryDropCollectionAsync(); - } - public async Task> QueryAllAsync(string subjectId) { var entities = @@ -69,84 +64,5 @@ namespace Squidex.Read.MongoDb.Apps return entity; } - - protected Task On(AppCreated @event, EnvelopeHeaders headers) - { - return Collection.CreateAsync(headers, a => - { - SimpleMapper.Map(@event, a); - }); - } - - protected Task On(AppContributorAssigned @event, EnvelopeHeaders headers) - { - return Collection.UpdateAsync(headers, a => - { - var contributor = a.Contributors.GetOrAddNew(@event.ContributorId); - - SimpleMapper.Map(@event, contributor); - }); - } - - protected Task On(AppContributorRemoved @event, EnvelopeHeaders headers) - { - return Collection.UpdateAsync(headers, a => - { - a.Contributors.Remove(@event.ContributorId); - }); - } - - protected Task On(AppClientAttached @event, EnvelopeHeaders headers) - { - return Collection.UpdateAsync(headers, a => - { - a.Clients.Add(@event.Id, SimpleMapper.Map(@event, new MongoAppClientEntity())); - }); - } - - protected Task On(AppClientRevoked @event, EnvelopeHeaders headers) - { - return Collection.UpdateAsync(headers, a => - { - a.Clients.Remove(@event.Id); - }); - } - - protected Task On(AppClientRenamed @event, EnvelopeHeaders headers) - { - return Collection.UpdateAsync(headers, a => - { - a.Clients[@event.Id].Name = @event.Name; - }); - } - - protected Task On(AppLanguageAdded @event, EnvelopeHeaders headers) - { - return Collection.UpdateAsync(headers, a => - { - a.Languages.Add(@event.Language.Iso2Code); - }); - } - - protected Task On(AppLanguageRemoved @event, EnvelopeHeaders headers) - { - return Collection.UpdateAsync(headers, a => - { - a.Languages.Remove(@event.Language.Iso2Code); - }); - } - - protected Task On(AppMasterLanguageSet @event, EnvelopeHeaders headers) - { - return Collection.UpdateAsync(headers, a => - { - a.MasterLanguage = @event.Language.Iso2Code; - }); - } - - public Task On(Envelope @event) - { - return this.DispatchActionAsync(@event.Payload, @event.Headers); - } } } diff --git a/src/Squidex.Read.MongoDb/Apps/MongoAppRepository_EventHandling.cs b/src/Squidex.Read.MongoDb/Apps/MongoAppRepository_EventHandling.cs new file mode 100644 index 000000000..674e0679e --- /dev/null +++ b/src/Squidex.Read.MongoDb/Apps/MongoAppRepository_EventHandling.cs @@ -0,0 +1,111 @@ +// ========================================================================== +// MongoAppRepository_EventHandling.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading.Tasks; +using Squidex.Events.Apps; +using Squidex.Infrastructure; +using Squidex.Infrastructure.CQRS; +using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.Dispatching; +using Squidex.Infrastructure.Reflection; +using Squidex.Read.MongoDb.Utils; + +namespace Squidex.Read.MongoDb.Apps +{ + public partial class MongoAppRepository + { + public Task On(Envelope @event) + { + return this.DispatchActionAsync(@event.Payload, @event.Headers); + } + + protected async Task On(AppCreated @event, EnvelopeHeaders headers) + { + await Collection.CreateAsync(headers, a => + { + SimpleMapper.Map(@event, a); + }); + + appProvider.Remove(headers.AggregateId()); + } + + protected Task On(AppContributorAssigned @event, EnvelopeHeaders headers) + { + return UpdateAsync(headers, a => + { + var contributor = a.Contributors.GetOrAddNew(@event.ContributorId); + + SimpleMapper.Map(@event, contributor); + }); + } + + protected Task On(AppContributorRemoved @event, EnvelopeHeaders headers) + { + return UpdateAsync(headers, a => + { + a.Contributors.Remove(@event.ContributorId); + }); + } + + protected Task On(AppClientAttached @event, EnvelopeHeaders headers) + { + return UpdateAsync(headers, a => + { + a.Clients[@event.Id] = SimpleMapper.Map(@event, new MongoAppClientEntity()); + }); + } + + protected Task On(AppClientRevoked @event, EnvelopeHeaders headers) + { + return UpdateAsync(headers, a => + { + a.Clients.Remove(@event.Id); + }); + } + + protected Task On(AppClientRenamed @event, EnvelopeHeaders headers) + { + return UpdateAsync(headers, a => + { + a.Clients[@event.Id].Name = @event.Name; + }); + } + + protected Task On(AppLanguageAdded @event, EnvelopeHeaders headers) + { + return UpdateAsync(headers, a => + { + a.Languages.Add(@event.Language.Iso2Code); + }); + } + + protected Task On(AppLanguageRemoved @event, EnvelopeHeaders headers) + { + return UpdateAsync(headers, a => + { + a.Languages.Remove(@event.Language.Iso2Code); + }); + } + + protected Task On(AppMasterLanguageSet @event, EnvelopeHeaders headers) + { + return UpdateAsync(headers, a => + { + a.MasterLanguage = @event.Language.Iso2Code; + }); + } + + public async Task UpdateAsync(EnvelopeHeaders headers, Action updater) + { + await Collection.UpdateAsync(headers, updater); + + appProvider.Remove(headers.AggregateId()); + } + } +} diff --git a/src/Squidex.Read.MongoDb/Contents/MongoContentRepository.cs b/src/Squidex.Read.MongoDb/Contents/MongoContentRepository.cs index 21f4dab32..6af65cc88 100644 --- a/src/Squidex.Read.MongoDb/Contents/MongoContentRepository.cs +++ b/src/Squidex.Read.MongoDb/Contents/MongoContentRepository.cs @@ -11,39 +11,22 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Microsoft.OData.Core; -using MongoDB.Bson; using MongoDB.Driver; using Squidex.Core.Schemas; -using Squidex.Events; -using Squidex.Events.Contents; -using Squidex.Events.Schemas; using Squidex.Infrastructure; -using Squidex.Infrastructure.CQRS; using Squidex.Infrastructure.CQRS.Events; -using Squidex.Infrastructure.CQRS.Replay; -using Squidex.Infrastructure.Dispatching; -using Squidex.Infrastructure.Reflection; using Squidex.Read.Contents; using Squidex.Read.Contents.Repositories; using Squidex.Read.MongoDb.Contents.Visitors; -using Squidex.Read.MongoDb.Utils; using Squidex.Read.Schemas.Services; namespace Squidex.Read.MongoDb.Contents { - public class MongoContentRepository : IContentRepository, ICatchEventConsumer, IReplayableStore + public partial class MongoContentRepository : IContentRepository, IEventConsumer { private const string Prefix = "Projections_Content_"; private readonly IMongoDatabase database; private readonly ISchemaProvider schemaProvider; - - protected UpdateDefinitionBuilder Update - { - get - { - return Builders.Update; - } - } protected IndexKeysDefinitionBuilder IndexKeys { @@ -63,25 +46,6 @@ namespace Squidex.Read.MongoDb.Contents this.schemaProvider = schemaProvider; } - public async Task ClearAsync() - { - using (var collections = await database.ListCollectionsAsync()) - { - while (await collections.MoveNextAsync()) - { - foreach (var collection in collections.Current) - { - var name = collection["name"].ToString(); - - if (name.StartsWith(Prefix, StringComparison.OrdinalIgnoreCase)) - { - await database.DropCollectionAsync(name); - } - } - } - } - } - public async Task> QueryAsync(Guid schemaId, bool nonPublished, string odataQuery, HashSet languages) { List result = null; @@ -167,83 +131,6 @@ namespace Squidex.Read.MongoDb.Contents return result; } - protected Task On(ContentCreated @event, EnvelopeHeaders headers) - { - return ForSchemaAsync(headers.SchemaId(), (collection, schema) => - { - return collection.CreateAsync(headers, x => - { - SimpleMapper.Map(@event, x); - - x.SetData(schema, @event.Data); - }); - }); - } - - protected Task On(ContentUpdated @event, EnvelopeHeaders headers) - { - return ForSchemaAsync(headers.SchemaId(), (collection, schema) => - { - return collection.UpdateAsync(headers, x => - { - x.SetData(schema, @event.Data); - }); - }); - } - - protected Task On(ContentPublished @event, EnvelopeHeaders headers) - { - return ForSchemaAsync(headers.SchemaId(), collection => - { - return collection.UpdateAsync(headers, x => - { - x.IsPublished = true; - }); - }); - } - - protected Task On(ContentUnpublished @event, EnvelopeHeaders headers) - { - return ForSchemaAsync(headers.SchemaId(), collection => - { - return collection.UpdateAsync(headers, x => - { - x.IsPublished = false; - }); - }); - } - - protected Task On(ContentDeleted @event, EnvelopeHeaders headers) - { - return ForSchemaAsync(headers.SchemaId(), collection => - { - return collection.UpdateAsync(headers, x => - { - x.IsDeleted = true; - }); - }); - } - - protected Task On(FieldDeleted @event, EnvelopeHeaders headers) - { - var collection = GetCollection(headers.SchemaId()); - - return collection.UpdateManyAsync(new BsonDocument(), Update.Unset(new StringFieldDefinition($"Data.{@event.FieldId}"))); - } - - protected async Task On(SchemaCreated @event, EnvelopeHeaders headers) - { - var collection = GetCollection(headers.AggregateId()); - - await collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.IsPublished)); - await collection.Indexes.CreateOneAsync(IndexKeys.Text(x => x.Text)); - } - - public Task On(Envelope @event) - { - return this.DispatchActionAsync(@event.Payload, @event.Headers); - } - private async Task ForSchemaAsync(Guid schemaId, Func, Schema, Task> action) { var collection = GetCollection(schemaId); @@ -257,19 +144,5 @@ namespace Squidex.Read.MongoDb.Contents await action(collection, schemaEntity.Schema); } - - private async Task ForSchemaAsync(Guid schemaId, Func, Task> action) - { - var collection = GetCollection(schemaId); - - await action(collection); - } - - private IMongoCollection GetCollection(Guid schemaId) - { - var name = $"{Prefix}{schemaId}"; - - return database.GetCollection(name); - } } } diff --git a/src/Squidex.Read.MongoDb/Contents/MongoContentRepository_EventHandling.cs b/src/Squidex.Read.MongoDb/Contents/MongoContentRepository_EventHandling.cs new file mode 100644 index 000000000..48b37e64b --- /dev/null +++ b/src/Squidex.Read.MongoDb/Contents/MongoContentRepository_EventHandling.cs @@ -0,0 +1,148 @@ +// ========================================================================== +// MongoContentRepository_EventHandling.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading.Tasks; +using MongoDB.Bson; +using MongoDB.Driver; +using Squidex.Events; +using Squidex.Events.Contents; +using Squidex.Events.Schemas; +using Squidex.Infrastructure.CQRS; +using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.Dispatching; +using Squidex.Infrastructure.Reflection; +using Squidex.Read.MongoDb.Utils; + +// ReSharper disable ConvertToLambdaExpression + +namespace Squidex.Read.MongoDb.Contents +{ + public partial class MongoContentRepository + { + protected UpdateDefinitionBuilder Update + { + get + { + return Builders.Update; + } + } + + public async Task ClearAsync() + { + using (var collections = await database.ListCollectionsAsync()) + { + while (await collections.MoveNextAsync()) + { + foreach (var collection in collections.Current) + { + var name = collection["name"].ToString(); + + if (name.StartsWith(Prefix, StringComparison.OrdinalIgnoreCase)) + { + await database.DropCollectionAsync(name); + } + } + } + } + } + + public Task On(Envelope @event) + { + return this.DispatchActionAsync(@event.Payload, @event.Headers); + } + + protected Task On(SchemaCreated @event, EnvelopeHeaders headers) + { + return ForSchemaIdAsync(headers.AggregateId(), async collection => + { + await collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.IsPublished)); + await collection.Indexes.CreateOneAsync(IndexKeys.Text(x => x.Text)); + }); + } + + protected Task On(ContentCreated @event, EnvelopeHeaders headers) + { + return ForSchemaAsync(headers.SchemaId(), (collection, schema) => + { + return collection.CreateAsync(headers, x => + { + SimpleMapper.Map(@event, x); + + x.SetData(schema, @event.Data); + }); + }); + } + + protected Task On(ContentUpdated @event, EnvelopeHeaders headers) + { + return ForSchemaAsync(headers.SchemaId(), (collection, schema) => + { + return collection.UpdateAsync(headers, x => + { + x.SetData(schema, @event.Data); + }); + }); + } + + protected Task On(ContentPublished @event, EnvelopeHeaders headers) + { + return ForSchemaIdAsync(headers.SchemaId(), collection => + { + return collection.UpdateAsync(headers, x => + { + x.IsPublished = true; + }); + }); + } + + protected Task On(ContentUnpublished @event, EnvelopeHeaders headers) + { + return ForSchemaIdAsync(headers.SchemaId(), collection => + { + return collection.UpdateAsync(headers, x => + { + x.IsPublished = false; + }); + }); + } + + protected Task On(ContentDeleted @event, EnvelopeHeaders headers) + { + return ForSchemaIdAsync(headers.SchemaId(), collection => + { + return collection.UpdateAsync(headers, x => + { + x.IsDeleted = true; + }); + }); + } + + protected Task On(FieldDeleted @event, EnvelopeHeaders headers) + { + return ForSchemaIdAsync(headers.SchemaId(), collection => + { + return collection.UpdateManyAsync(new BsonDocument(), Update.Unset(new StringFieldDefinition($"Data.{@event.FieldId}"))); + }); + } + + private async Task ForSchemaIdAsync(Guid schemaId, Func, Task> action) + { + var collection = GetCollection(schemaId); + + await action(collection); + } + + private IMongoCollection GetCollection(Guid schemaId) + { + var name = $"{Prefix}{schemaId}"; + + return database.GetCollection(name); + } + } +} diff --git a/src/Squidex.Read.MongoDb/History/MongoHistoryEventRepository.cs b/src/Squidex.Read.MongoDb/History/MongoHistoryEventRepository.cs index b9e4ddd1f..eaf2ea0a5 100644 --- a/src/Squidex.Read.MongoDb/History/MongoHistoryEventRepository.cs +++ b/src/Squidex.Read.MongoDb/History/MongoHistoryEventRepository.cs @@ -14,7 +14,6 @@ using System.Threading.Tasks; using MongoDB.Driver; using Squidex.Infrastructure.CQRS; using Squidex.Infrastructure.CQRS.Events; -using Squidex.Infrastructure.CQRS.Replay; using Squidex.Infrastructure.MongoDb; using Squidex.Read.History; using Squidex.Read.History.Repositories; @@ -22,7 +21,7 @@ using Squidex.Read.MongoDb.Utils; namespace Squidex.Read.MongoDb.History { - public class MongoHistoryEventRepository : MongoRepositoryBase, IHistoryEventRepository, ICatchEventConsumer, IReplayableStore + public class MongoHistoryEventRepository : MongoRepositoryBase, IHistoryEventRepository, IEventConsumer { private readonly List creators; private readonly Dictionary texts = new Dictionary(); diff --git a/src/Squidex.Read.MongoDb/Schemas/MongoSchemaEntity.cs b/src/Squidex.Read.MongoDb/Schemas/MongoSchemaEntity.cs index a410a5706..4ac33a0e0 100644 --- a/src/Squidex.Read.MongoDb/Schemas/MongoSchemaEntity.cs +++ b/src/Squidex.Read.MongoDb/Schemas/MongoSchemaEntity.cs @@ -58,11 +58,26 @@ namespace Squidex.Read.MongoDb.Schemas get { return schema.Value; } } - public Lazy DeserializeSchema(SchemaJsonSerializer serializer) + public void SerializeSchema(Schema newSchema, SchemaJsonSerializer serializer) + { + Label = newSchema.Properties.Label ?? newSchema.Name; + + Schema = serializer.Serialize(newSchema).ToString(); + schema = new Lazy(() => newSchema); + + IsPublished = newSchema.IsPublished; + } + + public void UpdateSchema(SchemaJsonSerializer serializer, Func updater) { - schema = new Lazy(() => Schema != null ? serializer.Deserialize(JObject.Parse(Schema)) : null); + DeserializeSchema(serializer); + + SerializeSchema(updater(schema.Value), serializer); + } - return schema; + public Lazy DeserializeSchema(SchemaJsonSerializer serializer) + { + return schema ?? (schema = new Lazy(() => Schema != null ? serializer.Deserialize(JObject.Parse(Schema)) : null)); } } } diff --git a/src/Squidex.Read.MongoDb/Schemas/MongoSchemaRepository.cs b/src/Squidex.Read.MongoDb/Schemas/MongoSchemaRepository.cs index 05b3d3636..28089ce7d 100644 --- a/src/Squidex.Read.MongoDb/Schemas/MongoSchemaRepository.cs +++ b/src/Squidex.Read.MongoDb/Schemas/MongoSchemaRepository.cs @@ -13,35 +13,31 @@ using System.Threading.Tasks; using MongoDB.Driver; using Squidex.Core.Schemas; using Squidex.Core.Schemas.Json; -using Squidex.Events.Schemas; -using Squidex.Events.Schemas.Utils; using Squidex.Infrastructure; -using Squidex.Infrastructure.CQRS; using Squidex.Infrastructure.CQRS.Events; -using Squidex.Infrastructure.CQRS.Replay; -using Squidex.Infrastructure.Dispatching; using Squidex.Infrastructure.MongoDb; -using Squidex.Infrastructure.Reflection; -using Squidex.Read.MongoDb.Utils; using Squidex.Read.Schemas; using Squidex.Read.Schemas.Repositories; +using Squidex.Read.Schemas.Services; namespace Squidex.Read.MongoDb.Schemas { - public class MongoSchemaRepository : MongoRepositoryBase, ISchemaRepository, ICatchEventConsumer, IReplayableStore + public partial class MongoSchemaRepository : MongoRepositoryBase, ISchemaRepository, IEventConsumer { private readonly SchemaJsonSerializer serializer; private readonly FieldRegistry registry; + private readonly ISchemaProvider schemaProvider; - public MongoSchemaRepository(IMongoDatabase database, SchemaJsonSerializer serializer, FieldRegistry registry) + public MongoSchemaRepository(IMongoDatabase database, SchemaJsonSerializer serializer, FieldRegistry registry, ISchemaProvider schemaProvider) : base(database) { - Guard.NotNull(serializer, nameof(serializer)); Guard.NotNull(registry, nameof(registry)); - - this.serializer = serializer; + Guard.NotNull(serializer, nameof(serializer)); + Guard.NotNull(schemaProvider, nameof(schemaProvider)); this.registry = registry; + this.serializer = serializer; + this.schemaProvider = schemaProvider; } protected override string CollectionName() @@ -105,104 +101,5 @@ namespace Squidex.Read.MongoDb.Schemas return entity?.Id; } - - protected Task On(SchemaDeleted @event, EnvelopeHeaders headers) - { - return Collection.UpdateAsync(headers, s => s.IsDeleted = true); - } - - protected Task On(FieldDeleted @event, EnvelopeHeaders headers) - { - return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); - } - - protected Task On(FieldDisabled @event, EnvelopeHeaders headers) - { - return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); - } - - protected Task On(FieldEnabled @event, EnvelopeHeaders headers) - { - return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); - } - - protected Task On(FieldHidden @event, EnvelopeHeaders headers) - { - return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); - } - - protected Task On(FieldShown @event, EnvelopeHeaders headers) - { - return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); - } - - protected Task On(FieldUpdated @event, EnvelopeHeaders headers) - { - return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); - } - - protected Task On(SchemaUpdated @event, EnvelopeHeaders headers) - { - return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); - } - - protected Task On(SchemaPublished @event, EnvelopeHeaders headers) - { - return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); - } - - protected Task On(SchemaUnpublished @event, EnvelopeHeaders headers) - { - return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); - } - - protected Task On(FieldAdded @event, EnvelopeHeaders headers) - { - return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s, registry)); - } - - protected Task On(SchemaCreated @event, EnvelopeHeaders headers) - { - var schema = Schema.Create(@event.Name, @event.Properties); - - return Collection.CreateAsync(headers, s => { UpdateSchema(s, schema); SimpleMapper.Map(@event, s); }); - } - - public Task On(Envelope @event) - { - return this.DispatchActionAsync(@event.Payload, @event.Headers); - } - - private Task UpdateSchema(EnvelopeHeaders headers, Func updater) - { - return Collection.UpdateAsync(headers, e => UpdateSchema(e, updater)); - } - - private void UpdateSchema(MongoSchemaEntity entity, Func updater) - { - var currentSchema = Deserialize(entity); - - currentSchema = updater(currentSchema); - - UpdateSchema(entity, currentSchema); - UpdateProperties(entity, currentSchema); - } - - private static void UpdateProperties(MongoSchemaEntity entity, Schema currentSchema) - { - entity.Label = currentSchema.Properties.Label; - - entity.IsPublished = currentSchema.IsPublished; - } - - private void UpdateSchema(MongoSchemaEntity entity, Schema schema) - { - entity.Schema = serializer.Serialize(schema).ToString(); - } - - private Schema Deserialize(MongoSchemaEntity entity) - { - return entity.DeserializeSchema(serializer).Value; - } } } diff --git a/src/Squidex.Read.MongoDb/Schemas/MongoSchemaRepository_EventHandling.cs b/src/Squidex.Read.MongoDb/Schemas/MongoSchemaRepository_EventHandling.cs new file mode 100644 index 000000000..26309d078 --- /dev/null +++ b/src/Squidex.Read.MongoDb/Schemas/MongoSchemaRepository_EventHandling.cs @@ -0,0 +1,112 @@ +// ========================================================================== +// MongoSchemaRepository_EventHandling.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading.Tasks; +using Squidex.Core.Schemas; +using Squidex.Events.Schemas; +using Squidex.Events.Schemas.Utils; +using Squidex.Infrastructure.CQRS; +using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.Dispatching; +using Squidex.Infrastructure.Reflection; +using Squidex.Read.MongoDb.Utils; + +namespace Squidex.Read.MongoDb.Schemas +{ + public partial class MongoSchemaRepository + { + public Task On(Envelope @event) + { + return this.DispatchActionAsync(@event.Payload, @event.Headers); + } + + protected async Task On(SchemaCreated @event, EnvelopeHeaders headers) + { + var schema = SchemaEventDispatcher.Dispatch(@event); + + await Collection.CreateAsync(headers, s => { UpdateSchema(s, schema); SimpleMapper.Map(@event, s); }); + + schemaProvider.Remove(headers.AggregateId()); + } + + protected Task On(FieldDeleted @event, EnvelopeHeaders headers) + { + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); + } + + protected Task On(FieldDisabled @event, EnvelopeHeaders headers) + { + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); + } + + protected Task On(FieldEnabled @event, EnvelopeHeaders headers) + { + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); + } + + protected Task On(FieldHidden @event, EnvelopeHeaders headers) + { + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); + } + + protected Task On(FieldShown @event, EnvelopeHeaders headers) + { + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); + } + + protected Task On(FieldUpdated @event, EnvelopeHeaders headers) + { + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); + } + + protected Task On(SchemaUpdated @event, EnvelopeHeaders headers) + { + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); + } + + protected Task On(SchemaPublished @event, EnvelopeHeaders headers) + { + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); + } + + protected Task On(SchemaUnpublished @event, EnvelopeHeaders headers) + { + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); + } + + protected Task On(FieldAdded @event, EnvelopeHeaders headers) + { + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s, registry)); + } + + protected async Task On(SchemaDeleted @event, EnvelopeHeaders headers) + { + await Collection.UpdateAsync(headers, s => s.IsDeleted = true); + + schemaProvider.Remove(headers.AggregateId()); + } + + private async Task UpdateSchema(EnvelopeHeaders headers, Func updater) + { + await Collection.UpdateAsync(headers, e => UpdateSchema(e, updater)); + + schemaProvider.Remove(headers.AggregateId()); + } + + private void UpdateSchema(MongoSchemaEntity entity, Func updater) + { + entity.UpdateSchema(serializer, updater); + } + + private void UpdateSchema(MongoSchemaEntity entity, Schema schema) + { + entity.SerializeSchema(schema, serializer); + } + } +} diff --git a/src/Squidex.Read.MongoDb/Utils/MongoDbConsumerWrapper.cs b/src/Squidex.Read.MongoDb/Utils/MongoDbConsumerWrapper.cs new file mode 100644 index 000000000..221a938b9 --- /dev/null +++ b/src/Squidex.Read.MongoDb/Utils/MongoDbConsumerWrapper.cs @@ -0,0 +1,74 @@ +// ========================================================================== +// MongoDbStore.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Threading.Tasks; +using MongoDB.Bson; +using MongoDB.Driver; +using Squidex.Infrastructure; +using Squidex.Infrastructure.CQRS; +using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.MongoDb; + +namespace Squidex.Read.MongoDb.Utils +{ + public sealed class EventPosition + { + public string Name { get; set; } + + public long EventNumber { get; set; } + } + + public sealed class MongoDbConsumerWrapper : MongoRepositoryBase, IEventCatchConsumer + { + private static readonly UpdateOptions upsert = new UpdateOptions { IsUpsert = true }; + private readonly IEventConsumer eventConsumer; + private readonly string eventStoreName; + + public MongoDbConsumerWrapper(IMongoDatabase database, IEventConsumer eventConsumer) + : base(database) + { + Guard.NotNull(eventConsumer, nameof(eventConsumer)); + + this.eventConsumer = eventConsumer; + + eventStoreName = GetType().Name; + } + + protected override string CollectionName() + { + return "EventPositions"; + } + + protected override Task SetupCollectionAsync(IMongoCollection collection) + { + return collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.Name), new CreateIndexOptions { Unique = true }); + } + + public async Task On(Envelope @event, long eventNumber) + { + await eventConsumer.On(@event); + + await SetLastHandledEventNumber(eventNumber); + } + + private Task SetLastHandledEventNumber(long eventNumber) + { + return Collection.ReplaceOneAsync(x => x.Name == eventStoreName, new EventPosition { Name = eventStoreName, EventNumber = eventNumber }, upsert); + } + + public async Task GetLastHandledEventNumber() + { + var collectionPosition = + await Collection + .Find(new BsonDocument()).SortByDescending(x => x.EventNumber).Limit(1) + .FirstOrDefaultAsync(); + + return collectionPosition?.EventNumber ?? -1; + } + } +} diff --git a/src/Squidex.Read/Apps/Services/IAppProvider.cs b/src/Squidex.Read/Apps/Services/IAppProvider.cs index 0a1e78e25..9c85dfec9 100644 --- a/src/Squidex.Read/Apps/Services/IAppProvider.cs +++ b/src/Squidex.Read/Apps/Services/IAppProvider.cs @@ -16,5 +16,7 @@ namespace Squidex.Read.Apps.Services Task FindAppByIdAsync(Guid id); Task FindAppByNameAsync(string name); + + void Remove(Guid id); } } diff --git a/src/Squidex.Read/Apps/Services/Implementations/CachingAppProvider.cs b/src/Squidex.Read/Apps/Services/Implementations/CachingAppProvider.cs index 8e4b1aa4b..d4baeae8d 100644 --- a/src/Squidex.Read/Apps/Services/Implementations/CachingAppProvider.cs +++ b/src/Squidex.Read/Apps/Services/Implementations/CachingAppProvider.cs @@ -9,10 +9,7 @@ using System; using System.Threading.Tasks; using Microsoft.Extensions.Caching.Memory; -using Squidex.Events.Apps; using Squidex.Infrastructure; -using Squidex.Infrastructure.CQRS; -using Squidex.Infrastructure.CQRS.Events; using Squidex.Read.Apps.Repositories; using Squidex.Read.Utils; @@ -20,7 +17,7 @@ using Squidex.Read.Utils; namespace Squidex.Read.Apps.Services.Implementations { - public class CachingAppProvider : CachingProvider, IAppProvider, ICatchEventConsumer, ILiveEventConsumer + public class CachingAppProvider : CachingProvider, IAppProvider { private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(30); private readonly IAppRepository repository; @@ -49,7 +46,7 @@ namespace Squidex.Read.Apps.Services.Implementations { var entity = await repository.FindAppAsync(appId); - cacheItem = new CacheItem { Entity = entity, Name = entity?.Name }; + cacheItem = new CacheItem { Entity = entity, Name = entity.Name }; Cache.Set(cacheKey, cacheItem, CacheDuration); @@ -86,40 +83,18 @@ namespace Squidex.Read.Apps.Services.Implementations return cacheItem.Entity; } - public Task On(Envelope @event) + public void Remove(Guid id) { - if (@event.Payload is AppContributorAssigned || - @event.Payload is AppContributorRemoved || - @event.Payload is AppClientAttached || - @event.Payload is AppClientRevoked || - @event.Payload is AppClientRenamed || - @event.Payload is AppLanguageAdded || - @event.Payload is AppLanguageRemoved || - @event.Payload is AppMasterLanguageSet) - { - var cacheKey = BuildIdCacheKey(@event.Headers.AggregateId()); - - var cacheItem = Cache.Get(cacheKey); + var cacheKey = BuildIdCacheKey(id); - if (cacheItem?.Name != null) - { - Cache.Remove(BuildNameCacheKey(cacheItem.Name)); - } + var cacheItem = Cache.Get(cacheKey); - Cache.Remove(cacheKey); - } - else + if (cacheItem?.Name != null) { - var appCreated = @event.Payload as AppCreated; - - if (appCreated != null) - { - Cache.Remove(BuildIdCacheKey(@event.Headers.AggregateId())); - Cache.Remove(BuildNameCacheKey(appCreated.Name)); - } + Cache.Remove(BuildNameCacheKey(cacheItem.Name)); } - return Task.FromResult(true); + Cache.Remove(cacheKey); } private static string BuildNameCacheKey(string name) diff --git a/src/Squidex.Read/Schemas/Services/ISchemaProvider.cs b/src/Squidex.Read/Schemas/Services/ISchemaProvider.cs index 2d155f57b..178b4ef09 100644 --- a/src/Squidex.Read/Schemas/Services/ISchemaProvider.cs +++ b/src/Squidex.Read/Schemas/Services/ISchemaProvider.cs @@ -13,8 +13,10 @@ namespace Squidex.Read.Schemas.Services { public interface ISchemaProvider { - Task FindSchemaByIdAsync(Guid schemaId); + Task FindSchemaByIdAsync(Guid id); Task FindSchemaByNameAsync(Guid appId, string name); + + void Remove(Guid id); } } diff --git a/src/Squidex.Read/Schemas/Services/Implementations/CachingSchemaProvider.cs b/src/Squidex.Read/Schemas/Services/Implementations/CachingSchemaProvider.cs index 889fd1c7a..93d1508b5 100644 --- a/src/Squidex.Read/Schemas/Services/Implementations/CachingSchemaProvider.cs +++ b/src/Squidex.Read/Schemas/Services/Implementations/CachingSchemaProvider.cs @@ -9,19 +9,16 @@ using System; using System.Threading.Tasks; using Microsoft.Extensions.Caching.Memory; -using Squidex.Events.Schemas; using Squidex.Infrastructure; -using Squidex.Infrastructure.CQRS; -using Squidex.Infrastructure.CQRS.Events; using Squidex.Read.Schemas.Repositories; using Squidex.Read.Utils; -using Squidex.Events; +// ReSharper disable ConvertIfStatementToConditionalTernaryExpression // ReSharper disable InvertIf namespace Squidex.Read.Schemas.Services.Implementations { - public class CachingSchemaProvider : CachingProvider, ISchemaProvider, ICatchEventConsumer, ILiveEventConsumer + public class CachingSchemaProvider : CachingProvider, ISchemaProvider { private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(10); private readonly ISchemaRepository repository; @@ -30,6 +27,8 @@ namespace Squidex.Read.Schemas.Services.Implementations { public ISchemaEntityWithSchema Entity; + public Guid AppId; + public string Name; } @@ -41,16 +40,23 @@ namespace Squidex.Read.Schemas.Services.Implementations this.repository = repository; } - public async Task FindSchemaByIdAsync(Guid schemaId) + public async Task FindSchemaByIdAsync(Guid id) { - var cacheKey = BuildIdCacheKey(schemaId); + var cacheKey = BuildIdCacheKey(id); var cacheItem = Cache.Get(cacheKey); if (cacheItem == null) { - var entity = await repository.FindSchemaAsync(schemaId); + var entity = await repository.FindSchemaAsync(id); - cacheItem = new CacheItem { Entity = entity, Name = entity?.Name }; + if (entity == null) + { + cacheItem = new CacheItem(); + } + else + { + cacheItem = new CacheItem { Entity = entity, Name = entity.Name, AppId = entity.AppId }; + } Cache.Set(cacheKey, cacheItem, CacheDuration); @@ -74,7 +80,7 @@ namespace Squidex.Read.Schemas.Services.Implementations { var entity = await repository.FindSchemaAsync(appId, name); - cacheItem = new CacheItem { Entity = entity, Name = name }; + cacheItem = new CacheItem { Entity = entity, Name = name, AppId = appId }; Cache.Set(cacheKey, cacheItem, CacheDuration); @@ -87,37 +93,18 @@ namespace Squidex.Read.Schemas.Services.Implementations return cacheItem.Entity; } - public Task On(Envelope @event) + public void Remove(Guid id) { - if (@event.Payload is SchemaDeleted || - @event.Payload is SchemaPublished || - @event.Payload is SchemaUnpublished || - @event.Payload is SchemaUpdated || - @event.Payload is FieldEvent) - { - var cacheKey = BuildIdCacheKey(@event.Headers.AggregateId()); - - var cacheItem = Cache.Get(cacheKey); + var cacheKey = BuildIdCacheKey(id); - if (cacheItem?.Name != null) - { - Cache.Remove(BuildNameCacheKey(@event.Headers.AppId(), cacheItem.Name)); - } + var cacheItem = Cache.Get(cacheKey); - Cache.Remove(cacheKey); - } - else + if (cacheItem?.Name != null) { - var schemaCreated = @event.Payload as SchemaCreated; - - if (schemaCreated != null) - { - Cache.Remove(BuildIdCacheKey(@event.Headers.AggregateId())); - Cache.Remove(BuildNameCacheKey(@event.Headers.AppId(), schemaCreated.Name)); - } + Cache.Remove(BuildNameCacheKey(cacheItem.AppId, cacheItem.Name)); } - return Task.FromResult(true); + Cache.Remove(cacheKey); } private static string BuildNameCacheKey(Guid appId, string name) diff --git a/src/Squidex/Config/Domain/ReadModule.cs b/src/Squidex/Config/Domain/ReadModule.cs index 8c153f368..487af56c4 100644 --- a/src/Squidex/Config/Domain/ReadModule.cs +++ b/src/Squidex/Config/Domain/ReadModule.cs @@ -33,14 +33,10 @@ namespace Squidex.Config.Domain { builder.RegisterType() .As() - .As() - .As() .SingleInstance(); builder.RegisterType() .As() - .As() - .As() .SingleInstance(); builder.RegisterType() diff --git a/src/Squidex/Config/Domain/StoreMongoDbModule.cs b/src/Squidex/Config/Domain/StoreMongoDbModule.cs index dd3eb7a02..be6caeb91 100644 --- a/src/Squidex/Config/Domain/StoreMongoDbModule.cs +++ b/src/Squidex/Config/Domain/StoreMongoDbModule.cs @@ -15,7 +15,6 @@ using Microsoft.Extensions.Configuration; using MongoDB.Driver; using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS.Events; -using Squidex.Infrastructure.CQRS.Replay; using Squidex.Read.Apps.Repositories; using Squidex.Read.Contents.Repositories; using Squidex.Read.History.Repositories; @@ -102,29 +101,25 @@ namespace Squidex.Config.Domain builder.RegisterType() .WithParameter(ResolvedParameter.ForNamed(MongoDatabaseName)) .As() - .As() - .As() + .As() .SingleInstance(); builder.RegisterType() .WithParameter(ResolvedParameter.ForNamed(MongoDatabaseName)) .As() - .As() - .As() + .As() .SingleInstance(); builder.RegisterType() .WithParameter(ResolvedParameter.ForNamed(MongoDatabaseName)) .As() - .As() - .As() + .As() .SingleInstance(); builder.RegisterType() .WithParameter(ResolvedParameter.ForNamed(MongoDatabaseName)) .As() - .As() - .As() + .As() .SingleInstance(); } } diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs index cff12c5c0..5b92b3c2a 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs @@ -44,8 +44,8 @@ namespace Squidex.Infrastructure.CQRS.Events private readonly Mock liveConsumer1 = new Mock(); private readonly Mock liveConsumer2 = new Mock(); - private readonly Mock catchConsumer1 = new Mock(); - private readonly Mock catchConsumer2 = new Mock(); + private readonly Mock catchConsumer1 = new Mock(); + private readonly Mock catchConsumer2 = new Mock(); private readonly Mock eventStream = new Mock(); private readonly Mock formatter = new Mock(new TypeNameRegistry(), null); private readonly EventData eventData = new EventData();