mirror of https://github.com/Squidex/squidex.git
11 changed files with 563 additions and 545 deletions
@ -1,125 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Actor.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using System.Threading.Tasks.Dataflow; |
|||
using Squidex.Infrastructure.Tasks; |
|||
|
|||
namespace Squidex.Infrastructure.Actors |
|||
{ |
|||
public abstract class Actor : IDisposable |
|||
{ |
|||
private readonly ActionBlock<object> block; |
|||
private bool isStopped; |
|||
|
|||
private sealed class StopMessage |
|||
{ |
|||
} |
|||
|
|||
private sealed class ErrorMessage |
|||
{ |
|||
public Exception Exception { get; set; } |
|||
} |
|||
|
|||
protected Actor() |
|||
{ |
|||
var options = new ExecutionDataflowBlockOptions |
|||
{ |
|||
MaxMessagesPerTask = -1, |
|||
MaxDegreeOfParallelism = 1, |
|||
BoundedCapacity = 10 |
|||
}; |
|||
|
|||
block = new ActionBlock<object>(Handle, options); |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
StopAndWaitAsync().Wait(); |
|||
} |
|||
|
|||
protected async Task DispatchAsync(object message) |
|||
{ |
|||
Guard.NotNull(message, nameof(message)); |
|||
|
|||
await block.SendAsync(message); |
|||
} |
|||
|
|||
protected async Task FailAsync(Exception exception) |
|||
{ |
|||
Guard.NotNull(exception, nameof(exception)); |
|||
|
|||
await block.SendAsync(new ErrorMessage { Exception = exception }); |
|||
} |
|||
|
|||
protected async Task StopAndWaitAsync() |
|||
{ |
|||
await block.SendAsync(new StopMessage()); |
|||
await block.Completion; |
|||
} |
|||
|
|||
protected virtual Task OnStop() |
|||
{ |
|||
return TaskHelper.Done; |
|||
} |
|||
|
|||
protected virtual Task OnError(Exception exception) |
|||
{ |
|||
return TaskHelper.Done; |
|||
} |
|||
|
|||
protected virtual Task OnMessage(object message) |
|||
{ |
|||
return TaskHelper.Done; |
|||
} |
|||
|
|||
private async Task Handle(object message) |
|||
{ |
|||
if (isStopped) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
switch (message) |
|||
{ |
|||
case StopMessage stopMessage: |
|||
{ |
|||
isStopped = true; |
|||
|
|||
block.Complete(); |
|||
|
|||
await OnStop(); |
|||
|
|||
break; |
|||
} |
|||
|
|||
case ErrorMessage errorMessage: |
|||
{ |
|||
await OnError(errorMessage.Exception); |
|||
|
|||
break; |
|||
} |
|||
|
|||
default: |
|||
{ |
|||
try |
|||
{ |
|||
await OnMessage(message); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
await OnError(ex); |
|||
} |
|||
|
|||
break; |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,69 @@ |
|||
// ==========================================================================
|
|||
// Actor.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using System.Threading.Tasks.Dataflow; |
|||
using Squidex.Infrastructure.Tasks; |
|||
|
|||
namespace Squidex.Infrastructure.Actors |
|||
{ |
|||
public sealed class SingleThreadedDispatcher |
|||
{ |
|||
private readonly ActionBlock<Func<Task>> block; |
|||
private bool isStopped; |
|||
|
|||
public SingleThreadedDispatcher(int capacity = 10) |
|||
{ |
|||
var options = new ExecutionDataflowBlockOptions |
|||
{ |
|||
MaxMessagesPerTask = -1, |
|||
MaxDegreeOfParallelism = 1, |
|||
BoundedCapacity = capacity |
|||
}; |
|||
|
|||
block = new ActionBlock<Func<Task>>(Handle, options); |
|||
} |
|||
|
|||
public Task DispatchAsync(Func<Task> action) |
|||
{ |
|||
Guard.NotNull(action, nameof(action)); |
|||
|
|||
return block.SendAsync(action); |
|||
} |
|||
|
|||
public Task DispatchAsync(Action action) |
|||
{ |
|||
Guard.NotNull(action, nameof(action)); |
|||
|
|||
return block.SendAsync(() => { action(); return TaskHelper.Done; }); |
|||
} |
|||
|
|||
public async Task StopAndWaitAsync() |
|||
{ |
|||
await DispatchAsync(() => |
|||
{ |
|||
isStopped = true; |
|||
|
|||
block.Complete(); |
|||
}); |
|||
|
|||
await block.Completion; |
|||
} |
|||
|
|||
private Task Handle(Func<Task> action) |
|||
{ |
|||
if (isStopped) |
|||
{ |
|||
return TaskHelper.Done; |
|||
} |
|||
|
|||
return action(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,104 @@ |
|||
// ==========================================================================
|
|||
// RetrySubscription.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Squidex.Infrastructure.Actors; |
|||
using Squidex.Infrastructure.Tasks; |
|||
|
|||
namespace Squidex.Infrastructure.CQRS.Events |
|||
{ |
|||
public sealed class RetrySubscription : IEventSubscription, IEventSubscriber |
|||
{ |
|||
private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(10); |
|||
private readonly CancellationTokenSource disposeCts = new CancellationTokenSource(); |
|||
private readonly RetryWindow retryWindow = new RetryWindow(TimeSpan.FromMinutes(5), 5); |
|||
private readonly IEventStore eventStore; |
|||
private readonly IEventSubscriber eventSubscriber; |
|||
private readonly string streamFilter; |
|||
private IEventSubscription currentSubscription; |
|||
private string position; |
|||
|
|||
public int ReconnectWaitMs { get; set; } = 5000; |
|||
|
|||
public RetrySubscription(IEventStore eventStore, IEventSubscriber eventSubscriber, string streamFilter, string position) |
|||
{ |
|||
Guard.NotNull(eventStore, nameof(eventStore)); |
|||
Guard.NotNull(eventSubscriber, nameof(eventSubscriber)); |
|||
|
|||
this.position = position; |
|||
|
|||
this.eventStore = eventStore; |
|||
this.eventSubscriber = eventSubscriber; |
|||
|
|||
this.streamFilter = streamFilter; |
|||
|
|||
Subscribe(); |
|||
} |
|||
|
|||
private void Subscribe() |
|||
{ |
|||
currentSubscription = eventStore.CreateSubscription(this, streamFilter, position); |
|||
} |
|||
|
|||
private void Unsubscribe() |
|||
{ |
|||
currentSubscription?.StopAsync().Forget(); |
|||
} |
|||
|
|||
private async Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent) |
|||
{ |
|||
if (subscription == currentSubscription) |
|||
{ |
|||
await eventSubscriber.OnEventAsync(this, storedEvent); |
|||
|
|||
position = storedEvent.EventPosition; |
|||
} |
|||
} |
|||
|
|||
private async Task HandleErrorAsync(IEventSubscription subscription, Exception exception) |
|||
{ |
|||
if (subscription == currentSubscription) |
|||
{ |
|||
subscription.StopAsync().Forget(); |
|||
subscription = null; |
|||
|
|||
if (retryWindow.CanRetryAfterFailure()) |
|||
{ |
|||
Task.Delay(ReconnectWaitMs, disposeCts.Token).ContinueWith(t => |
|||
{ |
|||
dispatcher.DispatchAsync(() => Subscribe()); |
|||
}).Forget(); |
|||
} |
|||
else |
|||
{ |
|||
await eventSubscriber.OnErrorAsync(this, exception); |
|||
} |
|||
} |
|||
} |
|||
|
|||
Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent) |
|||
{ |
|||
return dispatcher.DispatchAsync(() => HandleEventAsync(subscription, storedEvent)); |
|||
} |
|||
|
|||
Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) |
|||
{ |
|||
return dispatcher.DispatchAsync(() => HandleErrorAsync(subscription, exception)); |
|||
} |
|||
|
|||
public async Task StopAsync() |
|||
{ |
|||
await dispatcher.DispatchAsync(() => Unsubscribe()); |
|||
await dispatcher.StopAndWaitAsync(); |
|||
|
|||
disposeCts.Cancel(); |
|||
} |
|||
} |
|||
} |
|||
@ -1,158 +0,0 @@ |
|||
// ==========================================================================
|
|||
// ActorTests.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using FluentAssertions; |
|||
using Squidex.Infrastructure.Tasks; |
|||
using Xunit; |
|||
|
|||
namespace Squidex.Infrastructure.Actors |
|||
{ |
|||
public class ActorTests |
|||
{ |
|||
public class SuccessMessage |
|||
{ |
|||
public int Counter { get; set; } |
|||
} |
|||
|
|||
public class FailedMessage |
|||
{ |
|||
} |
|||
|
|||
private sealed class MyActor : Actor, IActor |
|||
{ |
|||
public List<object> Invokes { get; } = new List<object>(); |
|||
|
|||
public void Tell(Exception exception) |
|||
{ |
|||
FailAsync(exception).Forget(); |
|||
} |
|||
|
|||
public void Tell(object message) |
|||
{ |
|||
DispatchAsync(message).Forget(); |
|||
} |
|||
|
|||
public Task StopAsync() |
|||
{ |
|||
return StopAndWaitAsync(); |
|||
} |
|||
|
|||
protected override Task OnStop() |
|||
{ |
|||
Invokes.Add(true); |
|||
|
|||
return TaskHelper.Done; |
|||
} |
|||
|
|||
protected override Task OnError(Exception exception) |
|||
{ |
|||
Invokes.Add(exception); |
|||
|
|||
return TaskHelper.Done; |
|||
} |
|||
|
|||
protected override Task OnMessage(object message) |
|||
{ |
|||
if (message is FailedMessage) |
|||
{ |
|||
throw new InvalidOperationException(); |
|||
} |
|||
|
|||
Invokes.Add(message); |
|||
|
|||
return TaskHelper.Done; |
|||
} |
|||
} |
|||
|
|||
private readonly MyActor sut = new MyActor(); |
|||
|
|||
[Fact] |
|||
public async Task Should_invoke_with_exception() |
|||
{ |
|||
sut.Tell(new InvalidOperationException()); |
|||
|
|||
await sut.StopAsync(); |
|||
|
|||
Assert.True(sut.Invokes[0] is InvalidOperationException); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_handle_messages_sequentially() |
|||
{ |
|||
sut.Tell(new SuccessMessage { Counter = 1 }); |
|||
sut.Tell(new SuccessMessage { Counter = 2 }); |
|||
sut.Tell(new SuccessMessage { Counter = 3 }); |
|||
|
|||
await sut.StopAsync(); |
|||
|
|||
sut.Invokes.ShouldBeEquivalentTo(new List<object> |
|||
{ |
|||
new SuccessMessage { Counter = 1 }, |
|||
new SuccessMessage { Counter = 2 }, |
|||
new SuccessMessage { Counter = 3 }, |
|||
true |
|||
}); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_raise_error_event_when_event_handling_failed() |
|||
{ |
|||
sut.Tell(new FailedMessage()); |
|||
sut.Tell(new SuccessMessage { Counter = 2 }); |
|||
sut.Tell(new SuccessMessage { Counter = 3 }); |
|||
|
|||
await sut.StopAsync(); |
|||
|
|||
Assert.True(sut.Invokes[0] is InvalidOperationException); |
|||
|
|||
sut.Invokes.Skip(1).ShouldBeEquivalentTo(new List<object> |
|||
{ |
|||
new SuccessMessage { Counter = 2 }, |
|||
new SuccessMessage { Counter = 3 }, |
|||
true |
|||
}); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_not_handle_messages_after_stop() |
|||
{ |
|||
sut.Tell(new SuccessMessage { Counter = 1 }); |
|||
|
|||
await sut.StopAsync(); |
|||
|
|||
sut.Tell(new SuccessMessage { Counter = 2 }); |
|||
sut.Tell(new SuccessMessage { Counter = 3 }); |
|||
|
|||
sut.Tell(new InvalidOperationException()); |
|||
|
|||
sut.Invokes.ShouldBeEquivalentTo(new List<object> |
|||
{ |
|||
new SuccessMessage { Counter = 1 }, |
|||
true |
|||
}); |
|||
} |
|||
|
|||
[Fact] |
|||
public void Should_call_stop_on_dispose() |
|||
{ |
|||
sut.Tell(new SuccessMessage { Counter = 1 }); |
|||
|
|||
sut.Dispose(); |
|||
|
|||
sut.Invokes.ShouldBeEquivalentTo(new List<object> |
|||
{ |
|||
new SuccessMessage { Counter = 1 }, |
|||
true |
|||
}); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,91 @@ |
|||
// ==========================================================================
|
|||
// SingleThreadedDispatcherTests.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using Squidex.Infrastructure.Tasks; |
|||
using Xunit; |
|||
|
|||
namespace Squidex.Infrastructure.Actors |
|||
{ |
|||
public class SingleThreadedDispatcherTests |
|||
{ |
|||
private readonly SingleThreadedDispatcher sut = new SingleThreadedDispatcher(); |
|||
|
|||
[Fact] |
|||
public async Task Should_handle_messages_sequentially() |
|||
{ |
|||
var source = Enumerable.Range(1, 100); |
|||
var target = new List<int>(); |
|||
|
|||
foreach (var item in source) |
|||
{ |
|||
sut.DispatchAsync(() => target.Add(item)).Forget(); |
|||
} |
|||
|
|||
await sut.StopAndWaitAsync(); |
|||
|
|||
Assert.Equal(source, target); |
|||
} |
|||
|
|||
/* |
|||
[Fact] |
|||
public async Task Should_raise_error_event_when_event_handling_failed() |
|||
{ |
|||
sut.Tell(new FailedMessage()); |
|||
sut.Tell(new SuccessMessage { Counter = 2 }); |
|||
sut.Tell(new SuccessMessage { Counter = 3 }); |
|||
|
|||
await sut.StopAsync(); |
|||
|
|||
Assert.True(sut.Invokes[0] is InvalidOperationException); |
|||
|
|||
sut.Invokes.Skip(1).ShouldBeEquivalentTo(new List<object> |
|||
{ |
|||
new SuccessMessage { Counter = 2 }, |
|||
new SuccessMessage { Counter = 3 }, |
|||
true |
|||
}); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_not_handle_messages_after_stop() |
|||
{ |
|||
sut.Tell(new SuccessMessage { Counter = 1 }); |
|||
|
|||
await sut.StopAsync(); |
|||
|
|||
sut.Tell(new SuccessMessage { Counter = 2 }); |
|||
sut.Tell(new SuccessMessage { Counter = 3 }); |
|||
|
|||
sut.Tell(new InvalidOperationException()); |
|||
|
|||
sut.Invokes.ShouldBeEquivalentTo(new List<object> |
|||
{ |
|||
new SuccessMessage { Counter = 1 }, |
|||
true |
|||
}); |
|||
} |
|||
|
|||
[Fact] |
|||
public void Should_call_stop_on_dispose() |
|||
{ |
|||
sut.Tell(new SuccessMessage { Counter = 1 }); |
|||
|
|||
sut.Dispose(); |
|||
|
|||
sut.Invokes.ShouldBeEquivalentTo(new List<object> |
|||
{ |
|||
new SuccessMessage { Counter = 1 }, |
|||
true |
|||
}); |
|||
} |
|||
*/ |
|||
} |
|||
} |
|||
@ -0,0 +1,123 @@ |
|||
// ==========================================================================
|
|||
// EventConsumerActorTests.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using FakeItEasy; |
|||
using Xunit; |
|||
|
|||
namespace Squidex.Infrastructure.CQRS.Events |
|||
{ |
|||
public class RetrySubscriptionTests |
|||
{ |
|||
private readonly IEventStore eventStore = A.Fake<IEventStore>(); |
|||
private readonly IEventSubscriber eventSubscriber = A.Fake<IEventSubscriber>(); |
|||
private readonly IEventSubscription eventSubscription = A.Fake<IEventSubscription>(); |
|||
private readonly IEventSubscriber sutSubscriber; |
|||
private readonly RetrySubscription sut; |
|||
private readonly string streamFilter = Guid.NewGuid().ToString(); |
|||
|
|||
public RetrySubscriptionTests() |
|||
{ |
|||
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored)).Returns(eventSubscription); |
|||
|
|||
sut = new RetrySubscription(eventStore, eventSubscriber, streamFilter, null) { ReconnectWaitMs = 0 }; |
|||
|
|||
sutSubscriber = sut; |
|||
} |
|||
|
|||
[Fact] |
|||
public void Should_subscribe_after_constructor() |
|||
{ |
|||
A.CallTo(() => eventStore.CreateSubscription(sut, streamFilter, null)) |
|||
.MustHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_reopen_subscription_when_exception_is_retrieved() |
|||
{ |
|||
await OnErrorAsync(eventSubscription, new InvalidOperationException()); |
|||
|
|||
await Task.Delay(200); |
|||
|
|||
await sut.StopAsync(); |
|||
|
|||
A.CallTo(() => eventSubscription.StopAsync()) |
|||
.MustHaveHappened(Repeated.Exactly.Twice); |
|||
|
|||
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored)) |
|||
.MustHaveHappened(Repeated.Exactly.Twice); |
|||
|
|||
A.CallTo(() => eventSubscriber.OnErrorAsync(A<IEventSubscription>.Ignored, A<Exception>.Ignored)) |
|||
.MustNotHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_forward_error_from_inner_subscription_when_failed_often() |
|||
{ |
|||
var ex = new InvalidOperationException(); |
|||
|
|||
await OnErrorAsync(eventSubscription, ex); |
|||
await OnErrorAsync(eventSubscription, ex); |
|||
await OnErrorAsync(eventSubscription, ex); |
|||
await OnErrorAsync(eventSubscription, ex); |
|||
await OnErrorAsync(eventSubscription, ex); |
|||
await OnErrorAsync(eventSubscription, ex); |
|||
await sut.StopAsync(); |
|||
|
|||
A.CallTo(() => eventSubscriber.OnErrorAsync(sut, ex)) |
|||
.MustHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_forward_event_from_inner_subscription() |
|||
{ |
|||
var ev = new StoredEvent("1", 2, new EventData()); |
|||
|
|||
await OnEventAsync(eventSubscription, ev); |
|||
await sut.StopAsync(); |
|||
|
|||
A.CallTo(() => eventSubscriber.OnEventAsync(sut, ev)) |
|||
.MustHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_not_forward_error_when_exception_is_from_another_subscription() |
|||
{ |
|||
var ex = new InvalidOperationException(); |
|||
|
|||
await OnErrorAsync(A.Fake<IEventSubscription>(), ex); |
|||
await sut.StopAsync(); |
|||
|
|||
A.CallTo(() => eventSubscriber.OnErrorAsync(A<IEventSubscription>.Ignored, A<Exception>.Ignored)) |
|||
.MustNotHaveHappened(); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_not_forward_event_when_message_is_from_another_subscription() |
|||
{ |
|||
var ev = new StoredEvent("1", 2, new EventData()); |
|||
|
|||
await OnEventAsync(A.Fake<IEventSubscription>(), ev); |
|||
await sut.StopAsync(); |
|||
|
|||
A.CallTo(() => eventSubscriber.OnEventAsync(A<IEventSubscription>.Ignored, A<StoredEvent>.Ignored)) |
|||
.MustNotHaveHappened(); |
|||
} |
|||
|
|||
private Task OnErrorAsync(IEventSubscription subscriber, Exception ex) |
|||
{ |
|||
return sutSubscriber.OnErrorAsync(subscriber, ex); |
|||
} |
|||
|
|||
private Task OnEventAsync(IEventSubscription subscriber, StoredEvent ev) |
|||
{ |
|||
return sutSubscriber.OnEventAsync(subscriber, ev); |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue