Browse Source

Stop consumer on deactivation.

pull/715/head
Sebastian Stehle 5 years ago
parent
commit
05fb0af60b
  1. 5
      backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs
  2. 10
      backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs

5
backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs

@ -45,7 +45,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
Func<IEventSubscriber, IEventSubscription> factory, Func<IEventSubscriber, IEventSubscription> factory,
TaskScheduler scheduler) TaskScheduler scheduler)
{ {
var batchSize = Math.Max(1, eventConsumer!.BatchSize); var batchSize = Math.Max(1, eventConsumer.BatchSize);
var batchDelay = Math.Max(100, eventConsumer.BatchDelay); var batchDelay = Math.Max(100, eventConsumer.BatchDelay);
var parse = new TransformBlock<Job, Job>(job => var parse = new TransformBlock<Job, Job>(job =>
@ -132,7 +132,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
buffer.LinkTo(handle, new DataflowLinkOptions buffer.LinkTo(handle, new DataflowLinkOptions
{ {
PropagateCompletion = true PropagateCompletion = true,
}); });
pipelineStart = parse; pipelineStart = parse;
@ -189,6 +189,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
var job = new Job var job = new Job
{ {
Sender = subscription, Sender = subscription,
StoredEvent = null,
Exception = exception Exception = exception
}; };

10
backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs

@ -11,6 +11,7 @@ using System.Runtime.CompilerServices;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Squidex.Infrastructure.Orleans; using Squidex.Infrastructure.Orleans;
using Squidex.Infrastructure.Tasks;
using Squidex.Log; using Squidex.Log;
namespace Squidex.Infrastructure.EventSourcing.Grains namespace Squidex.Infrastructure.EventSourcing.Grains
@ -63,6 +64,13 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
return Task.CompletedTask; return Task.CompletedTask;
} }
public override Task OnDeactivateAsync()
{
CompleteAsync().Forget();
return Task.CompletedTask;
}
public async Task CompleteAsync() public async Task CompleteAsync()
{ {
if (currentSubscriber != null) if (currentSubscriber != null)
@ -296,4 +304,4 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
return eventStore.CreateSubscription(subscriber, eventConsumer!.EventsFilter, State.Position); return eventStore.CreateSubscription(subscriber, eventConsumer!.EventsFilter, State.Position);
} }
} }
} }

Loading…
Cancel
Save