Browse Source

Concurrency issues fixed.

pull/169/head
Sebastian Stehle 9 years ago
parent
commit
56359b1ea5
  1. 75
      src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrain.cs
  2. 13
      src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs
  3. 40
      src/Squidex.Infrastructure/Tasks/SingleThreadedDispatcher.cs
  4. 8
      tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs
  5. 63
      tests/Squidex.Infrastructure.Tests/Tasks/SingleThreadedDispatcherTests.cs

75
src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrain.cs

@ -26,7 +26,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation
private readonly ISemanticLog log;
private IEventSubscription currentSubscription;
private IEventConsumer eventConsumer;
private TaskFactory dispatcher;
private SingleThreadedDispatcher dispatcher;
public EventConsumerGrain(
EventDataFormatter eventFormatter,
@ -62,21 +62,29 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation
public override Task OnActivateAsync()
{
eventConsumer = eventConsumerFactory(this.GetPrimaryKeyString());
dispatcher = new SingleThreadedDispatcher(1, TaskScheduler.Current);
dispatcher = new TaskFactory(TaskScheduler.Current);
eventConsumer = eventConsumerFactory(this.GetPrimaryKeyString());
return TaskHelper.Done;
}
public override Task OnDeactivateAsync()
{
return dispatcher.StopAndWaitAsync();
}
public Task ActivateAsync()
{
if (!State.IsStopped)
return dispatcher.DispatchAndUnwrapAsync(() =>
{
Subscribe(State.Position);
}
if (!State.IsStopped)
{
Subscribe(State.Position);
}
return TaskHelper.Done;
return TaskHelper.Done;
});
}
private Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
@ -129,61 +137,70 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation
public Task StartAsync()
{
if (!State.IsStopped)
return dispatcher.DispatchAndUnwrapAsync(() =>
{
return TaskHelper.Done;
}
if (!State.IsStopped)
{
return TaskHelper.Done;
}
return DoAndUpdateStateAsync(() =>
{
Subscribe(State.Position);
return DoAndUpdateStateAsync(() =>
{
Subscribe(State.Position);
State = State.Started();
State = State.Started();
});
});
}
public Task StopAsync()
{
if (State.IsStopped)
return dispatcher.DispatchAndUnwrapAsync(() =>
{
return TaskHelper.Done;
}
if (State.IsStopped)
{
return TaskHelper.Done;
}
return DoAndUpdateStateAsync(() =>
{
Unsubscribe();
return DoAndUpdateStateAsync(() =>
{
Unsubscribe();
State = State.Stopped();
State = State.Stopped();
});
});
}
public Task ResetAsync()
{
return DoAndUpdateStateAsync(async () =>
return dispatcher.DispatchAndUnwrapAsync(() =>
{
Unsubscribe();
return DoAndUpdateStateAsync(async () =>
{
Unsubscribe();
await ClearAsync();
await ClearAsync();
Subscribe(null);
Subscribe(null);
State = EventConsumerGrainState.Initial();
State = EventConsumerGrainState.Initial();
});
});
}
Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
return dispatcher.StartNew(() => HandleEventAsync(subscription, storedEvent)).Unwrap();
return dispatcher.DispatchAndUnwrapAsync(() => HandleEventAsync(subscription, storedEvent));
}
Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception)
{
return dispatcher.StartNew(() => HandleErrorAsync(subscription, exception)).Unwrap();
return dispatcher.DispatchAndUnwrapAsync(() => HandleErrorAsync(subscription, exception));
}
Task IEventSubscriber.OnClosedAsync(IEventSubscription subscription)
{
return dispatcher.StartNew(() => HandleClosedAsync(subscription)).Unwrap();
return dispatcher.DispatchAndUnwrapAsync(() => HandleClosedAsync(subscription));
}
public Task<Immutable<EventConsumerInfo>> GetStateAsync()

13
src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs

@ -15,8 +15,8 @@ namespace Squidex.Infrastructure.CQRS.Events
{
public sealed class RetrySubscription : IEventSubscription, IEventSubscriber
{
private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher();
private readonly CancellationTokenSource disposeCts = new CancellationTokenSource();
private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(10);
private readonly CancellationTokenSource timerCts = new CancellationTokenSource();
private readonly RetryWindow retryWindow = new RetryWindow(TimeSpan.FromMinutes(5), 5);
private readonly IEventStore eventStore;
private readonly IEventSubscriber eventSubscriber;
@ -43,7 +43,10 @@ namespace Squidex.Infrastructure.CQRS.Events
private void Subscribe()
{
currentSubscription = eventStore.CreateSubscription(this, streamFilter, position);
if (currentSubscription == null)
{
currentSubscription = eventStore.CreateSubscription(this, streamFilter, position);
}
}
private void Unsubscribe()
@ -80,7 +83,7 @@ namespace Squidex.Infrastructure.CQRS.Events
if (retryWindow.CanRetryAfterFailure())
{
Task.Delay(ReconnectWaitMs, disposeCts.Token).ContinueWith(t =>
Task.Delay(ReconnectWaitMs, timerCts.Token).ContinueWith(t =>
{
dispatcher.DispatchAsync(() => Subscribe());
}).Forget();
@ -112,7 +115,7 @@ namespace Squidex.Infrastructure.CQRS.Events
await dispatcher.DispatchAsync(() => Unsubscribe());
await dispatcher.StopAndWaitAsync();
disposeCts.Cancel();
timerCts.Cancel();
}
}
}

40
src/Squidex.Infrastructure/Tasks/SingleThreadedDispatcher.cs

@ -17,18 +17,54 @@ namespace Squidex.Infrastructure.Tasks
private readonly ActionBlock<Func<Task>> block;
private bool isStopped;
public SingleThreadedDispatcher(int capacity = 10)
public SingleThreadedDispatcher(int capacity = 1, TaskScheduler scheduler = null)
{
var options = new ExecutionDataflowBlockOptions
{
BoundedCapacity = capacity,
MaxMessagesPerTask = -1,
MaxDegreeOfParallelism = 1,
BoundedCapacity = capacity
TaskScheduler = scheduler ?? TaskScheduler.Default
};
block = new ActionBlock<Func<Task>>(Handle, options);
}
public Task DispatchAndUnwrapAsync(Func<Task> action)
{
Guard.NotNull(action, nameof(action));
var cts = new TaskCompletionSource<bool>();
block.SendAsync(async () =>
{
try
{
await action();
cts.SetResult(true);
}
catch (Exception ex)
{
cts.TrySetException(ex);
}
});
return cts.Task;
}
public Task DispatchAndUnwrapAsync(Action action)
{
Guard.NotNull(action, nameof(action));
return DispatchAndUnwrapAsync(() =>
{
action();
return TaskHelper.Done;
});
}
public Task DispatchAsync(Func<Task> action)
{
Guard.NotNull(action, nameof(action));

8
tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs

@ -26,20 +26,22 @@ namespace Squidex.Infrastructure.CQRS.Events
{
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored)).Returns(eventSubscription);
sut = new RetrySubscription(eventStore, eventSubscriber, streamFilter, null) { ReconnectWaitMs = 0 };
sut = new RetrySubscription(eventStore, eventSubscriber, streamFilter, null) { ReconnectWaitMs = 100 };
sutSubscriber = sut;
}
[Fact]
public void Should_subscribe_after_constructor()
public async Task Should_subscribe_after_constructor()
{
await sut.StopAsync();
A.CallTo(() => eventStore.CreateSubscription(sut, streamFilter, null))
.MustHaveHappened();
}
[Fact]
public async Task Should_reopen_subscription_when_exception_is_retrieved()
public async Task Should_reopen_subscription_once_when_exception_is_retrieved()
{
await OnErrorAsync(eventSubscription, new InvalidOperationException());

63
tests/Squidex.Infrastructure.Tests/Tasks/SingleThreadedDispatcherTests.cs

@ -18,14 +18,19 @@ namespace Squidex.Infrastructure.Tasks
private readonly SingleThreadedDispatcher sut = new SingleThreadedDispatcher();
[Fact]
public async Task Should_handle_messages_sequentially()
public async Task Should_handle_async_messages_sequentially()
{
var source = Enumerable.Range(1, 100);
var target = new List<int>();
foreach (var item in source)
{
sut.DispatchAsync(() => target.Add(item)).Forget();
sut.DispatchAsync(() =>
{
target.Add(item);
return TaskHelper.Done;
}).Forget();
}
await sut.StopAndWaitAsync();
@ -33,58 +38,20 @@ namespace Squidex.Infrastructure.Tasks
Assert.Equal(source, target);
}
/*
[Fact]
public async Task Should_raise_error_event_when_event_handling_failed()
public async Task Should_handle_sync_messages_sequentially()
{
sut.Tell(new FailedMessage());
sut.Tell(new SuccessMessage { Counter = 2 });
sut.Tell(new SuccessMessage { Counter = 3 });
await sut.StopAsync();
Assert.True(sut.Invokes[0] is InvalidOperationException);
sut.Invokes.Skip(1).ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 2 },
new SuccessMessage { Counter = 3 },
true
});
}
[Fact]
public async Task Should_not_handle_messages_after_stop()
{
sut.Tell(new SuccessMessage { Counter = 1 });
await sut.StopAsync();
sut.Tell(new SuccessMessage { Counter = 2 });
sut.Tell(new SuccessMessage { Counter = 3 });
sut.Tell(new InvalidOperationException());
var source = Enumerable.Range(1, 100);
var target = new List<int>();
sut.Invokes.ShouldBeEquivalentTo(new List<object>
foreach (var item in source)
{
new SuccessMessage { Counter = 1 },
true
});
}
[Fact]
public void Should_call_stop_on_dispose()
{
sut.Tell(new SuccessMessage { Counter = 1 });
sut.DispatchAsync(() => target.Add(item)).Forget();
}
sut.Dispose();
await sut.StopAndWaitAsync();
sut.Invokes.ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 1 },
true
});
Assert.Equal(source, target);
}
*/
}
}

Loading…
Cancel
Save