Browse Source

Merge branch 'refactoring-retry-subscriber' into refactoring-simple-model

pull/152/head
Sebastian Stehle 8 years ago
parent
commit
3568c12127
  1. 9
      src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs
  2. 125
      src/Squidex.Infrastructure/Actors/Actor.cs
  3. 69
      src/Squidex.Infrastructure/Actors/SingleThreadedDispatcher.cs
  4. 256
      src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs
  5. 2
      src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs
  6. 104
      src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs
  7. 19
      tests/Squidex.Infrastructure.Tests/Actors/ActorRemoteTests.cs
  8. 158
      tests/Squidex.Infrastructure.Tests/Actors/ActorTests.cs
  9. 91
      tests/Squidex.Infrastructure.Tests/Actors/SingleThreadedDispatcherTests.cs
  10. 76
      tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs
  11. 123
      tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs

9
src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs

@ -20,7 +20,7 @@ using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.CQRS.Events
{
internal sealed class GetEventStoreSubscription : IEventSubscription
internal sealed class GetEventStoreSubscription : DisposableObjectBase, IEventSubscription
{
private const string ProjectionName = "by-{0}-{1}";
private static readonly ConcurrentDictionary<string, bool> SubscriptionsCreated = new ConcurrentDictionary<string, bool>();
@ -57,11 +57,12 @@ namespace Squidex.Infrastructure.CQRS.Events
subscription = SubscribeToStream(streamName);
}
public Task StopAsync()
protected override void DisposeObject(bool disposing)
{
if (disposing)
{
subscription.Stop();
return TaskHelper.Done;
}
}
private EventStoreCatchUpSubscription SubscribeToStream(string streamName)

125
src/Squidex.Infrastructure/Actors/Actor.cs

@ -1,125 +0,0 @@
// ==========================================================================
// Actor.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.Actors
{
public abstract class Actor : IDisposable
{
private readonly ActionBlock<object> block;
private bool isStopped;
private sealed class StopMessage
{
}
private sealed class ErrorMessage
{
public Exception Exception { get; set; }
}
protected Actor()
{
var options = new ExecutionDataflowBlockOptions
{
MaxMessagesPerTask = -1,
MaxDegreeOfParallelism = 1,
BoundedCapacity = 10
};
block = new ActionBlock<object>(Handle, options);
}
public void Dispose()
{
StopAndWaitAsync().Wait();
}
protected async Task DispatchAsync(object message)
{
Guard.NotNull(message, nameof(message));
await block.SendAsync(message);
}
protected async Task FailAsync(Exception exception)
{
Guard.NotNull(exception, nameof(exception));
await block.SendAsync(new ErrorMessage { Exception = exception });
}
protected async Task StopAndWaitAsync()
{
await block.SendAsync(new StopMessage());
await block.Completion;
}
protected virtual Task OnStop()
{
return TaskHelper.Done;
}
protected virtual Task OnError(Exception exception)
{
return TaskHelper.Done;
}
protected virtual Task OnMessage(object message)
{
return TaskHelper.Done;
}
private async Task Handle(object message)
{
if (isStopped)
{
return;
}
switch (message)
{
case StopMessage stopMessage:
{
isStopped = true;
block.Complete();
await OnStop();
break;
}
case ErrorMessage errorMessage:
{
await OnError(errorMessage.Exception);
break;
}
default:
{
try
{
await OnMessage(message);
}
catch (Exception ex)
{
await OnError(ex);
}
break;
}
}
}
}
}

69
src/Squidex.Infrastructure/Actors/SingleThreadedDispatcher.cs

@ -0,0 +1,69 @@
// ==========================================================================
// Actor.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.Actors
{
public sealed class SingleThreadedDispatcher
{
private readonly ActionBlock<Func<Task>> block;
private bool isStopped;
public SingleThreadedDispatcher(int capacity = 10)
{
var options = new ExecutionDataflowBlockOptions
{
MaxMessagesPerTask = -1,
MaxDegreeOfParallelism = 1,
BoundedCapacity = capacity
};
block = new ActionBlock<Func<Task>>(Handle, options);
}
public Task DispatchAsync(Func<Task> action)
{
Guard.NotNull(action, nameof(action));
return block.SendAsync(action);
}
public Task DispatchAsync(Action action)
{
Guard.NotNull(action, nameof(action));
return block.SendAsync(() => { action(); return TaskHelper.Done; });
}
public async Task StopAndWaitAsync()
{
await DispatchAsync(() =>
{
isStopped = true;
block.Complete();
});
await block.Completion;
}
private Task Handle(Func<Task> action)
{
if (isStopped)
{
return TaskHelper.Done;
}
return action();
}
}
}

256
src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs

@ -8,7 +8,6 @@
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Actors;
using Squidex.Infrastructure.CQRS.Events.Actors.Messages;
using Squidex.Infrastructure.Log;
@ -16,53 +15,24 @@ using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.CQRS.Events.Actors
{
public sealed class EventConsumerActor : DisposableObjectBase, IEventSubscriber, IActor
public class EventConsumerActor : DisposableObjectBase, IEventSubscriber, IActor
{
private readonly EventDataFormatter formatter;
private readonly RetryWindow retryWindow = new RetryWindow(TimeSpan.FromMinutes(5), 5);
private readonly IEventStore eventStore;
private readonly IEventConsumerInfoRepository eventConsumerInfoRepository;
private readonly ISemanticLog log;
private readonly ActionBlock<object> dispatcher;
private IEventSubscription eventSubscription;
private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(1);
private IEventSubscription currentSubscription;
private IEventConsumer eventConsumer;
private bool isStopped;
private bool statusIsRunning = true;
private string statusPosition;
private string statusError;
private Guid stateId = Guid.NewGuid();
private sealed class Teardown
private static Func<IEventStore, IEventSubscriber, string, string, IEventSubscription> DefaultFactory
{
get { return (e, s, t, p) => new RetrySubscription(e, s, t, p); }
}
private sealed class Setup
{
public IEventConsumer EventConsumer { get; set; }
}
private abstract class SubscriptionMessage
{
public IEventSubscription Subscription { get; set; }
}
private sealed class SubscriptionEventReceived : SubscriptionMessage
{
public StoredEvent Event { get; set; }
}
private sealed class SubscriptionFailed : SubscriptionMessage
{
public Exception Exception { get; set; }
}
private sealed class Reconnect
{
public Guid StateId { get; set; }
}
public int ReconnectWaitMs { get; set; } = 5000;
public EventConsumerActor(
EventDataFormatter formatter,
IEventStore eventStore,
@ -79,199 +49,172 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
this.formatter = formatter;
this.eventStore = eventStore;
this.eventConsumerInfoRepository = eventConsumerInfoRepository;
var options = new ExecutionDataflowBlockOptions
{
MaxMessagesPerTask = -1,
MaxDegreeOfParallelism = 1,
BoundedCapacity = 10
};
dispatcher = new ActionBlock<object>(OnMessage, options);
}
protected override void DisposeObject(bool disposing)
{
if (disposing)
{
dispatcher.SendAsync(new Teardown()).Wait();
dispatcher.Complete();
dispatcher.Completion.Wait();
dispatcher.StopAndWaitAsync().Wait();
}
}
public async Task WaitForCompletionAsync()
protected virtual IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position)
{
while (dispatcher.InputCount > 0)
{
await Task.Delay(20);
}
return new RetrySubscription(eventStore, this, streamFilter, position);
}
public Task SubscribeAsync(IEventConsumer eventConsumer)
{
Guard.NotNull(eventConsumer, nameof(eventConsumer));
return dispatcher.SendAsync(new Setup { EventConsumer = eventConsumer });
return dispatcher.DispatchAsync(() => HandleSetupAsync(eventConsumer));
}
Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent @event)
private async Task HandleSetupAsync(IEventConsumer consumer)
{
return dispatcher.SendAsync(new SubscriptionEventReceived { Subscription = subscription, Event = @event });
}
eventConsumer = consumer;
Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception)
var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name);
if (status != null)
{
return dispatcher.SendAsync(new SubscriptionFailed { Subscription = subscription, Exception = exception });
statusError = status.Error;
statusPosition = status.Position;
statusIsRunning = !status.IsStopped;
}
void IActor.Tell(object message)
if (statusIsRunning)
{
dispatcher.SendAsync(message).Forget();
Subscribe(statusPosition);
}
}
private async Task OnMessage(object message)
private Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
if (isStopped)
if (subscription != currentSubscription)
{
return;
return TaskHelper.Done;
}
try
return DoAndUpdateStateAsync(async () =>
{
var oldStateId = stateId;
var newStateId = stateId = Guid.NewGuid();
await DispatchConsumerAsync(formatter.Parse(storedEvent.Data));
switch (message)
{
case Teardown teardown:
{
isStopped = true;
return;
statusError = null;
statusPosition = storedEvent.EventPosition;
});
}
case Setup setup:
private Task HandleErrorAsync(IEventSubscription subscription, Exception exception)
{
eventConsumer = setup.EventConsumer;
var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name);
if (status != null)
if (subscription != currentSubscription)
{
statusError = status.Error;
statusPosition = status.Position;
statusIsRunning = !status.IsStopped;
return TaskHelper.Done;
}
if (statusIsRunning)
return DoAndUpdateStateAsync(() =>
{
await SubscribeThisAsync(statusPosition);
}
Unsubscribe();
break;
statusError = exception.ToString();
statusIsRunning = false;
});
}
case StartConsumerMessage startConsumer:
private Task HandleStartAsync()
{
if (statusIsRunning)
{
return;
return TaskHelper.Done;
}
await SubscribeThisAsync(statusPosition);
return DoAndUpdateStateAsync(() =>
{
Subscribe(statusPosition);
statusError = null;
statusIsRunning = true;
break;
});
}
case StopConsumerMessage stopConsumer:
private Task HandleStopAsync()
{
if (!statusIsRunning)
{
return;
return TaskHelper.Done;
}
await UnsubscribeThisAsync();
return DoAndUpdateStateAsync(() =>
{
Unsubscribe();
statusError = null;
statusIsRunning = false;
break;
});
}
case ResetConsumerMessage resetConsumer:
private Task HandleResetInternalAsync()
{
await UnsubscribeThisAsync();
return DoAndUpdateStateAsync(async () =>
{
Unsubscribe();
await ClearAsync();
await SubscribeThisAsync(null);
Subscribe(null);
statusError = null;
statusPosition = null;
statusIsRunning = true;
break;
});
}
case Reconnect reconnect:
{
if (!statusIsRunning || reconnect.StateId != oldStateId)
Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
return;
return dispatcher.DispatchAsync(() => HandleEventAsync(subscription, storedEvent));
}
await SubscribeThisAsync(statusPosition);
break;
Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception)
{
return dispatcher.DispatchAsync(() => HandleErrorAsync(subscription, exception));
}
case SubscriptionFailed subscriptionFailed:
void IActor.Tell(object message)
{
if (subscriptionFailed.Subscription != eventSubscription)
switch (message)
{
return;
}
case StopConsumerMessage stop:
dispatcher.DispatchAsync(() => HandleStopAsync()).Forget();
break;
await UnsubscribeThisAsync();
case StartConsumerMessage stop:
dispatcher.DispatchAsync(() => HandleStartAsync()).Forget();
break;
if (retryWindow.CanRetryAfterFailure())
{
Task.Delay(ReconnectWaitMs).ContinueWith(t => dispatcher.SendAsync(new Reconnect { StateId = newStateId })).Forget();
case ResetConsumerMessage stop:
dispatcher.DispatchAsync(() => HandleResetInternalAsync()).Forget();
break;
}
else
{
throw subscriptionFailed.Exception;
}
break;
private Task DoAndUpdateStateAsync(Action action)
{
return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; });
}
case SubscriptionEventReceived eventReceived:
private async Task DoAndUpdateStateAsync(Func<Task> action)
{
if (eventReceived.Subscription != eventSubscription)
try
{
return;
}
var @event = ParseEvent(eventReceived.Event);
await DispatchConsumerAsync(@event);
statusError = null;
statusPosition = @eventReceived.Event.EventPosition;
break;
}
}
await action();
await eventConsumerInfoRepository.SetAsync(eventConsumer.Name, statusPosition, !statusIsRunning, statusError);
}
catch (Exception ex)
{
try
{
await UnsubscribeThisAsync();
Unsubscribe();
}
catch (Exception unsubscribeException)
{
@ -290,27 +233,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
}
}
private Task UnsubscribeThisAsync()
{
if (eventSubscription != null)
{
eventSubscription.StopAsync().Forget();
eventSubscription = null;
}
return TaskHelper.Done;
}
private Task SubscribeThisAsync(string position)
{
if (eventSubscription == null)
{
eventSubscription = eventStore.CreateSubscription(this, eventConsumer.EventsFilter, position);
}
return TaskHelper.Done;
}
private async Task ClearAsync()
{
var actionId = Guid.NewGuid().ToString();
@ -356,6 +278,24 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
}
}
private void Unsubscribe()
{
if (currentSubscription != null)
{
currentSubscription.StopAsync().Forget();
currentSubscription = null;
}
}
private void Subscribe(string position)
{
if (currentSubscription == null)
{
currentSubscription?.StopAsync().Forget();
currentSubscription = CreateSubscription(eventStore, eventConsumer.EventsFilter, position);
}
}
private Envelope<IEvent> ParseEvent(StoredEvent message)
{
var @event = formatter.Parse(message.Data);

2
src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs

@ -13,7 +13,7 @@ namespace Squidex.Infrastructure.CQRS.Events
{
public interface IEventSubscriber
{
Task OnEventAsync(IEventSubscription subscription, StoredEvent @event);
Task OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent);
Task OnErrorAsync(IEventSubscription subscription, Exception exception);
}

104
src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs

@ -0,0 +1,104 @@
// ==========================================================================
// RetrySubscription.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Infrastructure.Actors;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.CQRS.Events
{
public sealed class RetrySubscription : IEventSubscription, IEventSubscriber
{
private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(10);
private readonly CancellationTokenSource disposeCts = new CancellationTokenSource();
private readonly RetryWindow retryWindow = new RetryWindow(TimeSpan.FromMinutes(5), 5);
private readonly IEventStore eventStore;
private readonly IEventSubscriber eventSubscriber;
private readonly string streamFilter;
private IEventSubscription currentSubscription;
private string position;
public int ReconnectWaitMs { get; set; } = 5000;
public RetrySubscription(IEventStore eventStore, IEventSubscriber eventSubscriber, string streamFilter, string position)
{
Guard.NotNull(eventStore, nameof(eventStore));
Guard.NotNull(eventSubscriber, nameof(eventSubscriber));
this.position = position;
this.eventStore = eventStore;
this.eventSubscriber = eventSubscriber;
this.streamFilter = streamFilter;
Subscribe();
}
private void Subscribe()
{
currentSubscription = eventStore.CreateSubscription(this, streamFilter, position);
}
private void Unsubscribe()
{
currentSubscription?.StopAsync().Forget();
}
private async Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
if (subscription == currentSubscription)
{
await eventSubscriber.OnEventAsync(this, storedEvent);
position = storedEvent.EventPosition;
}
}
private async Task HandleErrorAsync(IEventSubscription subscription, Exception exception)
{
if (subscription == currentSubscription)
{
subscription.StopAsync().Forget();
subscription = null;
if (retryWindow.CanRetryAfterFailure())
{
Task.Delay(ReconnectWaitMs, disposeCts.Token).ContinueWith(t =>
{
dispatcher.DispatchAsync(() => Subscribe());
}).Forget();
}
else
{
await eventSubscriber.OnErrorAsync(this, exception);
}
}
}
Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
return dispatcher.DispatchAsync(() => HandleEventAsync(subscription, storedEvent));
}
Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception)
{
return dispatcher.DispatchAsync(() => HandleErrorAsync(subscription, exception));
}
public async Task StopAsync()
{
await dispatcher.DispatchAsync(() => Unsubscribe());
await dispatcher.StopAndWaitAsync();
disposeCts.Cancel();
}
}
}

