Browse Source

Performance improvement for event consumer.

pull/351/head
Sebastian Stehle 7 years ago
parent
commit
ca3b97c69b
  1. 13
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs
  2. 5
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_EventHandling.cs
  3. 5
      src/Squidex.Domain.Apps.Entities/Assets/AssetUsageTracker_EventHandling.cs
  4. 15
      src/Squidex.Domain.Apps.Entities/History/HistoryService.cs
  5. 5
      src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs
  6. 5
      src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs
  7. 5
      src/Squidex.Infrastructure/EventSourcing/CompoundEventConsumer.cs
  8. 11
      src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs
  9. 2
      src/Squidex.Infrastructure/EventSourcing/IEventConsumer.cs
  10. 36
      tests/Squidex.Infrastructure.Tests/EventSourcing/CompoundEventConsumerTests.cs
  11. 25
      tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs

13
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs

@ -17,6 +17,8 @@ using Squidex.Domain.Apps.Entities.Contents;
using Squidex.Domain.Apps.Entities.Contents.Repositories;
using Squidex.Domain.Apps.Entities.Contents.Text;
using Squidex.Domain.Apps.Entities.Schemas;
using Squidex.Domain.Apps.Events.Assets;
using Squidex.Domain.Apps.Events.Contents;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Json;
using Squidex.Infrastructure.Log;
@ -30,19 +32,26 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
private readonly IAppProvider appProvider;
private readonly IJsonSerializer serializer;
private readonly ITextIndexer indexer;
private readonly string typeAssetDeleted;
private readonly string typeContentDeleted;
private readonly MongoContentCollection contents;
public MongoContentRepository(IMongoDatabase database, IAppProvider appProvider, IJsonSerializer serializer, ITextIndexer indexer)
public MongoContentRepository(IMongoDatabase database, IAppProvider appProvider, IJsonSerializer serializer, ITextIndexer indexer, TypeNameRegistry typeNameRegistry)
{
Guard.NotNull(appProvider, nameof(appProvider));
Guard.NotNull(database, nameof(database));
Guard.NotNull(serializer, nameof(serializer));
Guard.NotNull(indexer, nameof(ITextIndexer));
Guard.NotNull(indexer, nameof(indexer));
Guard.NotNull(typeNameRegistry, nameof(typeNameRegistry));
this.appProvider = appProvider;
this.database = database;
this.indexer = indexer;
this.serializer = serializer;
typeAssetDeleted = typeNameRegistry.GetName<AssetDeleted>();
typeContentDeleted = typeNameRegistry.GetName<ContentDeleted>();
contents = new MongoContentCollection(database, serializer, appProvider);
}

5
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_EventHandling.cs

