mirror of https://github.com/Squidex/squidex.git
34 changed files with 500 additions and 487 deletions
@ -1,141 +0,0 @@ |
|||
// ==========================================================================
|
|||
// PollingSubscription.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Text.RegularExpressions; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Squidex.Infrastructure.Actors; |
|||
using Squidex.Infrastructure.Tasks; |
|||
|
|||
namespace Squidex.Infrastructure.CQRS.Events |
|||
{ |
|||
public sealed class PollingSubscription : Actor, IEventSubscription |
|||
{ |
|||
private readonly IEventNotifier notifier; |
|||
private readonly MongoEventStore store; |
|||
private readonly CancellationTokenSource disposeToken = new CancellationTokenSource(); |
|||
private readonly Regex streamRegex; |
|||
private readonly string streamFilter; |
|||
private readonly IEventSubscriber subscriber; |
|||
private string position; |
|||
private bool isPolling; |
|||
private IDisposable notification; |
|||
|
|||
private sealed class Connect |
|||
{ |
|||
} |
|||
|
|||
private sealed class StartPoll |
|||
{ |
|||
} |
|||
|
|||
private sealed class StopPoll |
|||
{ |
|||
} |
|||
|
|||
public PollingSubscription(MongoEventStore store, IEventNotifier notifier, IEventSubscriber subscriber, string streamFilter, string position) |
|||
{ |
|||
this.notifier = notifier; |
|||
this.position = position; |
|||
this.store = store; |
|||
this.streamFilter = streamFilter; |
|||
this.subscriber = subscriber; |
|||
|
|||
streamRegex = new Regex(streamFilter); |
|||
|
|||
DispatchAsync(new Connect()).Forget(); |
|||
} |
|||
|
|||
public Task StopAsync() |
|||
{ |
|||
return StopAndWaitAsync(); |
|||
} |
|||
|
|||
protected override Task OnStop() |
|||
{ |
|||
disposeToken?.Cancel(); |
|||
|
|||
notification?.Dispose(); |
|||
|
|||
return TaskHelper.Done; |
|||
} |
|||
|
|||
protected override async Task OnError(Exception exception) |
|||
{ |
|||
await subscriber.OnErrorAsync(this, exception); |
|||
|
|||
await StopAsync(); |
|||
} |
|||
|
|||
protected override async Task OnMessage(object message) |
|||
{ |
|||
switch (message) |
|||
{ |
|||
case Connect connect: |
|||
{ |
|||
notification = notifier.Subscribe(streamName => |
|||
{ |
|||
if (streamRegex.IsMatch(streamName)) |
|||
{ |
|||
DispatchAsync(new StartPoll()).Forget(); |
|||
} |
|||
}); |
|||
|
|||
DispatchAsync(new StartPoll()).Forget(); |
|||
|
|||
break; |
|||
} |
|||
|
|||
case StartPoll poll when !isPolling: |
|||
{ |
|||
isPolling = true; |
|||
|
|||
PollAsync().Forget(); |
|||
|
|||
break; |
|||
} |
|||
|
|||
case StopPoll poll when isPolling: |
|||
{ |
|||
isPolling = false; |
|||
|
|||
Task.Delay(5000).ContinueWith(t => DispatchAsync(new StartPoll())).Forget(); |
|||
|
|||
break; |
|||
} |
|||
|
|||
case StoredEvent storedEvent: |
|||
{ |
|||
await subscriber.OnEventAsync(this, storedEvent); |
|||
|
|||
position = storedEvent.EventPosition; |
|||
|
|||
break; |
|||
} |
|||
} |
|||
} |
|||
|
|||
private async Task PollAsync() |
|||
{ |
|||
try |
|||
{ |
|||
await store.GetEventsAsync(DispatchAsync, disposeToken.Token, streamFilter, position); |
|||
|
|||
await DispatchAsync(new StopPoll()); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
if (!ex.Is<OperationCanceledException>()) |
|||
{ |
|||
await FailAsync(ex); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,82 @@ |
|||
// ==========================================================================
|
|||
// PollingSubscription.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Text.RegularExpressions; |
|||
using System.Threading.Tasks; |
|||
using Squidex.Infrastructure.Timers; |
|||
|
|||
namespace Squidex.Infrastructure.CQRS.Events |
|||
{ |
|||
public sealed class PollingSubscription : IEventSubscription |
|||
{ |
|||
private readonly IEventNotifier eventNotifier; |
|||
private readonly IEventStore eventStore; |
|||
private readonly IEventSubscriber eventSubscriber; |
|||
private readonly IDisposable notification; |
|||
private readonly CompletionTimer timer; |
|||
private readonly Regex streamRegex; |
|||
private readonly string streamFilter; |
|||
private string position; |
|||
|
|||
public PollingSubscription( |
|||
IEventStore eventStore, |
|||
IEventNotifier eventNotifier, |
|||
IEventSubscriber eventSubscriber, |
|||
string streamFilter, |
|||
string position) |
|||
{ |
|||
Guard.NotNull(eventStore, nameof(eventStore)); |
|||
Guard.NotNull(eventNotifier, nameof(eventNotifier)); |
|||
Guard.NotNull(eventSubscriber, nameof(eventSubscriber)); |
|||
|
|||
this.position = position; |
|||
this.eventNotifier = eventNotifier; |
|||
this.eventStore = eventStore; |
|||
this.eventSubscriber = eventSubscriber; |
|||
this.streamFilter = streamFilter; |
|||
|
|||
streamRegex = new Regex(streamFilter); |
|||
|
|||
timer = new CompletionTimer(5000, async ct => |
|||
{ |
|||
try |
|||
{ |
|||
await eventStore.GetEventsAsync(async storedEvent => |
|||
{ |
|||
await eventSubscriber.OnEventAsync(this, storedEvent); |
|||
|
|||
position = storedEvent.EventPosition; |
|||
}, ct, streamFilter, position); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
if (!ex.Is<OperationCanceledException>()) |
|||
{ |
|||
await eventSubscriber.OnErrorAsync(this, ex); |
|||
} |
|||
} |
|||
}); |
|||
|
|||
notification = eventNotifier.Subscribe(streamName => |
|||
{ |
|||
if (streamRegex.IsMatch(streamName)) |
|||
{ |
|||
timer.SkipCurrentDelay(); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
public Task StopAsync() |
|||
{ |
|||
notification?.Dispose(); |
|||
|
|||
return timer.StopAsync(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,48 @@ |
|||
// ==========================================================================
|
|||
// RetryWindow.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Collections.Generic; |
|||
|
|||
namespace Squidex.Infrastructure |
|||
{ |
|||
public sealed class RetryWindow |
|||
{ |
|||
private readonly TimeSpan windowDuration; |
|||
private readonly int windowSize; |
|||
private readonly Queue<DateTime> retries = new Queue<DateTime>(); |
|||
|
|||
public RetryWindow(TimeSpan windowDuration, int windowSize) |
|||
{ |
|||
this.windowDuration = windowDuration; |
|||
this.windowSize = windowSize + 1; |
|||
} |
|||
|
|||
public void Reset() |
|||
{ |
|||
retries.Clear(); |
|||
} |
|||
|
|||
public bool CanRetryAfterFailure() |
|||
{ |
|||
return CanRetryAfterFailure(DateTime.UtcNow); |
|||
} |
|||
|
|||
public bool CanRetryAfterFailure(DateTime utcNow) |
|||
{ |
|||
retries.Enqueue(utcNow); |
|||
|
|||
while (retries.Count > windowSize) |
|||
{ |
|||
retries.Dequeue(); |
|||
} |
|||
|
|||
return retries.Count < windowSize || (retries.Count > 0 && (utcNow - retries.Peek()) > windowDuration); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,90 @@ |
|||
// ==========================================================================
|
|||
// RetryWindowTests.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using Xunit; |
|||
|
|||
namespace Squidex.Infrastructure |
|||
{ |
|||
public class RetryWindowTests |
|||
{ |
|||
private const int WindowSize = 5; |
|||
|
|||
[Fact] |
|||
public void Should_allow_to_retry_after_reset() |
|||
{ |
|||
var sut = new RetryWindow(TimeSpan.FromSeconds(1), WindowSize); |
|||
|
|||
for (var i = 0; i < WindowSize * 2; i++) |
|||
{ |
|||
sut.CanRetryAfterFailure(); |
|||
} |
|||
|
|||
sut.Reset(); |
|||
|
|||
Assert.True(sut.CanRetryAfterFailure()); |
|||
} |
|||
|
|||
[Theory] |
|||
[InlineData(6)] |
|||
[InlineData(7)] |
|||
public void Should_not_allow_to_retry_after_many_errors(int errors) |
|||
{ |
|||
var sut = new RetryWindow(TimeSpan.FromSeconds(1), WindowSize); |
|||
var now = DateTime.UtcNow; |
|||
|
|||
for (var i = 0; i < WindowSize; i++) |
|||
{ |
|||
Assert.True(sut.CanRetryAfterFailure(now)); |
|||
} |
|||
|
|||
var remaining = errors - WindowSize; |
|||
|
|||
for (var i = 0; i < remaining; i++) |
|||
{ |
|||
Assert.False(sut.CanRetryAfterFailure(now)); |
|||
} |
|||
} |
|||
|
|||
[Theory] |
|||
[InlineData(1)] |
|||
[InlineData(2)] |
|||
[InlineData(3)] |
|||
[InlineData(4)] |
|||
public void Should_allow_to_retry_after_few_errors(int errors) |
|||
{ |
|||
var sut = new RetryWindow(TimeSpan.FromSeconds(1), WindowSize); |
|||
var now = DateTime.UtcNow; |
|||
|
|||
for (var i = 0; i < errors; i++) |
|||
{ |
|||
Assert.True(sut.CanRetryAfterFailure(now)); |
|||
} |
|||
} |
|||
|
|||
[Theory] |
|||
[InlineData(1)] |
|||
[InlineData(2)] |
|||
[InlineData(3)] |
|||
[InlineData(4)] |
|||
[InlineData(5)] |
|||
[InlineData(6)] |
|||
[InlineData(7)] |
|||
[InlineData(8)] |
|||
public void Should_allow_to_retry_after_few_errors_in_window(int errors) |
|||
{ |
|||
var sut = new RetryWindow(TimeSpan.FromSeconds(1), WindowSize); |
|||
var now = DateTime.UtcNow; |
|||
|
|||
for (var i = 0; i < errors; i++) |
|||
{ |
|||
Assert.True(sut.CanRetryAfterFailure(now.AddMilliseconds(i * 300))); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue