diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs index 2a74ffe74..03e74bf11 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs @@ -17,6 +17,8 @@ using Squidex.Domain.Apps.Entities.Contents; using Squidex.Domain.Apps.Entities.Contents.Repositories; using Squidex.Domain.Apps.Entities.Contents.Text; using Squidex.Domain.Apps.Entities.Schemas; +using Squidex.Domain.Apps.Events.Assets; +using Squidex.Domain.Apps.Events.Contents; using Squidex.Infrastructure; using Squidex.Infrastructure.Json; using Squidex.Infrastructure.Log; @@ -30,19 +32,26 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents private readonly IAppProvider appProvider; private readonly IJsonSerializer serializer; private readonly ITextIndexer indexer; + private readonly string typeAssetDeleted; + private readonly string typeContentDeleted; private readonly MongoContentCollection contents; - public MongoContentRepository(IMongoDatabase database, IAppProvider appProvider, IJsonSerializer serializer, ITextIndexer indexer) + public MongoContentRepository(IMongoDatabase database, IAppProvider appProvider, IJsonSerializer serializer, ITextIndexer indexer, TypeNameRegistry typeNameRegistry) { Guard.NotNull(appProvider, nameof(appProvider)); + Guard.NotNull(database, nameof(database)); Guard.NotNull(serializer, nameof(serializer)); - Guard.NotNull(indexer, nameof(ITextIndexer)); + Guard.NotNull(indexer, nameof(indexer)); + Guard.NotNull(typeNameRegistry, nameof(typeNameRegistry)); this.appProvider = appProvider; this.database = database; this.indexer = indexer; this.serializer = serializer; + typeAssetDeleted = typeNameRegistry.GetName(); + typeContentDeleted = typeNameRegistry.GetName(); + contents = new MongoContentCollection(database, serializer, appProvider); } diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_EventHandling.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_EventHandling.cs index bb072df70..66c8ddcad 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_EventHandling.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_EventHandling.cs @@ -26,6 +26,11 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents get { return "^(content-)|(asset-)"; } } + public bool Handles(StoredEvent @event) + { + return @event.Data.Type == typeAssetDeleted || @event.Data.Type == typeContentDeleted; + } + public Task On(Envelope @event) { return this.DispatchActionAsync(@event.Payload); diff --git a/src/Squidex.Domain.Apps.Entities/Assets/AssetUsageTracker_EventHandling.cs b/src/Squidex.Domain.Apps.Entities/Assets/AssetUsageTracker_EventHandling.cs index 5f8bfceae..aad7c4ac3 100644 --- a/src/Squidex.Domain.Apps.Entities/Assets/AssetUsageTracker_EventHandling.cs +++ b/src/Squidex.Domain.Apps.Entities/Assets/AssetUsageTracker_EventHandling.cs @@ -27,6 +27,11 @@ namespace Squidex.Domain.Apps.Entities.Assets get { return "^asset-"; } } + public bool Handles(StoredEvent @event) + { + return true; + } + public Task ClearAsync() { return TaskHelper.Done; diff --git a/src/Squidex.Domain.Apps.Entities/History/HistoryService.cs b/src/Squidex.Domain.Apps.Entities/History/HistoryService.cs index 5c6687380..e0f8e00c2 100644 --- a/src/Squidex.Domain.Apps.Entities/History/HistoryService.cs +++ b/src/Squidex.Domain.Apps.Entities/History/HistoryService.cs @@ -50,6 +50,16 @@ namespace Squidex.Domain.Apps.Entities.History this.repository = repository; } + public bool Handles(StoredEvent @event) + { + return true; + } + + public Task ClearAsync() + { + return repository.ClearAsync(); + } + public async Task On(Envelope @event) { foreach (var creator in creators) @@ -70,11 +80,6 @@ namespace Squidex.Domain.Apps.Entities.History } } - public Task ClearAsync() - { - return repository.ClearAsync(); - } - public async Task> QueryByChannelAsync(Guid appId, string channelPrefix, int count) { var items = await repository.QueryByChannelAsync(appId, channelPrefix, count); diff --git a/src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs b/src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs index e485d324e..5ccf4dff2 100644 --- a/src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs +++ b/src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs @@ -52,6 +52,11 @@ namespace Squidex.Domain.Apps.Entities.Rules this.ruleService = ruleService; } + public bool Handles(StoredEvent @event) + { + return true; + } + public Task ClearAsync() { return TaskHelper.Done; diff --git a/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs b/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs index 21d44ae47..5e426c8fc 100644 --- a/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs +++ b/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs @@ -81,6 +81,11 @@ namespace Squidex.Infrastructure.CQRS.Events } } + public bool Handles(StoredEvent @event) + { + return true; + } + public Task ClearAsync() { return TaskHelper.Done; diff --git a/src/Squidex.Infrastructure/EventSourcing/CompoundEventConsumer.cs b/src/Squidex.Infrastructure/EventSourcing/CompoundEventConsumer.cs index 665f433d8..8e93b73f8 100644 --- a/src/Squidex.Infrastructure/EventSourcing/CompoundEventConsumer.cs +++ b/src/Squidex.Infrastructure/EventSourcing/CompoundEventConsumer.cs @@ -56,6 +56,11 @@ namespace Squidex.Infrastructure.EventSourcing EventsFilter = string.Join("|", innerFilters); } + public bool Handles(StoredEvent @event) + { + return inners.Any(x => x.Handles(@event)); + } + public Task ClearAsync() { return Task.WhenAll(inners.Select(i => i.ClearAsync())); diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs index 4c6f1c3ab..80fcd9435 100644 --- a/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs +++ b/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs @@ -70,11 +70,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains return DoAndUpdateStateAsync(async () => { - var @event = ParseKnownEvent(storedEvent.Value); - - if (@event != null) + if (eventConsumer.Handles(storedEvent.Value)) { - await DispatchConsumerAsync(@event); + var @event = ParseKnownEvent(storedEvent.Value); + + if (@event != null) + { + await DispatchConsumerAsync(@event); + } } State = State.Handled(storedEvent.Value.EventPosition); diff --git a/src/Squidex.Infrastructure/EventSourcing/IEventConsumer.cs b/src/Squidex.Infrastructure/EventSourcing/IEventConsumer.cs index 8e3179bec..2822c744d 100644 --- a/src/Squidex.Infrastructure/EventSourcing/IEventConsumer.cs +++ b/src/Squidex.Infrastructure/EventSourcing/IEventConsumer.cs @@ -17,6 +17,8 @@ namespace Squidex.Infrastructure.EventSourcing string EventsFilter { get; } + bool Handles(StoredEvent @event); + Task ClearAsync(); Task On(Envelope @event); diff --git a/tests/Squidex.Infrastructure.Tests/EventSourcing/CompoundEventConsumerTests.cs b/tests/Squidex.Infrastructure.Tests/EventSourcing/CompoundEventConsumerTests.cs index 9494ff7b7..6099dce5d 100644 --- a/tests/Squidex.Infrastructure.Tests/EventSourcing/CompoundEventConsumerTests.cs +++ b/tests/Squidex.Infrastructure.Tests/EventSourcing/CompoundEventConsumerTests.cs @@ -91,5 +91,41 @@ namespace Squidex.Infrastructure.EventSourcing A.CallTo(() => consumer1.On(@event)).MustHaveHappened(); A.CallTo(() => consumer2.On(@event)).MustHaveHappened(); } + + [Fact] + public void Should_handle_if_any_consumer_handles() + { + var stored = new StoredEvent("Stream", "1", 1, new EventData("Type", new EnvelopeHeaders(), "Payload")); + + A.CallTo(() => consumer1.Handles(stored)) + .Returns(false); + + A.CallTo(() => consumer2.Handles(stored)) + .Returns(true); + + var sut = new CompoundEventConsumer("consumer-name", consumer1, consumer2); + + var result = sut.Handles(stored); + + Assert.True(result); + } + + [Fact] + public void Should_no_handle_if_no_consumer_handles() + { + var stored = new StoredEvent("Stream", "1", 1, new EventData("Type", new EnvelopeHeaders(), "Payload")); + + A.CallTo(() => consumer1.Handles(stored)) + .Returns(false); + + A.CallTo(() => consumer2.Handles(stored)) + .Returns(false); + + var sut = new CompoundEventConsumer("consumer-name", consumer1, consumer2); + + var result = sut.Handles(stored); + + Assert.False(result); + } } } diff --git a/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs b/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs index 6a95aac4b..f781618ab 100644 --- a/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs +++ b/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs @@ -76,6 +76,9 @@ namespace Squidex.Infrastructure.EventSourcing.Grains A.CallTo(() => eventConsumer.Name) .Returns(consumerName); + A.CallTo(() => eventConsumer.Handles(A.Ignored)) + .Returns(true); + A.CallTo(() => persistence.ReadAsync(EtagVersion.Any)) .Invokes(new Action(s => apply(state))); @@ -193,6 +196,28 @@ namespace Squidex.Infrastructure.EventSourcing.Grains .MustHaveHappened(1, Times.Exactly); } + [Fact] + public async Task Should_not_invoke_but_update_position_when_consumer_does_not_want_to_handle() + { + var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); + + A.CallTo(() => eventConsumer.Handles(@event)) + .Returns(false); + + await sut.ActivateAsync(consumerName); + await sut.ActivateAsync(); + + await OnEventAsync(eventSubscription, @event); + + state.Should().BeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null }); + + A.CallTo(() => persistence.WriteSnapshotAsync(A.Ignored)) + .MustHaveHappened(1, Times.Exactly); + + A.CallTo(() => eventConsumer.On(envelope)) + .MustNotHaveHappened(); + } + [Fact] public async Task Should_ignore_old_events() {