Browse Source

Event consumer improved.

pull/135/head
Sebastian Stehle 9 years ago
parent
commit
7c037b9987
  1. 61
      src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs
  2. 213
      tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs

61
src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs

@ -18,7 +18,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
{
public sealed class EventConsumerActor : DisposableObjectBase, IEventSubscriber, IActor
{
private const int ReconnectWaitMs = 1000;
private readonly EventDataFormatter formatter;
private readonly RetryWindow retryWindow = new RetryWindow(TimeSpan.FromMinutes(5), 5);
private readonly IEventStore eventStore;
@ -28,10 +27,10 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
private IEventSubscription eventSubscription;
private IEventConsumer eventConsumer;
private bool isStopped;
private bool statusIsRunning;
private bool statusIsRunning = true;
private string statusPosition;
private string statusError;
private Guid stateId;
private Guid stateId = Guid.NewGuid();
private sealed class Teardown
{
@ -62,6 +61,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
public Guid StateId { get; set; }
}
public int ReconnectWaitMs { get; set; } = 5000;
public EventConsumerActor(
EventDataFormatter formatter,
IEventStore eventStore,
@ -130,7 +131,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
try
{
stateId = Guid.NewGuid();
var oldStateId = stateId;
var newStateId = stateId = Guid.NewGuid();
switch (message)
{
@ -138,7 +140,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
{
isStopped = true;
break;
return;
}
case Setup setup:
@ -147,11 +149,19 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name);
statusError = status?.Error;
statusPosition = status?.Position;
statusIsRunning = !(status?.IsStopped ?? false);
if (status != null)
{
statusError = status.Error;
statusPosition = status.Position;
statusIsRunning = !status.IsStopped;
}
return;
if (statusIsRunning)
{
await SubscribeAsync();
}
break;
}
case StartConsumerMessage startConsumer:
@ -178,8 +188,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
await UnsubscribeAsync();
statusError = null;
statusIsRunning = true;
statusIsRunning = false;
break;
}
@ -199,7 +208,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
case Reconnect reconnect:
{
if (!statusIsRunning || reconnect.StateId != stateId)
if (!statusIsRunning || reconnect.StateId != oldStateId)
{
return;
}
@ -220,9 +229,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
if (retryWindow.CanRetryAfterFailure())
{
var id = stateId;
Task.Delay(ReconnectWaitMs).ContinueWith(t => dispatcher.SendAsync(new Reconnect { StateId = id })).Forget();
Task.Delay(ReconnectWaitMs).ContinueWith(t => dispatcher.SendAsync(new Reconnect { StateId = newStateId })).Forget();
}
else
{
@ -243,14 +250,26 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
await DispatchConsumerAsync(@event);
statusError = null;
statusPosition = @eventReceived.Event.EventPosition;
break;
}
}
await eventConsumerInfoRepository.SetAsync(eventConsumer.Name, statusPosition, !statusIsRunning, statusError);
}
catch (Exception ex)
{
try
{
await UnsubscribeAsync();
}
catch (Exception unsubscribeException)
{
ex = new AggregateException(ex, unsubscribeException);
}
log.LogFatal(ex, w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("state", "Failed")
@ -258,9 +277,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
statusError = ex.ToString();
statusIsRunning = false;
}
finally
{
await eventConsumerInfoRepository.SetAsync(eventConsumer.Name, statusPosition, !statusIsRunning, statusError);
}
}
@ -275,14 +292,14 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
}
}
private async Task SubscribeAsync()
private Task SubscribeAsync()
{
if (eventSubscription == null)
{
var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name);
eventSubscription = eventStore.CreateSubscription(this, eventConsumer.EventsFilter, status.Position);
eventSubscription = eventStore.CreateSubscription(this, eventConsumer.EventsFilter, statusPosition);
}
return TaskHelper.Done;
}
private async Task ClearAsync()

213
tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs

