From 8abffe9e526f572a83f235fbcd28e637b38c09e4 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Mon, 3 Apr 2017 20:31:19 +0200 Subject: [PATCH] Error handling improved --- .../CQRS/Events/EventReceiver.cs | 51 +++++-- .../Events/Internal/DispatchEventBlock.cs | 76 ++++++++--- .../Events/Internal/EventReceiverBlock.cs | 124 ----------------- .../Events/Internal/IEventReceiverBlock.cs | 21 +++ .../CQRS/Events/Internal/ParseEventBlock.cs | 71 ++++++++-- .../CQRS/Events/Internal/QueryEventsBlock.cs | 127 +++++++++++++----- .../CQRS/Events/Internal/UpdateStateBlock.cs | 71 ++++++++-- .../CQRS/Events/EventReceiverTests.cs | 4 +- 8 files changed, 335 insertions(+), 210 deletions(-) delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/Internal/EventReceiverBlock.cs create mode 100644 src/Squidex.Infrastructure/CQRS/Events/Internal/IEventReceiverBlock.cs diff --git a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs index 09d82653c..db2848efe 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs @@ -8,7 +8,6 @@ using System; using System.Threading; -using System.Threading.Tasks; using Squidex.Infrastructure.CQRS.Events.Internal; using Squidex.Infrastructure.Log; @@ -28,6 +27,7 @@ namespace Squidex.Infrastructure.CQRS.Events private DispatchEventBlock dispatchEventBlock; private UpdateStateBlock updateStateBlock; private ParseEventBlock parseEventBlock; + private IEventReceiverBlock[] blocks; private Timer timer; public EventReceiver( @@ -73,7 +73,7 @@ namespace Squidex.Infrastructure.CQRS.Events public void Next() { - queryEventsBlock.NextOrThrowAway(null); + queryEventsBlock.NextOrThrowAway(); } public void Subscribe(IEventConsumer eventConsumer, int delay = 5000, bool autoTrigger = true) @@ -85,33 +85,60 @@ namespace Squidex.Infrastructure.CQRS.Events return; } - updateStateBlock = new UpdateStateBlock(eventConsumer, eventConsumerInfoRepository, log); + var onError = new Action(ex => Stop(ex, eventConsumer)); - dispatchEventBlock = new DispatchEventBlock(eventConsumer, eventConsumerInfoRepository, log); + updateStateBlock = new UpdateStateBlock(eventConsumerInfoRepository, eventConsumer, log); + updateStateBlock.OnError = onError; + + dispatchEventBlock = new DispatchEventBlock(eventConsumer, log); + dispatchEventBlock.OnError = onError; dispatchEventBlock.LinkTo(updateStateBlock.Target); - parseEventBlock = new ParseEventBlock(eventConsumer, eventConsumerInfoRepository, log, formatter); + parseEventBlock = new ParseEventBlock(formatter, log); + parseEventBlock.OnError = onError; parseEventBlock.LinkTo(dispatchEventBlock.Target); - queryEventsBlock = new QueryEventsBlock(eventConsumer, eventConsumerInfoRepository, log, eventStore); + queryEventsBlock = new QueryEventsBlock(eventConsumerInfoRepository, eventConsumer, eventStore, log); queryEventsBlock.OnEvent = parseEventBlock.NextAsync; queryEventsBlock.OnReset = Reset; + queryEventsBlock.OnError = onError; queryEventsBlock.Completion.ContinueWith(x => parseEventBlock.Complete()); + blocks = new IEventReceiverBlock[] { updateStateBlock, dispatchEventBlock, parseEventBlock, queryEventsBlock }; + if (autoTrigger) { - timer = new Timer(x => queryEventsBlock.NextOrThrowAway(null), null, 0, delay); + timer = new Timer(x => queryEventsBlock.NextOrThrowAway(), null, 0, delay); + } + + eventNotifier.Subscribe(() => queryEventsBlock.NextOrThrowAway()); + } + + private void Stop(Exception ex, IEventConsumer eventConsumer) + { + foreach (var block in blocks) + { + block.Stop(); } - eventNotifier.Subscribe(() => queryEventsBlock.NextOrThrowAway(null)); + try + { + eventConsumerInfoRepository.StopAsync(eventConsumer.Name, ex.ToString()).Wait(); + } + catch (Exception ex2) + { + log.LogFatal(ex2, w => w + .WriteProperty("action", "StopConsumer") + .WriteProperty("state", "Failed")); + } } private void Reset() { - dispatchEventBlock.Reset(); - parseEventBlock.Reset(); - queryEventsBlock.Reset(); - updateStateBlock.Reset(); + foreach (var block in blocks) + { + block.Reset(); + } } } } \ No newline at end of file diff --git a/src/Squidex.Infrastructure/CQRS/Events/Internal/DispatchEventBlock.cs b/src/Squidex.Infrastructure/CQRS/Events/Internal/DispatchEventBlock.cs index eb992cf94..022314177 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Internal/DispatchEventBlock.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Internal/DispatchEventBlock.cs @@ -8,26 +8,73 @@ using System; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using Squidex.Infrastructure.Log; namespace Squidex.Infrastructure.CQRS.Events.Internal { - internal sealed class DispatchEventBlock : EventReceiverBlock, Envelope> + internal sealed class DispatchEventBlock : IEventReceiverBlock { - public DispatchEventBlock(IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log) - : base(true, eventConsumer, eventConsumerInfoRepository, log) + private readonly ISemanticLog log; + private readonly IEventConsumer eventConsumer; + private readonly TransformBlock, Envelope> transformBlock; + private long lastReceivedEventNumber = -1; + private bool isRunning = true; + + public Action OnError { get; set; } + + public ITargetBlock> Target + { + get { return transformBlock; } + } + + public DispatchEventBlock(IEventConsumer eventConsumer, ISemanticLog log) + { + this.eventConsumer = eventConsumer; + + this.log = log; + + var nullHandler = new ActionBlock>(x => { }); + + transformBlock = + new TransformBlock, Envelope>(x => HandleAsync(x), + new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); + transformBlock.LinkTo(nullHandler, x => x == null); + } + + public void LinkTo(ITargetBlock> target) + { + transformBlock.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true }); + } + + public void Stop() { + isRunning = false; } - protected override async Task> On(Envelope input) + public void Reset() { - var consumerName = EventConsumer.Name; + isRunning = true; + + lastReceivedEventNumber = -1; + } + + private async Task> HandleAsync(Envelope input) + { + var eventNumber = input.Headers.EventNumber(); + + if (eventNumber <= lastReceivedEventNumber || !isRunning) + { + return null; + } + + var consumerName = eventConsumer.Name; var eventId = input.Headers.EventId().ToString(); var eventType = input.Payload.GetType().Name; try { - Log.LogInformation(w => w + log.LogInformation(w => w .WriteProperty("action", "HandleEvent") .WriteProperty("actionId", eventId) .WriteProperty("state", "Started") @@ -35,9 +82,9 @@ namespace Squidex.Infrastructure.CQRS.Events.Internal .WriteProperty("eventType", eventType) .WriteProperty("eventConsumer", consumerName)); - await EventConsumer.On(input); + await eventConsumer.On(input); - Log.LogInformation(w => w + log.LogInformation(w => w .WriteProperty("action", "HandleEvent") .WriteProperty("actionId", eventId) .WriteProperty("state", "Completed") @@ -45,11 +92,15 @@ namespace Squidex.Infrastructure.CQRS.Events.Internal .WriteProperty("eventType", eventType) .WriteProperty("eventConsumer", consumerName)); + lastReceivedEventNumber = eventNumber; + return input; } catch (Exception ex) { - Log.LogError(ex, w => w + OnError?.Invoke(ex); + + log.LogError(ex, w => w .WriteProperty("action", "HandleEvent") .WriteProperty("actionId", eventId) .WriteProperty("state", "Started") @@ -57,13 +108,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Internal .WriteProperty("eventType", eventType) .WriteProperty("eventConsumer", consumerName)); - throw; + return null; } } - - protected override long GetEventNumber(Envelope input) - { - return input.Headers.EventNumber(); - } } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/Internal/EventReceiverBlock.cs b/src/Squidex.Infrastructure/CQRS/Events/Internal/EventReceiverBlock.cs deleted file mode 100644 index ac0c3f757..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/Internal/EventReceiverBlock.cs +++ /dev/null @@ -1,124 +0,0 @@ -// ========================================================================== -// EventReceiverBlock.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Threading.Tasks; -using Squidex.Infrastructure.Log; -using System.Threading.Tasks.Dataflow; - -namespace Squidex.Infrastructure.CQRS.Events.Internal -{ - public abstract class EventReceiverBlock - { - private long lastEventNumber = -1; - - protected ISemanticLog Log { get; } - - protected IEventConsumer EventConsumer { get; } - - protected IEventConsumerInfoRepository EventConsumerInfoRepository { get; } - - public ITargetBlock Target { get; } - - public Task Completion - { - get { return Target.Completion; } - } - - protected EventReceiverBlock(bool transform, IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log) - { - EventConsumer = eventConsumer; - EventConsumerInfoRepository = eventConsumerInfoRepository; - - Log = log; - - if (transform) - { - var nullHandlerBlock = - new ActionBlock(_ => { }); - - var transformBlock = - new TransformBlock(new Func>(HandleAsync), - new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); - transformBlock.LinkTo(nullHandlerBlock, new DataflowLinkOptions { PropagateCompletion = true }, x => x == null); - - Target = transformBlock; - } - else - { - Target = - new ActionBlock(HandleAsync, - new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); - } - } - - public Task NextAsync(TInput input) - { - return Target.SendAsync(input); - } - - public void NextOrThrowAway(TInput input) - { - Target.Post(input); - } - - public void Complete() - { - Target.Complete(); - } - - public void Reset() - { - lastEventNumber = -1; - } - - public void LinkTo(ITargetBlock other) - { - if (Target is TransformBlock transformBlock) - { - transformBlock.LinkTo(other, new DataflowLinkOptions { PropagateCompletion = true }, e => e != null); - } - } - - protected abstract Task On(TInput input); - - protected abstract long GetEventNumber(TInput input); - - private async Task HandleAsync(TInput input) - { - try - { - var eventNumber = GetEventNumber(input); - - if (eventNumber > lastEventNumber) - { - var envelope = await On(input); - - lastEventNumber = eventNumber; - - return envelope; - } - } - catch (Exception ex) - { - Log.LogFatal(ex, w => w.WriteProperty("action", "EventHandlingFailed")); - - try - { - await EventConsumerInfoRepository.StopAsync(EventConsumer.Name, ex.ToString()); - } - catch (Exception ex2) - { - Log.LogFatal(ex2, w => w.WriteProperty("action", "EventHandlingFailed")); - } - } - - return default(TOutput); - } - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Internal/IEventReceiverBlock.cs b/src/Squidex.Infrastructure/CQRS/Events/Internal/IEventReceiverBlock.cs new file mode 100644 index 000000000..4cb8d0d00 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Internal/IEventReceiverBlock.cs @@ -0,0 +1,21 @@ +// ========================================================================== +// IEventReceiverBlock.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; + +namespace Squidex.Infrastructure.CQRS.Events.Internal +{ + public interface IEventReceiverBlock + { + Action OnError { get; set; } + + void Reset(); + + void Stop(); + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Internal/ParseEventBlock.cs b/src/Squidex.Infrastructure/CQRS/Events/Internal/ParseEventBlock.cs index da6447ae0..84b280457 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Internal/ParseEventBlock.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Internal/ParseEventBlock.cs @@ -1,5 +1,5 @@ // ========================================================================== -// ParseEventBlock.cs +// DispatchEventBlock.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group @@ -8,22 +8,70 @@ using System; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using Squidex.Infrastructure.Log; namespace Squidex.Infrastructure.CQRS.Events.Internal { - internal sealed class ParseEventBlock : EventReceiverBlock> + internal sealed class ParseEventBlock : IEventReceiverBlock { private readonly EventDataFormatter formatter; + private readonly ISemanticLog log; + private readonly TransformBlock> transformBlock; + private long lastReceivedEventNumber = -1; + private bool isRunning = true; - public ParseEventBlock(IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log, EventDataFormatter formatter) - : base(true, eventConsumer, eventConsumerInfoRepository, log) + public Action OnError { get; set; } + + public ParseEventBlock(EventDataFormatter formatter, ISemanticLog log) { this.formatter = formatter; + this.log = log; + + var nullHandler = new ActionBlock>(x => { }); + + transformBlock = + new TransformBlock>(x => HandleAsync(x), + new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); + transformBlock.LinkTo(nullHandler, x => x == null); + } + + public Task NextAsync(StoredEvent input) + { + return transformBlock.SendAsync(input); + } + + public void LinkTo(ITargetBlock> target) + { + transformBlock.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true }); } - protected override Task> On(StoredEvent input) + public void Complete() { + transformBlock.Complete(); + } + + public void Stop() + { + isRunning = false; + } + + public void Reset() + { + isRunning = true; + + lastReceivedEventNumber = -1; + } + + private Envelope HandleAsync(StoredEvent input) + { + var eventNumber = input.EventNumber; + + if (eventNumber <= lastReceivedEventNumber || !isRunning) + { + return null; + } + try { var result = formatter.Parse(input.Data); @@ -31,23 +79,20 @@ namespace Squidex.Infrastructure.CQRS.Events.Internal result.SetEventNumber(input.EventNumber); result.SetEventStreamNumber(input.EventStreamNumber); - return Task.FromResult(result); + return result; } catch (Exception ex) { - Log.LogFatal(ex, w => w + OnError?.Invoke(ex); + + log.LogFatal(ex, w => w .WriteProperty("action", "ParseEvent") .WriteProperty("state", "Failed") .WriteProperty("eventId", input.Data.EventId.ToString()) .WriteProperty("eventNumber", input.EventNumber)); - throw; + return null; } } - - protected override long GetEventNumber(StoredEvent input) - { - return input.EventNumber; - } } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/Internal/QueryEventsBlock.cs b/src/Squidex.Infrastructure/CQRS/Events/Internal/QueryEventsBlock.cs index af88602cc..05013da86 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Internal/QueryEventsBlock.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Internal/QueryEventsBlock.cs @@ -9,73 +9,135 @@ using System; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using Squidex.Infrastructure.Log; -// ReSharper disable InvertIf - namespace Squidex.Infrastructure.CQRS.Events.Internal { - internal sealed class QueryEventsBlock : EventReceiverBlock + internal sealed class QueryEventsBlock : IEventReceiverBlock { + private readonly ISemanticLog log; + private readonly IEventConsumerInfoRepository eventConsumerInfoRepository; + private readonly IEventConsumer eventConsumer; private readonly IEventStore eventStore; + private readonly ActionBlock> actionBlock; + private bool isRunning = true; private bool isStarted; - private long handled; - public Func OnEvent { get; set; } + public Action OnError { get; set; } public Action OnReset { get; set; } - public QueryEventsBlock(IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log, IEventStore eventStore) - : base(false, eventConsumer, eventConsumerInfoRepository, log) + public Func OnEvent { get; set; } + + public ITargetBlock> Target + { + get { return actionBlock; } + } + + public Task Completion + { + get { return actionBlock.Completion; } + } + + public QueryEventsBlock(IEventConsumerInfoRepository eventConsumerInfoRepository, IEventConsumer eventConsumer, IEventStore eventStore, ISemanticLog log) { + this.eventConsumerInfoRepository = eventConsumerInfoRepository; + this.eventConsumer = eventConsumer; this.eventStore = eventStore; + + this.log = log; + + actionBlock = + new ActionBlock>(HandleAsync, + new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); } - protected override async Task On(object input) + public void Complete() { - if (!isStarted) - { - await EventConsumerInfoRepository.CreateAsync(EventConsumer.Name); + actionBlock.Complete(); + } - isStarted = true; - } + public void NextOrThrowAway() + { + actionBlock.Post(null); + } - var status = await EventConsumerInfoRepository.FindAsync(EventConsumer.Name); + public void Stop() + { + isRunning = false; + } - var lastReceivedEventNumber = status.LastHandledEventNumber; + public void Reset() + { + isRunning = true; + } - if (status.IsResetting) + private async Task HandleAsync(object input) + { + try { - await ResetAsync(); - } + if (!isStarted) + { + await eventConsumerInfoRepository.CreateAsync(eventConsumer.Name); - if (!status.IsStopped) - { - var ct = CancellationToken.None; + isStarted = true; + } - await eventStore.GetEventsAsync(storedEvent => OnEvent?.Invoke(storedEvent), ct, null, lastReceivedEventNumber); - } + var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name); + + var lastReceivedEventNumber = status.LastHandledEventNumber; + + if (status.IsResetting) + { + await ResetAsync(); + + Reset(); + } + + if (!status.IsStopped || !isRunning) + { + var ct = new CancellationTokenSource(); - return null; + await eventStore.GetEventsAsync(async storedEvent => + { + if (!isRunning) + { + ct.Cancel(); + } + + var onEvent = OnEvent; + + if (onEvent != null) + { + await onEvent(storedEvent); + } + }, ct.Token, null, lastReceivedEventNumber); + } + } + catch (Exception ex) + { + OnError?.Invoke(ex); + } } private async Task ResetAsync() { - var consumerName = EventConsumer.Name; + var consumerName = eventConsumer.Name; var actionId = Guid.NewGuid().ToString(); try { - Log.LogInformation(w => w + log.LogInformation(w => w .WriteProperty("action", "EventConsumerReset") .WriteProperty("actionId", actionId) .WriteProperty("state", "Started") .WriteProperty("eventConsumer", consumerName)); - await EventConsumer.ClearAsync(); - await EventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, -1); + await eventConsumer.ClearAsync(); + await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, -1); - Log.LogInformation(w => w + log.LogInformation(w => w .WriteProperty("action", "EventConsumerReset") .WriteProperty("actionId", actionId) .WriteProperty("state", "Completed") @@ -85,17 +147,12 @@ namespace Squidex.Infrastructure.CQRS.Events.Internal } catch (Exception ex) { - Log.LogFatal(ex, w => w + log.LogFatal(ex, w => w .WriteProperty("action", "EventConsumerReset") .WriteProperty("actionId", actionId) .WriteProperty("state", "Completed") .WriteProperty("eventConsumer", consumerName)); } } - - protected override long GetEventNumber(object input) - { - return handled++; - } } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/Internal/UpdateStateBlock.cs b/src/Squidex.Infrastructure/CQRS/Events/Internal/UpdateStateBlock.cs index 3a1461be8..0626f5b65 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Internal/UpdateStateBlock.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Internal/UpdateStateBlock.cs @@ -1,33 +1,86 @@ // ========================================================================== -// UpdateStateBlock.cs +// DispatchEventBlock.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group // All rights reserved. // ========================================================================== +using System; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using Squidex.Infrastructure.Log; namespace Squidex.Infrastructure.CQRS.Events.Internal { - public sealed class UpdateStateBlock : EventReceiverBlock, Envelope> + internal sealed class UpdateStateBlock : IEventReceiverBlock { - public UpdateStateBlock(IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log) - : base(false, eventConsumer, eventConsumerInfoRepository, log) + private readonly ISemanticLog log; + private readonly IEventConsumerInfoRepository eventConsumerInfoRepository; + private readonly IEventConsumer eventConsumer; + private readonly ActionBlock> actionBlock; + private long lastReceivedEventNumber = -1; + private bool isRunning = true; + + public Action OnError { get; set; } + + public ITargetBlock> Target + { + get { return actionBlock; } + } + + public Task Completion + { + get { return actionBlock.Completion; } + } + + public UpdateStateBlock(IEventConsumerInfoRepository eventConsumerInfoRepository, IEventConsumer eventConsumer, ISemanticLog log) { + this.eventConsumerInfoRepository = eventConsumerInfoRepository; + this.eventConsumer = eventConsumer; + + this.log = log; + + actionBlock = + new ActionBlock>(HandleAsync, + new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); + } + + public void Stop() + { + isRunning = false; } - protected override async Task> On(Envelope input) + public void Reset() { - await EventConsumerInfoRepository.SetLastHandledEventNumberAsync(EventConsumer.Name, input.Headers.EventNumber()); + isRunning = true; - return input; + lastReceivedEventNumber = -1; } - protected override long GetEventNumber(Envelope input) + private async Task HandleAsync(Envelope input) { - return input.Headers.EventNumber(); + var eventNumber = input.Headers.EventNumber(); + + if (eventNumber <= lastReceivedEventNumber || !isRunning) + { + return; + } + + try + { + await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(eventConsumer.Name, eventNumber); + } + catch (Exception ex) + { + OnError?.Invoke(ex); + + log.LogFatal(ex, w => w + .WriteProperty("action", "UpdateState") + .WriteProperty("state", "Failed") + .WriteProperty("eventId", input.Headers.EventId().ToString()) + .WriteProperty("eventNumber", input.Headers.EventNumber())); + } } } } diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs index 60165549c..47444876a 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs @@ -117,7 +117,7 @@ namespace Squidex.Infrastructure.CQRS.Events public void Should_subscribe_to_consumer_and_handle_events() { consumerInfo.LastHandledEventNumber = 2L; - + sut.Subscribe(eventConsumer.Object); sut.Next(); sut.Dispose(); @@ -169,7 +169,7 @@ namespace Squidex.Infrastructure.CQRS.Events { consumerInfo.IsResetting = true; consumerInfo.LastHandledEventNumber = 2L; - + sut.Subscribe(eventConsumer.Object, autoTrigger: false); sut.Next(); sut.Dispose();