Browse Source

Error handling improved

pull/1/head
Sebastian Stehle 9 years ago
parent
commit
8abffe9e52
  1. 51
      src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs
  2. 76
      src/Squidex.Infrastructure/CQRS/Events/Internal/DispatchEventBlock.cs
  3. 124
      src/Squidex.Infrastructure/CQRS/Events/Internal/EventReceiverBlock.cs
  4. 21
      src/Squidex.Infrastructure/CQRS/Events/Internal/IEventReceiverBlock.cs
  5. 71
      src/Squidex.Infrastructure/CQRS/Events/Internal/ParseEventBlock.cs
  6. 127
      src/Squidex.Infrastructure/CQRS/Events/Internal/QueryEventsBlock.cs
  7. 71
      src/Squidex.Infrastructure/CQRS/Events/Internal/UpdateStateBlock.cs
  8. 4
      tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs

51
src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs

@ -8,7 +8,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Infrastructure.CQRS.Events.Internal;
using Squidex.Infrastructure.Log;
@ -28,6 +27,7 @@ namespace Squidex.Infrastructure.CQRS.Events
private DispatchEventBlock dispatchEventBlock;
private UpdateStateBlock updateStateBlock;
private ParseEventBlock parseEventBlock;
private IEventReceiverBlock[] blocks;
private Timer timer;
public EventReceiver(
@ -73,7 +73,7 @@ namespace Squidex.Infrastructure.CQRS.Events
public void Next()
{
queryEventsBlock.NextOrThrowAway(null);
queryEventsBlock.NextOrThrowAway();
}
public void Subscribe(IEventConsumer eventConsumer, int delay = 5000, bool autoTrigger = true)
@ -85,33 +85,60 @@ namespace Squidex.Infrastructure.CQRS.Events
return;
}
updateStateBlock = new UpdateStateBlock(eventConsumer, eventConsumerInfoRepository, log);
var onError = new Action<Exception>(ex => Stop(ex, eventConsumer));
dispatchEventBlock = new DispatchEventBlock(eventConsumer, eventConsumerInfoRepository, log);
updateStateBlock = new UpdateStateBlock(eventConsumerInfoRepository, eventConsumer, log);
updateStateBlock.OnError = onError;
dispatchEventBlock = new DispatchEventBlock(eventConsumer, log);
dispatchEventBlock.OnError = onError;
dispatchEventBlock.LinkTo(updateStateBlock.Target);
parseEventBlock = new ParseEventBlock(eventConsumer, eventConsumerInfoRepository, log, formatter);
parseEventBlock = new ParseEventBlock(formatter, log);
parseEventBlock.OnError = onError;
parseEventBlock.LinkTo(dispatchEventBlock.Target);
queryEventsBlock = new QueryEventsBlock(eventConsumer, eventConsumerInfoRepository, log, eventStore);
queryEventsBlock = new QueryEventsBlock(eventConsumerInfoRepository, eventConsumer, eventStore, log);
queryEventsBlock.OnEvent = parseEventBlock.NextAsync;
queryEventsBlock.OnReset = Reset;
queryEventsBlock.OnError = onError;
queryEventsBlock.Completion.ContinueWith(x => parseEventBlock.Complete());
blocks = new IEventReceiverBlock[] { updateStateBlock, dispatchEventBlock, parseEventBlock, queryEventsBlock };
if (autoTrigger)
{
timer = new Timer(x => queryEventsBlock.NextOrThrowAway(null), null, 0, delay);
timer = new Timer(x => queryEventsBlock.NextOrThrowAway(), null, 0, delay);
}
eventNotifier.Subscribe(() => queryEventsBlock.NextOrThrowAway());
}
private void Stop(Exception ex, IEventConsumer eventConsumer)
{
foreach (var block in blocks)
{
block.Stop();
}
eventNotifier.Subscribe(() => queryEventsBlock.NextOrThrowAway(null));
try
{
eventConsumerInfoRepository.StopAsync(eventConsumer.Name, ex.ToString()).Wait();
}
catch (Exception ex2)
{
log.LogFatal(ex2, w => w
.WriteProperty("action", "StopConsumer")
.WriteProperty("state", "Failed"));
}
}
private void Reset()
{
dispatchEventBlock.Reset();
parseEventBlock.Reset();
queryEventsBlock.Reset();
updateStateBlock.Reset();
foreach (var block in blocks)
{
block.Reset();
}
}
}
}