@ -59,37 +59,69 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope);
sut = new EventConsumerActor(formatter, eventStore, eventConsumerInfoRepository, log);
sut = new EventConsumerActor(formatter, eventStore, eventConsumerInfoRepository, log) { ReconnectWaitMs = 0 };
sutActor = sut;
sutSubscriber = sut;
}
/*
[Fact]
public async Task Should_subscribe_to_event_store_when_started()
public async Task Should_not_not_subscribe_to_event_store_when_stopped_in_db()
{
await SubscribeAsync();
consumerInfo.IsStopped = true;
await OnSubscribeAsync();
sut.Dispose();
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_subscribe_to_event_store_when_not_found_in_db()
{
A.CallTo(() => eventConsumerInfoRepository.FindAsync(consumerName)).Returns(Task.FromResult<IEventConsumerInfo>(null));
await OnSubscribeAsync();
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_subscribe_to_event_store_when_not_stopped_in_db()
{
await OnSubscribeAsync();
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_stop_subscription_when_stopped()
{
await SubscribeAsync();
await OnSubscribeAsync();
sutActor.Tell(new StopConsumerMessage());
sutActor.Tell(new StopConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null))
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, true, null))
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync())
@ -99,12 +131,16 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
[Fact]
public async Task Should_reset_consumer_when_resetting()
{
await SubscribeAsync();
await OnSubscribeAsync();
sutActor.Tell(new StopConsumerMessage());
sutActor.Tell(new ResetConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null))
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null))
@ -122,16 +158,15 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
{
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await SubscribeAsync();
await sutSubscriber.OnEventAsync(eventSubscription, @event);
await OnSubscribeAsync();
await OnEventAsync(eventSubscription, @event);
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null))
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, @event.EventPosition))
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumer.On(envelope))
@ -143,118 +178,178 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
{
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await SubscribeAsync();
await sutSubscriber.OnEventAsync(A.Fake<IEventSubscription>(), @event);
await OnSubscribeAsync();
await OnEventAsync(A.Fake<IEventSubscription>(), @event);
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null))
.MustNotHaveHappened();
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened(Repeated.Exactly.Once);
.MustNotHaveHappened();
}
[Fact]
public async Task Should_reopen_subscription_when_exception_is_retrieved()
{
var ex = new InvalidOperationException();
await OnSubscribeAsync();
await OnErrorAsync(eventSubscription, ex);
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Times(3));
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, ex.ToString()))
.MustNotHaveHappened();
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Twice);
}
[Fact]
public async Task Should_not_make_error_handling_when_exception_is_from_another_subscription()
{
var ex = new InvalidOperationException();
await OnSubscribeAsync();
await OnErrorAsync(A.Fake<IEventSubscription>(), ex);
A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false))
.MustNotHaveHappened(Repeated.Exactly.Once);
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, ex.ToString()))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_stop_if_resetting_failed()
{
var exception = new InvalidOperationException("Exception");
var ex = new InvalidOperationException();
A.CallTo(() => eventConsumer.ClearAsync())
.Throws(exception);
.Throws(ex);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await SubscribeAsync();
await OnSubscribeAsync();
sutActor.Tell(new ResetConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString()))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString()))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_stop_if_handling_failed()
{
var exception = new InvalidOperationException("Exception");
var ex = new InvalidOperationException();
A.CallTo(() => eventConsumer.On(envelope))
.Throws(exception);
.Throws(ex);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await SubscribeAsync();
await sutSubscriber.OnEventAsync(eventSubscription, @event);
await OnSubscribeAsync();
await OnEventAsync(eventSubscription, @event);
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false))
.MustNotHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString()))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString()))
.MustHaveHappened();
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_start_after_stop_when_handling_failed()
public async Task Should_stop_if_deserialization_failed()
{
var exception = new InvalidOperationException("Exception");
var ex = new InvalidOperationException();
A.CallTo(() => eventConsumer.On(envelope))
.Throws(exception);
A.CallTo(() => formatter.Parse(eventData, true))
.Throws(ex);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await SubscribeAsync();
await sutSubscriber.OnEventAsync(eventSubscription, @event);
await OnSubscribeAsync();
await OnEventAsync(eventSubscription, @event);
sutActor.Tell(new StartConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false))
.MustNotHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString()))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString()))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName))
.MustHaveHappened(Repeated.Exactly.Twice);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_stop_if_deserialization_failed()
public async Task Should_start_after_stop_when_handling_failed()
{
var exception = new InvalidOperationException("Exception");
var exception = new InvalidOperationException();
A.CallTo(() => formatter.Parse(eventData, true))
A.CallTo(() => eventConsumer.On(envelope))
.Throws(exception);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await SubscribeAsync();
await sutSubscriber.OnEventAsync(eventSubscription, @event);
await OnSubscribeAsync();
await OnEventAsync(eventSubscription, @event);
sutActor.Tell(new StartConsumerMessage());
sutActor.Tell(new StartConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false))
.MustNotHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, exception.ToString()))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString()))
.MustHaveHappened();
}*/
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Twice);
}
private async Task OnErrorAsync(IEventSubscription subscriber, Exception ex)
{
await sutSubscriber.OnErrorAsync(subscriber, ex);
await Task.Delay(200);
}
private async Task OnEventAsync(IEventSubscription subscriber, StoredEvent ev)
{
await sutSubscriber.OnEventAsync(subscriber, ev);
await Task.Delay(200);
}
private async Task SubscribeAsync()
private async Task OnSubscribeAsync()
{
await sut.SubscribeAsync(eventConsumer);

Loading…
Cancel
Save