From 85bcca1b9a4490f53e93653fa99cff6ebfb9c1b5 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Mon, 31 Jul 2017 21:51:44 +0200 Subject: [PATCH] Closes #88 --- .../CQRS/Events/Formatter.cs | 5 +- .../CQRS/Events/GetEventStoreSubscription.cs | 194 ++++++++++++++++-- .../MongoEventConsumerInfoRepository.cs | 4 +- .../event-consumers-page.component.html | 8 + .../event-consumers-page.component.ts | 25 ++- src/Squidex/appsettings.json | 2 +- 6 files changed, 209 insertions(+), 29 deletions(-) diff --git a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/Formatter.cs b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/Formatter.cs index 420526114..186aa0444 100644 --- a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/Formatter.cs +++ b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/Formatter.cs @@ -23,7 +23,10 @@ namespace Squidex.Infrastructure.CQRS.Events var eventData = new EventData { Type = @event.EventType, EventId = @event.EventId, Payload = body, Metadata = meta }; - return new StoredEvent(resolvedEvent.OriginalEventNumber.ToString(), resolvedEvent.Event.EventNumber, eventData); + return new StoredEvent( + resolvedEvent.OriginalEventNumber.ToString(), + resolvedEvent.Event.EventNumber, + eventData); } public static EventStoreData Write(EventData eventData) diff --git a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs index 7dffe6da7..ad9e07690 100644 --- a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs +++ b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs @@ -8,9 +8,11 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Linq; using System.Net; using System.Net.Sockets; +using System.Threading; using System.Threading.Tasks; using EventStore.ClientAPI; using EventStore.ClientAPI.Exceptions; @@ -22,21 +24,27 @@ namespace Squidex.Infrastructure.CQRS.Events { internal sealed class EventStoreSubscription : DisposableObjectBase, IEventSubscription { + private const int ReconnectWindowMax = 5; + private const int ReconnectWaitMs = 1000; + private static readonly TimeSpan TimeBetweenReconnects = TimeSpan.FromMinutes(5); private static readonly ConcurrentDictionary subscriptionsCreated = new ConcurrentDictionary(); private readonly IEventStoreConnection connection; - private readonly string position; private readonly string streamFilter; private readonly string streamName; private readonly string prefix; private readonly string projectionHost; + private readonly ReaderWriterLockSlim connectionLock = new ReaderWriterLockSlim(); + private readonly Queue reconnectTimes = new Queue(); + private readonly CancellationTokenSource disposeToken = new CancellationTokenSource(); + private Func publishNext; + private Func publishError; private EventStoreCatchUpSubscription internalSubscription; - - public bool IsDropped { get; private set; } + private long? position; public EventStoreSubscription(IEventStoreConnection connection, string streamFilter, string position, string prefix, string projectionHost) { this.prefix = prefix; - this.position = position; + this.position = ParsePosition(position); this.connection = connection; this.streamFilter = streamFilter; this.projectionHost = projectionHost; @@ -48,7 +56,19 @@ namespace Squidex.Infrastructure.CQRS.Events { if (disposing) { - internalSubscription?.Stop(); + disposeToken.Cancel(); + + try + { + connectionLock.EnterWriteLock(); + + internalSubscription?.Stop(); + internalSubscription = null; + } + finally + { + connectionLock.ExitWriteLock(); + } } } @@ -56,40 +76,173 @@ namespace Squidex.Infrastructure.CQRS.Events { Guard.NotNull(onNext, nameof(onNext)); - if (internalSubscription != null) + if (publishNext != null) { throw new InvalidOperationException("An handler has already been registered."); } + publishNext = onNext; + publishError = onError; + await CreateProjectionAsync(); - long? eventStorePosition = null; + try + { + connectionLock.EnterWriteLock(); - if (long.TryParse(position, out var parsedPosition)) + internalSubscription = SubscribeToEventStore(); + } + finally + { + connectionLock.ExitWriteLock(); + } + } + + private EventStoreCatchUpSubscription SubscribeToEventStore() + { + return connection.SubscribeToStreamFrom(streamName, position, CatchUpSubscriptionSettings.Default, HandleEvent, null, HandleError); + } + + private void HandleEvent(EventStoreCatchUpSubscription subscription, ResolvedEvent resolved) + { + if (!CanHandleSubscriptionEvent(subscription)) { - eventStorePosition = parsedPosition; + return; } - internalSubscription = connection.SubscribeToStreamFrom(streamName, eventStorePosition, CatchUpSubscriptionSettings.Default, - (subscription, resolved) => + try + { + connectionLock.EnterReadLock(); + + if (CanHandleSubscriptionEvent(subscription)) { var storedEvent = Formatter.Read(resolved); - onNext(storedEvent).Wait(); - }, subscriptionDropped: (subscription, reason, ex) => + PublishAsync(storedEvent).Wait(); + + position = resolved.OriginalEventNumber; + } + } + finally + { + connectionLock.ExitReadLock(); + } + } + + private void HandleError(EventStoreCatchUpSubscription subscription, SubscriptionDropReason reason, Exception ex) + { + if (!CanHandleSubscriptionEvent(subscription)) + { + return; + } + + try + { + connectionLock.EnterUpgradeableReadLock(); + + if (CanHandleSubscriptionEvent(subscription)) { - if (reason != SubscriptionDropReason.UserInitiated && - reason != SubscriptionDropReason.ConnectionClosed) + if (reason == SubscriptionDropReason.ConnectionClosed) { - var exception = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}."); + var utcNow = DateTime.UtcNow; + + if (CanReconnect(utcNow)) + { + RegisterReconnectTime(utcNow); + + try + { + connectionLock.EnterWriteLock(); + + internalSubscription.Stop(); + internalSubscription = null; - onError?.Invoke(exception); + internalSubscription = SubscribeToEventStore(); + } + finally + { + connectionLock.ExitWriteLock(); + } + + DelayForReconnect().Wait(); + + if (!CanHandleSubscriptionEvent(subscription)) + { + return; + } + + try + { + connectionLock.EnterWriteLock(); + + if (CanHandleSubscriptionEvent(subscription)) + { + internalSubscription = SubscribeToEventStore(); + } + } + finally + { + connectionLock.ExitWriteLock(); + } + + return; + } } - else + + if (reason != SubscriptionDropReason.UserInitiated && reason != SubscriptionDropReason.EventHandlerException) { - IsDropped = true; + var exception = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}."); + + publishError?.Invoke(exception); } - }); + } + } + finally + { + connectionLock.ExitUpgradeableReadLock(); + } + } + + private bool CanHandleSubscriptionEvent(EventStoreCatchUpSubscription subscription) + { + return !disposeToken.IsCancellationRequested && subscription == internalSubscription; + } + + private bool CanReconnect(DateTime utcNow) + { + return reconnectTimes.Count < ReconnectWindowMax && (reconnectTimes.Count == 0 || (utcNow - reconnectTimes.Peek()) > TimeBetweenReconnects); + } + + private async Task PublishAsync(StoredEvent storedEvent) + { + await publishNext(storedEvent).ConfigureAwait(false); + } + + private static long? ParsePosition(string position) + { + return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null; + } + + private void RegisterReconnectTime(DateTime utcNow) + { + reconnectTimes.Enqueue(utcNow); + + while (reconnectTimes.Count >= ReconnectWindowMax) + { + reconnectTimes.Dequeue(); + } + } + + private async Task DelayForReconnect() + { + try + { + await Task.Delay(ReconnectWaitMs, disposeToken.Token).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + // Just ignore. + } } private async Task CreateProjectionAsync() @@ -135,6 +288,7 @@ namespace Squidex.Infrastructure.CQRS.Events new ProjectionsManager( connection.Settings.Log, endpoint, connection.Settings.OperationTimeout); + return projectionsManager; } } diff --git a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfoRepository.cs b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfoRepository.cs index e18ac09ad..a4679b515 100644 --- a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfoRepository.cs +++ b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfoRepository.cs @@ -72,7 +72,7 @@ namespace Squidex.Infrastructure.CQRS.Events { var filter = Filter.Eq(NameField, consumerName); - return Collection.UpdateOneAsync(filter, Update.Unset(IsStoppedField)); + return Collection.UpdateOneAsync(filter, Update.Unset(IsStoppedField).Unset(ErrorField)); } public Task StopAsync(string consumerName, string error = null) @@ -86,7 +86,7 @@ namespace Squidex.Infrastructure.CQRS.Events { var filter = Filter.Eq(NameField, consumerName); - return Collection.UpdateOneAsync(filter, Update.Set(IsResettingField, true)); + return Collection.UpdateOneAsync(filter, Update.Set(IsResettingField, true).Unset(ErrorField)); } public Task SetPositionAsync(string consumerName, string position, bool reset) diff --git a/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html b/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html index 51dc0324c..2fe1a7135 100644 --- a/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html +++ b/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html @@ -3,6 +3,14 @@
+
+ + + +
+