19
tests/Squidex.Infrastructure.Tests/Actors/ActorRemoteTests.cs

@ -22,20 +22,23 @@ namespace Squidex.Infrastructure.Actors
public int Counter { get; set; }
}
private sealed class MyActor : Actor, IActor
private sealed class MyActor : IActor
{
private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher();
public List<object> Invokes { get; } = new List<object>();
public void Tell(object message)
public Task StopAndWaitAsync()
{
DispatchAsync(message).Forget();
return dispatcher.StopAndWaitAsync();
}
protected override Task OnMessage(object message)
public void Tell(object message)
{
dispatcher.DispatchAsync(() =>
{
Invokes.Add(message);
return TaskHelper.Done;
}).Forget();
}
}
@ -55,13 +58,13 @@ namespace Squidex.Infrastructure.Actors
}
[Fact]
public void Should_handle_messages_sequentially()
public async Task Should_handle_messages_sequentially()
{
remoteActor.Tell(new SuccessMessage { Counter = 1 });
remoteActor.Tell(new SuccessMessage { Counter = 2 });
remoteActor.Tell(new SuccessMessage { Counter = 3 });
actor.Dispose();
await actor.StopAndWaitAsync();
actor.Invokes.ShouldBeEquivalentTo(new List<object>
{

158
tests/Squidex.Infrastructure.Tests/Actors/ActorTests.cs

@ -1,158 +0,0 @@
// ==========================================================================
// ActorTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Squidex.Infrastructure.Tasks;
using Xunit;
namespace Squidex.Infrastructure.Actors
{
public class ActorTests
{
public class SuccessMessage
{
public int Counter { get; set; }
}
public class FailedMessage
{
}
private sealed class MyActor : Actor, IActor
{
public List<object> Invokes { get; } = new List<object>();
public void Tell(Exception exception)
{
FailAsync(exception).Forget();
}
public void Tell(object message)
{
DispatchAsync(message).Forget();
}
public Task StopAsync()
{
return StopAndWaitAsync();
}
protected override Task OnStop()
{
Invokes.Add(true);
return TaskHelper.Done;
}
protected override Task OnError(Exception exception)
{
Invokes.Add(exception);
return TaskHelper.Done;
}
protected override Task OnMessage(object message)
{
if (message is FailedMessage)
{
throw new InvalidOperationException();
}
Invokes.Add(message);
return TaskHelper.Done;
}
}
private readonly MyActor sut = new MyActor();
[Fact]
public async Task Should_invoke_with_exception()
{
sut.Tell(new InvalidOperationException());
await sut.StopAsync();
Assert.True(sut.Invokes[0] is InvalidOperationException);
}
[Fact]
public async Task Should_handle_messages_sequentially()
{
sut.Tell(new SuccessMessage { Counter = 1 });
sut.Tell(new SuccessMessage { Counter = 2 });
sut.Tell(new SuccessMessage { Counter = 3 });
await sut.StopAsync();
sut.Invokes.ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 1 },
new SuccessMessage { Counter = 2 },
new SuccessMessage { Counter = 3 },
true
});
}
[Fact]
public async Task Should_raise_error_event_when_event_handling_failed()
{
sut.Tell(new FailedMessage());
sut.Tell(new SuccessMessage { Counter = 2 });
sut.Tell(new SuccessMessage { Counter = 3 });
await sut.StopAsync();
Assert.True(sut.Invokes[0] is InvalidOperationException);
sut.Invokes.Skip(1).ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 2 },
new SuccessMessage { Counter = 3 },
true
});
}
[Fact]
public async Task Should_not_handle_messages_after_stop()
{
sut.Tell(new SuccessMessage { Counter = 1 });
await sut.StopAsync();
sut.Tell(new SuccessMessage { Counter = 2 });
sut.Tell(new SuccessMessage { Counter = 3 });
sut.Tell(new InvalidOperationException());
sut.Invokes.ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 1 },
true
});
}
[Fact]
public void Should_call_stop_on_dispose()
{
sut.Tell(new SuccessMessage { Counter = 1 });
sut.Dispose();
sut.Invokes.ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 1 },
true
});
}
}
}