@ -26,6 +26,11 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
get { return "^(content-)|(asset-)"; }
}
public bool Handles(StoredEvent @event)
{
return @event.Data.Type == typeAssetDeleted || @event.Data.Type == typeContentDeleted;
}
public Task On(Envelope<IEvent> @event)
{
return this.DispatchActionAsync(@event.Payload);

5
src/Squidex.Domain.Apps.Entities/Assets/AssetUsageTracker_EventHandling.cs

@ -27,6 +27,11 @@ namespace Squidex.Domain.Apps.Entities.Assets
get { return "^asset-"; }
}
public bool Handles(StoredEvent @event)
{
return true;
}
public Task ClearAsync()
{
return TaskHelper.Done;

15
src/Squidex.Domain.Apps.Entities/History/HistoryService.cs

@ -50,6 +50,16 @@ namespace Squidex.Domain.Apps.Entities.History
this.repository = repository;
}
public bool Handles(StoredEvent @event)
{
return true;
}
public Task ClearAsync()
{
return repository.ClearAsync();
}
public async Task On(Envelope<IEvent> @event)
{
foreach (var creator in creators)
@ -70,11 +80,6 @@ namespace Squidex.Domain.Apps.Entities.History
}
}
public Task ClearAsync()
{
return repository.ClearAsync();
}
public async Task<IReadOnlyList<ParsedHistoryEvent>> QueryByChannelAsync(Guid appId, string channelPrefix, int count)
{
var items = await repository.QueryByChannelAsync(appId, channelPrefix, count);

5
src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs

@ -52,6 +52,11 @@ namespace Squidex.Domain.Apps.Entities.Rules
this.ruleService = ruleService;
}
public bool Handles(StoredEvent @event)
{
return true;
}
public Task ClearAsync()
{
return TaskHelper.Done;

5
src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs

@ -81,6 +81,11 @@ namespace Squidex.Infrastructure.CQRS.Events
}
}
public bool Handles(StoredEvent @event)
{
return true;
}
public Task ClearAsync()
{
return TaskHelper.Done;

5
src/Squidex.Infrastructure/EventSourcing/CompoundEventConsumer.cs

@ -56,6 +56,11 @@ namespace Squidex.Infrastructure.EventSourcing
EventsFilter = string.Join("|", innerFilters);
}
public bool Handles(StoredEvent @event)
{
return inners.Any(x => x.Handles(@event));
}
public Task ClearAsync()
{
return Task.WhenAll(inners.Select(i => i.ClearAsync()));

11
src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs

@ -70,11 +70,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
return DoAndUpdateStateAsync(async () =>
{
var @event = ParseKnownEvent(storedEvent.Value);
if (@event != null)
if (eventConsumer.Handles(storedEvent.Value))
{
await DispatchConsumerAsync(@event);
var @event = ParseKnownEvent(storedEvent.Value);
if (@event != null)
{
await DispatchConsumerAsync(@event);
}
}
State = State.Handled(storedEvent.Value.EventPosition);

2
src/Squidex.Infrastructure/EventSourcing/IEventConsumer.cs

@ -17,6 +17,8 @@ namespace Squidex.Infrastructure.EventSourcing
string EventsFilter { get; }
bool Handles(StoredEvent @event);
Task ClearAsync();
Task On(Envelope<IEvent> @event);

36
tests/Squidex.Infrastructure.Tests/EventSourcing/CompoundEventConsumerTests.cs

@ -91,5 +91,41 @@ namespace Squidex.Infrastructure.EventSourcing
A.CallTo(() => consumer1.On(@event)).MustHaveHappened();
A.CallTo(() => consumer2.On(@event)).MustHaveHappened();
}
[Fact]
public void Should_handle_if_any_consumer_handles()
{
var stored = new StoredEvent("Stream", "1", 1, new EventData("Type", new EnvelopeHeaders(), "Payload"));
A.CallTo(() => consumer1.Handles(stored))
.Returns(false);
A.CallTo(() => consumer2.Handles(stored))
.Returns(true);
var sut = new CompoundEventConsumer("consumer-name", consumer1, consumer2);
var result = sut.Handles(stored);
Assert.True(result);
}
[Fact]
public void Should_no_handle_if_no_consumer_handles()
{
var stored = new StoredEvent("Stream", "1", 1, new EventData("Type", new EnvelopeHeaders(), "Payload"));
A.CallTo(() => consumer1.Handles(stored))
.Returns(false);
A.CallTo(() => consumer2.Handles(stored))
.Returns(false);
var sut = new CompoundEventConsumer("consumer-name", consumer1, consumer2);
var result = sut.Handles(stored);
Assert.False(result);
}
}
}

25
tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs

@ -76,6 +76,9 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
A.CallTo(() => eventConsumer.Name)
.Returns(consumerName);
A.CallTo(() => eventConsumer.Handles(A<StoredEvent>.Ignored))
.Returns(true);
A.CallTo(() => persistence.ReadAsync(EtagVersion.Any))
.Invokes(new Action<long>(s => apply(state)));
@ -193,6 +196,28 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
.MustHaveHappened(1, Times.Exactly);
}
[Fact]
public async Task Should_not_invoke_but_update_position_when_consumer_does_not_want_to_handle()
{
var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData);
A.CallTo(() => eventConsumer.Handles(@event))
.Returns(false);
await sut.ActivateAsync(consumerName);
await sut.ActivateAsync();
await OnEventAsync(eventSubscription, @event);
state.Should().BeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null });
A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored))
.MustHaveHappened(1, Times.Exactly);
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_ignore_old_events()
{

Loading…
Cancel
Save