From 979f1862435999be5fe1e28d68eaa8ed9f91bc74 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Fri, 18 Sep 2020 11:44:42 +0200 Subject: [PATCH] Fix event consumer. --- .../EventSourcing/Grains/BatchSubscriber.cs | 9 +++++++-- .../EventSourcing/Grains/EventConsumerGrain.cs | 14 ++++++++++++-- .../EventSourcing/MongoEventStoreFixture.cs | 9 ++++++--- .../EventSourcing/MongoParallelInsertTests.cs | 1 + 4 files changed, 26 insertions(+), 7 deletions(-) diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs index 01ff81ab3..7cf94a5fb 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs @@ -22,6 +22,11 @@ namespace Squidex.Infrastructure.EventSourcing.Grains private readonly IEventSubscription eventSubscription; private readonly IDataflowBlock pipelineEnd; + public object Sender + { + get { return eventSubscription.Sender; } + } + private sealed class Job { public StoredEvent? StoredEvent { get; set; } @@ -91,11 +96,11 @@ namespace Squidex.Infrastructure.EventSourcing.Grains if (exception != null) { - await grain.OnErrorAsync(exception); + await grain.OnErrorAsync(Sender, exception); } else { - await grain.OnEventsAsync(GetEvents(jobsBySender), GetPosition(jobsBySender)); + await grain.OnEventsAsync(Sender, GetEvents(jobsBySender), GetPosition(jobsBySender)); } } } diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs index 84cf9fd80..a45052bb3 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs @@ -81,8 +81,13 @@ namespace Squidex.Infrastructure.EventSourcing.Grains return State.ToInfo(eventConsumer!.Name).AsImmutable(); } - public Task OnEventsAsync(IReadOnlyList> events, string position) + public Task OnEventsAsync(object sender, IReadOnlyList> events, string position) { + if (!ReferenceEquals(sender, currentSubscriber?.Sender)) + { + return Task.CompletedTask; + } + return DoAndUpdateStateAsync(async () => { await DispatchAsync(events); @@ -91,8 +96,13 @@ namespace Squidex.Infrastructure.EventSourcing.Grains }); } - public Task OnErrorAsync(Exception exception) + public Task OnErrorAsync(object sender, Exception exception) { + if (!ReferenceEquals(sender, currentSubscriber?.Sender)) + { + return Task.CompletedTask; + } + return DoAndUpdateStateAsync(() => { Unsubscribe(); diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreFixture.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreFixture.cs index bef26dd3b..c180e5608 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreFixture.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreFixture.cs @@ -27,18 +27,21 @@ namespace Squidex.Infrastructure.EventSourcing mongoClient = new MongoClient(connectionString); mongoDatabase = mongoClient.GetDatabase($"EventStoreTest"); - Dispose(); - BsonJsonConvention.Register(JsonSerializer.Create(JsonHelper.DefaultSettings())); EventStore = new MongoEventStore(mongoDatabase, notifier); EventStore.InitializeAsync().Wait(); } - public void Dispose() + public void Cleanup() { mongoClient.DropDatabase("EventStoreTest"); } + + public void Dispose() + { + Cleanup(); + } } public sealed class MongoEventStoreDirectFixture : MongoEventStoreFixture diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoParallelInsertTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoParallelInsertTests.cs index 295fddf5c..9df79a8c9 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoParallelInsertTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoParallelInsertTests.cs @@ -212,6 +212,7 @@ namespace Squidex.Infrastructure.EventSourcing public MongoParallelInsertTests(MongoEventStoreReplicaSetFixture fixture) { _ = fixture; + _.Cleanup(); var typeNameRegistry = new TypeNameRegistry().Map(typeof(MyEvent), "My");