91
tests/Squidex.Infrastructure.Tests/Actors/SingleThreadedDispatcherTests.cs

@ -0,0 +1,91 @@
// ==========================================================================
// SingleThreadedDispatcherTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Squidex.Infrastructure.Tasks;
using Xunit;
namespace Squidex.Infrastructure.Actors
{
public class SingleThreadedDispatcherTests
{
private readonly SingleThreadedDispatcher sut = new SingleThreadedDispatcher();
[Fact]
public async Task Should_handle_messages_sequentially()
{
var source = Enumerable.Range(1, 100);
var target = new List<int>();
foreach (var item in source)
{
sut.DispatchAsync(() => target.Add(item)).Forget();
}
await sut.StopAndWaitAsync();
Assert.Equal(source, target);
}
/*
[Fact]
public async Task Should_raise_error_event_when_event_handling_failed()
{
sut.Tell(new FailedMessage());
sut.Tell(new SuccessMessage { Counter = 2 });
sut.Tell(new SuccessMessage { Counter = 3 });
await sut.StopAsync();
Assert.True(sut.Invokes[0] is InvalidOperationException);
sut.Invokes.Skip(1).ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 2 },
new SuccessMessage { Counter = 3 },
true
});
}
[Fact]
public async Task Should_not_handle_messages_after_stop()
{
sut.Tell(new SuccessMessage { Counter = 1 });
await sut.StopAsync();
sut.Tell(new SuccessMessage { Counter = 2 });
sut.Tell(new SuccessMessage { Counter = 3 });
sut.Tell(new InvalidOperationException());
sut.Invokes.ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 1 },
true
});
}
[Fact]
public void Should_call_stop_on_dispose()
{
sut.Tell(new SuccessMessage { Counter = 1 });
sut.Dispose();
sut.Invokes.ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 1 },
true
});
}
*/
}
}

