diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs index 0a048f434..adac80b49 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs @@ -45,7 +45,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains Func factory, TaskScheduler scheduler) { - var batchSize = Math.Max(1, eventConsumer!.BatchSize); + var batchSize = Math.Max(1, eventConsumer.BatchSize); var batchDelay = Math.Max(100, eventConsumer.BatchDelay); var parse = new TransformBlock(job => @@ -132,7 +132,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains buffer.LinkTo(handle, new DataflowLinkOptions { - PropagateCompletion = true + PropagateCompletion = true, }); pipelineStart = parse; @@ -189,6 +189,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains var job = new Job { Sender = subscription, + StoredEvent = null, Exception = exception }; diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs index 9b6f3b023..d99ef13cd 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs @@ -11,6 +11,7 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Squidex.Infrastructure.Orleans; +using Squidex.Infrastructure.Tasks; using Squidex.Log; namespace Squidex.Infrastructure.EventSourcing.Grains @@ -63,6 +64,13 @@ namespace Squidex.Infrastructure.EventSourcing.Grains return Task.CompletedTask; } + public override Task OnDeactivateAsync() + { + CompleteAsync().Forget(); + + return Task.CompletedTask; + } + public async Task CompleteAsync() { if (currentSubscriber != null) @@ -296,4 +304,4 @@ namespace Squidex.Infrastructure.EventSourcing.Grains return eventStore.CreateSubscription(subscriber, eventConsumer!.EventsFilter, State.Position); } } -} \ No newline at end of file +}