Event Consumers

diff --git a/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.ts b/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.ts index d1d6de52f..01813b2ae 100644 --- a/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.ts +++ b/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.ts @@ -40,18 +40,33 @@ export class EventConsumersPageComponent extends ComponentBase implements OnInit } public ngOnInit() { + this.load(false, true); + this.subscription = - Observable.timer(0, 4000) - .switchMap(() => this.eventConsumersService.getEventConsumers()) - .subscribe(dtos => { - this.eventConsumers = ImmutableArray.of(dtos); - }); + Observable.timer(4000, 4000).subscribe(() => { + this.load(); + }); } public ngOnDestroy() { this.subscription.unsubscribe(); } + public load(showInfo = false, showError = false) { + this.eventConsumersService.getEventConsumers() + .subscribe(dtos => { + this.eventConsumers = ImmutableArray.of(dtos); + + if (showInfo) { + this.notifyInfo('Event Consumers reloaded.'); + } + }, error => { + if (showError) { + this.notifyError(error); + } + }); + } + public start(consumer: EventConsumerDto) { this.eventConsumersService.startEventConsumer(consumer.name) .subscribe(() => { diff --git a/src/Squidex/appsettings.json b/src/Squidex/appsettings.json index 99b782c3a..b4962fefa 100644 --- a/src/Squidex/appsettings.json +++ b/src/Squidex/appsettings.json @@ -89,7 +89,7 @@ * * Read Mode: http://docs.geteventstore.com/dotnet-api/4.0.0/connecting-to-a-server/ */ - "configuration": "ConnectTo=tcp://admin:changeit@localhost:1113; HeartBeatTimeout=500", + "configuration": "ConnectTo=tcp://admin:changeit@localhost:1113; HeartBeatTimeout=500; MaxReconnections=-1", /* * The host name of your EventStore where projection requests will be sent to. */