76
tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs

@ -33,6 +33,23 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
public string Position { get; set; }
}
public sealed class MyEventConsumerActor : EventConsumerActor
{
public MyEventConsumerActor(
EventDataFormatter formatter,
IEventStore eventStore,
IEventConsumerInfoRepository eventConsumerInfoRepository,
ISemanticLog log)
: base(formatter, eventStore, eventConsumerInfoRepository, log)
{
}
protected override IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position)
{
return eventStore.CreateSubscription(this, streamFilter, position);
}
}
private readonly IEventConsumerInfoRepository eventConsumerInfoRepository = A.Fake<IEventConsumerInfoRepository>();
private readonly IEventConsumer eventConsumer = A.Fake<IEventConsumer>();
private readonly IEventStore eventStore = A.Fake<IEventStore>();
@ -59,14 +76,18 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope);
sut = new EventConsumerActor(formatter, eventStore, eventConsumerInfoRepository, log) { ReconnectWaitMs = 0 };
sut = new MyEventConsumerActor(
formatter,
eventStore,
eventConsumerInfoRepository,
log);
sutActor = sut;
sutSubscriber = sut;
}
[Fact]
public async Task Should_not_not_subscribe_to_event_store_when_stopped_in_db()
public async Task Should_not_subscribe_to_event_store_when_stopped_in_db()
{
consumerInfo.IsStopped = true;
@ -87,9 +108,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once);
}
@ -101,9 +119,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once);
}
@ -118,9 +133,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null))
.MustHaveHappened(Repeated.Exactly.Once);
@ -137,9 +149,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
sutActor.Tell(new ResetConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null))
.MustHaveHappened(Repeated.Exactly.Once);
@ -169,9 +178,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
@ -189,43 +195,10 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null))
.MustNotHaveHappened();
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_reopen_subscription_when_exception_is_retrieved()
{
var ex = new InvalidOperationException();
await OnSubscribeAsync();
await OnErrorAsync(eventSubscription, ex);
await Task.Delay(200);
await sut.WaitForCompletionAsync();
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Times(3));
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, ex.ToString()))
.MustNotHaveHappened();
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Twice);
}
[Fact]
public async Task Should_not_make_error_handling_when_exception_is_from_another_subscription()
{
@ -236,9 +209,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, ex.ToString()))
.MustNotHaveHappened();
}