76
src/Squidex.Infrastructure/CQRS/Events/Internal/DispatchEventBlock.cs

@ -8,26 +8,73 @@
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Log;
namespace Squidex.Infrastructure.CQRS.Events.Internal
{
internal sealed class DispatchEventBlock : EventReceiverBlock<Envelope<IEvent>, Envelope<IEvent>>
internal sealed class DispatchEventBlock : IEventReceiverBlock
{
public DispatchEventBlock(IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log)
: base(true, eventConsumer, eventConsumerInfoRepository, log)
private readonly ISemanticLog log;
private readonly IEventConsumer eventConsumer;
private readonly TransformBlock<Envelope<IEvent>, Envelope<IEvent>> transformBlock;
private long lastReceivedEventNumber = -1;
private bool isRunning = true;
public Action<Exception> OnError { get; set; }
public ITargetBlock<Envelope<IEvent>> Target
{
get { return transformBlock; }
}
public DispatchEventBlock(IEventConsumer eventConsumer, ISemanticLog log)
{
this.eventConsumer = eventConsumer;
this.log = log;
var nullHandler = new ActionBlock<Envelope<IEvent>>(x => { });
transformBlock =
new TransformBlock<Envelope<IEvent>, Envelope<IEvent>>(x => HandleAsync(x),
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
transformBlock.LinkTo(nullHandler, x => x == null);
}
public void LinkTo(ITargetBlock<Envelope<IEvent>> target)
{
transformBlock.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Stop()
{
isRunning = false;
}
protected override async Task<Envelope<IEvent>> On(Envelope<IEvent> input)
public void Reset()
{
var consumerName = EventConsumer.Name;
isRunning = true;
lastReceivedEventNumber = -1;
}
private async Task<Envelope<IEvent>> HandleAsync(Envelope<IEvent> input)
{
var eventNumber = input.Headers.EventNumber();
if (eventNumber <= lastReceivedEventNumber || !isRunning)
{
return null;
}
var consumerName = eventConsumer.Name;
var eventId = input.Headers.EventId().ToString();
var eventType = input.Payload.GetType().Name;
try
{
Log.LogInformation(w => w
log.LogInformation(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Started")
@ -35,9 +82,9 @@ namespace Squidex.Infrastructure.CQRS.Events.Internal
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", consumerName));
await EventConsumer.On(input);
await eventConsumer.On(input);
Log.LogInformation(w => w
log.LogInformation(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Completed")
@ -45,11 +92,15 @@ namespace Squidex.Infrastructure.CQRS.Events.Internal
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", consumerName));
lastReceivedEventNumber = eventNumber;
return input;
}
catch (Exception ex)
{
Log.LogError(ex, w => w
OnError?.Invoke(ex);
log.LogError(ex, w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Started")
@ -57,13 +108,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Internal
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", consumerName));
throw;
return null;
}
}
protected override long GetEventNumber(Envelope<IEvent> input)
{
return input.Headers.EventNumber();
}
}
}

124
src/Squidex.Infrastructure/CQRS/Events/Internal/EventReceiverBlock.cs

@ -1,124 +0,0 @@
// ==========================================================================
// EventReceiverBlock.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Squidex.Infrastructure.Log;
using System.Threading.Tasks.Dataflow;
namespace Squidex.Infrastructure.CQRS.Events.Internal
{
public abstract class EventReceiverBlock<TInput, TOutput>
{
private long lastEventNumber = -1;
protected ISemanticLog Log { get; }
protected IEventConsumer EventConsumer { get; }
protected IEventConsumerInfoRepository EventConsumerInfoRepository { get; }
public ITargetBlock<TInput> Target { get; }
public Task Completion
{
get { return Target.Completion; }
}
protected EventReceiverBlock(bool transform, IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log)
{
EventConsumer = eventConsumer;
EventConsumerInfoRepository = eventConsumerInfoRepository;
Log = log;
if (transform)
{
var nullHandlerBlock =
new ActionBlock<TOutput>(_ => { });
var transformBlock =
new TransformBlock<TInput, TOutput>(new Func<TInput, Task<TOutput>>(HandleAsync),
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
transformBlock.LinkTo(nullHandlerBlock, new DataflowLinkOptions { PropagateCompletion = true }, x => x == null);
Target = transformBlock;
}
else
{
Target =
new ActionBlock<TInput>(HandleAsync,
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}
}
public Task NextAsync(TInput input)
{
return Target.SendAsync(input);
}
public void NextOrThrowAway(TInput input)
{
Target.Post(input);
}
public void Complete()
{
Target.Complete();
}
public void Reset()
{
lastEventNumber = -1;
}
public void LinkTo(ITargetBlock<TOutput> other)
{
if (Target is TransformBlock<TInput, TOutput> transformBlock)
{
transformBlock.LinkTo(other, new DataflowLinkOptions { PropagateCompletion = true }, e => e != null);
}
}
protected abstract Task<TOutput> On(TInput input);
protected abstract long GetEventNumber(TInput input);
private async Task<TOutput> HandleAsync(TInput input)
{
try
{
var eventNumber = GetEventNumber(input);
if (eventNumber > lastEventNumber)
{
var envelope = await On(input);
lastEventNumber = eventNumber;
return envelope;
}
}
catch (Exception ex)
{
Log.LogFatal(ex, w => w.WriteProperty("action", "EventHandlingFailed"));
try
{
await EventConsumerInfoRepository.StopAsync(EventConsumer.Name, ex.ToString());
}
catch (Exception ex2)
{
Log.LogFatal(ex2, w => w.WriteProperty("action", "EventHandlingFailed"));
}
}
return default(TOutput);
}
}
}

21
src/Squidex.Infrastructure/CQRS/Events/Internal/IEventReceiverBlock.cs

@ -0,0 +1,21 @@
// ==========================================================================
// IEventReceiverBlock.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
namespace Squidex.Infrastructure.CQRS.Events.Internal
{
public interface IEventReceiverBlock
{
Action<Exception> OnError { get; set; }
void Reset();
void Stop();
}
}

71
src/Squidex.Infrastructure/CQRS/Events/Internal/ParseEventBlock.cs

@ -1,5 +1,5 @@
// ==========================================================================
// ParseEventBlock.cs
// DispatchEventBlock.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
@ -8,22 +8,70 @@
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Log;
namespace Squidex.Infrastructure.CQRS.Events.Internal
{
internal sealed class ParseEventBlock : EventReceiverBlock<StoredEvent, Envelope<IEvent>>
internal sealed class ParseEventBlock : IEventReceiverBlock
{
private readonly EventDataFormatter formatter;
private readonly ISemanticLog log;
private readonly TransformBlock<StoredEvent, Envelope<IEvent>> transformBlock;
private long lastReceivedEventNumber = -1;
private bool isRunning = true;
public ParseEventBlock(IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log, EventDataFormatter formatter)
: base(true, eventConsumer, eventConsumerInfoRepository, log)
public Action<Exception> OnError { get; set; }
public ParseEventBlock(EventDataFormatter formatter, ISemanticLog log)
{
this.formatter = formatter;
this.log = log;
var nullHandler = new ActionBlock<Envelope<IEvent>>(x => { });
transformBlock =
new TransformBlock<StoredEvent, Envelope<IEvent>>(x => HandleAsync(x),
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
transformBlock.LinkTo(nullHandler, x => x == null);
}
public Task NextAsync(StoredEvent input)
{
return transformBlock.SendAsync(input);
}
public void LinkTo(ITargetBlock<Envelope<IEvent>> target)
{
transformBlock.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });
}
protected override Task<Envelope<IEvent>> On(StoredEvent input)
public void Complete()
{
transformBlock.Complete();
}
public void Stop()
{
isRunning = false;
}
public void Reset()
{
isRunning = true;
lastReceivedEventNumber = -1;
}
private Envelope<IEvent> HandleAsync(StoredEvent input)
{
var eventNumber = input.EventNumber;
if (eventNumber <= lastReceivedEventNumber || !isRunning)
{
return null;
}
try
{
var result = formatter.Parse(input.Data);
@ -31,23 +79,20 @@ namespace Squidex.Infrastructure.CQRS.Events.Internal
result.SetEventNumber(input.EventNumber);
result.SetEventStreamNumber(input.EventStreamNumber);
return Task.FromResult(result);
return result;
}
catch (Exception ex)
{
Log.LogFatal(ex, w => w
OnError?.Invoke(ex);
log.LogFatal(ex, w => w
.WriteProperty("action", "ParseEvent")
.WriteProperty("state", "Failed")
.WriteProperty("eventId", input.Data.EventId.ToString())
.WriteProperty("eventNumber", input.EventNumber));
throw;
return null;
}
}
protected override long GetEventNumber(StoredEvent input)
{
return input.EventNumber;
}
}
}

127
src/Squidex.Infrastructure/CQRS/Events/Internal/QueryEventsBlock.cs

@ -9,73 +9,135 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Log;
// ReSharper disable InvertIf
namespace Squidex.Infrastructure.CQRS.Events.Internal
{
internal sealed class QueryEventsBlock : EventReceiverBlock<object, object>
internal sealed class QueryEventsBlock : IEventReceiverBlock
{
private readonly ISemanticLog log;
private readonly IEventConsumerInfoRepository eventConsumerInfoRepository;
private readonly IEventConsumer eventConsumer;
private readonly IEventStore eventStore;
private readonly ActionBlock<Envelope<IEvent>> actionBlock;
private bool isRunning = true;
private bool isStarted;
private long handled;
public Func<StoredEvent, Task> OnEvent { get; set; }
public Action<Exception> OnError { get; set; }
public Action OnReset { get; set; }
public QueryEventsBlock(IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log, IEventStore eventStore)
: base(false, eventConsumer, eventConsumerInfoRepository, log)
public Func<StoredEvent, Task> OnEvent { get; set; }
public ITargetBlock<Envelope<IEvent>> Target
{
get { return actionBlock; }
}
public Task Completion
{
get { return actionBlock.Completion; }
}
public QueryEventsBlock(IEventConsumerInfoRepository eventConsumerInfoRepository, IEventConsumer eventConsumer, IEventStore eventStore, ISemanticLog log)
{
this.eventConsumerInfoRepository = eventConsumerInfoRepository;
this.eventConsumer = eventConsumer;
this.eventStore = eventStore;
this.log = log;
actionBlock =
new ActionBlock<Envelope<IEvent>>(HandleAsync,
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}
protected override async Task<object> On(object input)
public void Complete()
{
if (!isStarted)
{
await EventConsumerInfoRepository.CreateAsync(EventConsumer.Name);
actionBlock.Complete();
}
isStarted = true;
}
public void NextOrThrowAway()
{
actionBlock.Post(null);
}
var status = await EventConsumerInfoRepository.FindAsync(EventConsumer.Name);
public void Stop()
{
isRunning = false;
}
var lastReceivedEventNumber = status.LastHandledEventNumber;
public void Reset()
{
isRunning = true;
}
if (status.IsResetting)
private async Task HandleAsync(object input)
{
try
{
await ResetAsync();
}
if (!isStarted)
{
await eventConsumerInfoRepository.CreateAsync(eventConsumer.Name);
if (!status.IsStopped)
{
var ct = CancellationToken.None;
isStarted = true;
}
await eventStore.GetEventsAsync(storedEvent => OnEvent?.Invoke(storedEvent), ct, null, lastReceivedEventNumber);
}
var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name);
var lastReceivedEventNumber = status.LastHandledEventNumber;
if (status.IsResetting)
{
await ResetAsync();
Reset();
}
if (!status.IsStopped || !isRunning)
{
var ct = new CancellationTokenSource();
return null;
await eventStore.GetEventsAsync(async storedEvent =>
{
if (!isRunning)
{
ct.Cancel();
}
var onEvent = OnEvent;
if (onEvent != null)
{
await onEvent(storedEvent);
}
}, ct.Token, null, lastReceivedEventNumber);
}
}
catch (Exception ex)
{
OnError?.Invoke(ex);
}
}
private async Task ResetAsync()
{
var consumerName = EventConsumer.Name;
var consumerName = eventConsumer.Name;
var actionId = Guid.NewGuid().ToString();
try
{
Log.LogInformation(w => w
log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Started")
.WriteProperty("eventConsumer", consumerName));
await EventConsumer.ClearAsync();
await EventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, -1);
await eventConsumer.ClearAsync();
await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, -1);
Log.LogInformation(w => w
log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Completed")
@ -85,17 +147,12 @@ namespace Squidex.Infrastructure.CQRS.Events.Internal
}
catch (Exception ex)
{
Log.LogFatal(ex, w => w
log.LogFatal(ex, w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Completed")
.WriteProperty("eventConsumer", consumerName));
}
}
protected override long GetEventNumber(object input)
{
return handled++;
}
}
}

71
src/Squidex.Infrastructure/CQRS/Events/Internal/UpdateStateBlock.cs

@ -1,33 +1,86 @@
// ==========================================================================
// UpdateStateBlock.cs
// DispatchEventBlock.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Log;
namespace Squidex.Infrastructure.CQRS.Events.Internal
{
public sealed class UpdateStateBlock : EventReceiverBlock<Envelope<IEvent>, Envelope<IEvent>>
internal sealed class UpdateStateBlock : IEventReceiverBlock
{
public UpdateStateBlock(IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log)
: base(false, eventConsumer, eventConsumerInfoRepository, log)
private readonly ISemanticLog log;
private readonly IEventConsumerInfoRepository eventConsumerInfoRepository;
private readonly IEventConsumer eventConsumer;
private readonly ActionBlock<Envelope<IEvent>> actionBlock;
private long lastReceivedEventNumber = -1;
private bool isRunning = true;
public Action<Exception> OnError { get; set; }
public ITargetBlock<Envelope<IEvent>> Target
{
get { return actionBlock; }
}
public Task Completion
{
get { return actionBlock.Completion; }
}
public UpdateStateBlock(IEventConsumerInfoRepository eventConsumerInfoRepository, IEventConsumer eventConsumer, ISemanticLog log)
{
this.eventConsumerInfoRepository = eventConsumerInfoRepository;
this.eventConsumer = eventConsumer;
this.log = log;
actionBlock =
new ActionBlock<Envelope<IEvent>>(HandleAsync,
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}
public void Stop()
{
isRunning = false;
}
protected override async Task<Envelope<IEvent>> On(Envelope<IEvent> input)
public void Reset()
{
await EventConsumerInfoRepository.SetLastHandledEventNumberAsync(EventConsumer.Name, input.Headers.EventNumber());
isRunning = true;
return input;
lastReceivedEventNumber = -1;
}
protected override long GetEventNumber(Envelope<IEvent> input)
private async Task HandleAsync(Envelope<IEvent> input)
{
return input.Headers.EventNumber();
var eventNumber = input.Headers.EventNumber();
if (eventNumber <= lastReceivedEventNumber || !isRunning)
{
return;
}
try
{
await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(eventConsumer.Name, eventNumber);
}
catch (Exception ex)
{
OnError?.Invoke(ex);
log.LogFatal(ex, w => w
.WriteProperty("action", "UpdateState")
.WriteProperty("state", "Failed")
.WriteProperty("eventId", input.Headers.EventId().ToString())
.WriteProperty("eventNumber", input.Headers.EventNumber()));
}
}
}
}

4
tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs

@ -117,7 +117,7 @@ namespace Squidex.Infrastructure.CQRS.Events
public void Should_subscribe_to_consumer_and_handle_events()
{
consumerInfo.LastHandledEventNumber = 2L;
sut.Subscribe(eventConsumer.Object);
sut.Next();
sut.Dispose();
@ -169,7 +169,7 @@ namespace Squidex.Infrastructure.CQRS.Events
{
consumerInfo.IsResetting = true;
consumerInfo.LastHandledEventNumber = 2L;
sut.Subscribe(eventConsumer.Object, autoTrigger: false);
sut.Next();
sut.Dispose();

Loading…
Cancel
Save