123
tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs

@ -0,0 +1,123 @@
// ==========================================================================
// EventConsumerActorTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using FakeItEasy;
using Xunit;
namespace Squidex.Infrastructure.CQRS.Events
{
public class RetrySubscriptionTests
{
private readonly IEventStore eventStore = A.Fake<IEventStore>();
private readonly IEventSubscriber eventSubscriber = A.Fake<IEventSubscriber>();
private readonly IEventSubscription eventSubscription = A.Fake<IEventSubscription>();
private readonly IEventSubscriber sutSubscriber;
private readonly RetrySubscription sut;
private readonly string streamFilter = Guid.NewGuid().ToString();
public RetrySubscriptionTests()
{
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored)).Returns(eventSubscription);
sut = new RetrySubscription(eventStore, eventSubscriber, streamFilter, null) { ReconnectWaitMs = 0 };
sutSubscriber = sut;
}
[Fact]
public void Should_subscribe_after_constructor()
{
A.CallTo(() => eventStore.CreateSubscription(sut, streamFilter, null))
.MustHaveHappened();
}
[Fact]
public async Task Should_reopen_subscription_when_exception_is_retrieved()
{
await OnErrorAsync(eventSubscription, new InvalidOperationException());
await Task.Delay(200);
await sut.StopAsync();
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Twice);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Twice);
A.CallTo(() => eventSubscriber.OnErrorAsync(A<IEventSubscription>.Ignored, A<Exception>.Ignored))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_forward_error_from_inner_subscription_when_failed_often()
{
var ex = new InvalidOperationException();
await OnErrorAsync(eventSubscription, ex);
await OnErrorAsync(eventSubscription, ex);
await OnErrorAsync(eventSubscription, ex);
await OnErrorAsync(eventSubscription, ex);
await OnErrorAsync(eventSubscription, ex);
await OnErrorAsync(eventSubscription, ex);
await sut.StopAsync();
A.CallTo(() => eventSubscriber.OnErrorAsync(sut, ex))
.MustHaveHappened();
}
[Fact]
public async Task Should_forward_event_from_inner_subscription()
{
var ev = new StoredEvent("1", 2, new EventData());
await OnEventAsync(eventSubscription, ev);
await sut.StopAsync();
A.CallTo(() => eventSubscriber.OnEventAsync(sut, ev))
.MustHaveHappened();
}
[Fact]
public async Task Should_not_forward_error_when_exception_is_from_another_subscription()
{
var ex = new InvalidOperationException();
await OnErrorAsync(A.Fake<IEventSubscription>(), ex);
await sut.StopAsync();
A.CallTo(() => eventSubscriber.OnErrorAsync(A<IEventSubscription>.Ignored, A<Exception>.Ignored))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_not_forward_event_when_message_is_from_another_subscription()
{
var ev = new StoredEvent("1", 2, new EventData());
await OnEventAsync(A.Fake<IEventSubscription>(), ev);
await sut.StopAsync();
A.CallTo(() => eventSubscriber.OnEventAsync(A<IEventSubscription>.Ignored, A<StoredEvent>.Ignored))
.MustNotHaveHappened();
}
private Task OnErrorAsync(IEventSubscription subscriber, Exception ex)
{
return sutSubscriber.OnErrorAsync(subscriber, ex);
}
private Task OnEventAsync(IEventSubscription subscriber, StoredEvent ev)
{
return sutSubscriber.OnEventAsync(subscriber, ev);
}
}
}
Loading…
Cancel
Save