Browse Source

Closes #88

pull/95/head
Sebastian Stehle 9 years ago
parent
commit
85bcca1b9a
  1. 5
      src/Squidex.Infrastructure.GetEventStore/CQRS/Events/Formatter.cs
  2. 190
      src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs
  3. 4
      src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfoRepository.cs
  4. 8
      src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html
  5. 23
      src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.ts
  6. 2
      src/Squidex/appsettings.json

5
src/Squidex.Infrastructure.GetEventStore/CQRS/Events/Formatter.cs

@ -23,7 +23,10 @@ namespace Squidex.Infrastructure.CQRS.Events
var eventData = new EventData { Type = @event.EventType, EventId = @event.EventId, Payload = body, Metadata = meta };
return new StoredEvent(resolvedEvent.OriginalEventNumber.ToString(), resolvedEvent.Event.EventNumber, eventData);
return new StoredEvent(
resolvedEvent.OriginalEventNumber.ToString(),
resolvedEvent.Event.EventNumber,
eventData);
}
public static EventStoreData Write(EventData eventData)

190
src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs

@ -8,9 +8,11 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions;
@ -22,21 +24,27 @@ namespace Squidex.Infrastructure.CQRS.Events
{
internal sealed class EventStoreSubscription : DisposableObjectBase, IEventSubscription
{
private const int ReconnectWindowMax = 5;
private const int ReconnectWaitMs = 1000;
private static readonly TimeSpan TimeBetweenReconnects = TimeSpan.FromMinutes(5);
private static readonly ConcurrentDictionary<string, bool> subscriptionsCreated = new ConcurrentDictionary<string, bool>();
private readonly IEventStoreConnection connection;
private readonly string position;
private readonly string streamFilter;
private readonly string streamName;
private readonly string prefix;
private readonly string projectionHost;
private readonly ReaderWriterLockSlim connectionLock = new ReaderWriterLockSlim();
private readonly Queue<DateTime> reconnectTimes = new Queue<DateTime>();
private readonly CancellationTokenSource disposeToken = new CancellationTokenSource();
private Func<StoredEvent, Task> publishNext;
private Func<Exception, Task> publishError;
private EventStoreCatchUpSubscription internalSubscription;
public bool IsDropped { get; private set; }
private long? position;
public EventStoreSubscription(IEventStoreConnection connection, string streamFilter, string position, string prefix, string projectionHost)
{
this.prefix = prefix;
this.position = position;
this.position = ParsePosition(position);
this.connection = connection;
this.streamFilter = streamFilter;
this.projectionHost = projectionHost;
@ -48,7 +56,19 @@ namespace Squidex.Infrastructure.CQRS.Events
{
if (disposing)
{
disposeToken.Cancel();
try
{
connectionLock.EnterWriteLock();
internalSubscription?.Stop();
internalSubscription = null;
}
finally
{
connectionLock.ExitWriteLock();
}
}
}
@ -56,40 +76,173 @@ namespace Squidex.Infrastructure.CQRS.Events
{
Guard.NotNull(onNext, nameof(onNext));
if (internalSubscription != null)
if (publishNext != null)
{
throw new InvalidOperationException("An handler has already been registered.");
}
publishNext = onNext;
publishError = onError;
await CreateProjectionAsync();
long? eventStorePosition = null;
try
{
connectionLock.EnterWriteLock();
internalSubscription = SubscribeToEventStore();
}
finally
{
connectionLock.ExitWriteLock();
}
}
private EventStoreCatchUpSubscription SubscribeToEventStore()
{
return connection.SubscribeToStreamFrom(streamName, position, CatchUpSubscriptionSettings.Default, HandleEvent, null, HandleError);
}
if (long.TryParse(position, out var parsedPosition))
private void HandleEvent(EventStoreCatchUpSubscription subscription, ResolvedEvent resolved)
{
if (!CanHandleSubscriptionEvent(subscription))
{
eventStorePosition = parsedPosition;
return;
}
internalSubscription = connection.SubscribeToStreamFrom(streamName, eventStorePosition, CatchUpSubscriptionSettings.Default,
(subscription, resolved) =>
try
{
connectionLock.EnterReadLock();
if (CanHandleSubscriptionEvent(subscription))
{
var storedEvent = Formatter.Read(resolved);
onNext(storedEvent).Wait();
}, subscriptionDropped: (subscription, reason, ex) =>
PublishAsync(storedEvent).Wait();
position = resolved.OriginalEventNumber;
}
}
finally
{
connectionLock.ExitReadLock();
}
}
private void HandleError(EventStoreCatchUpSubscription subscription, SubscriptionDropReason reason, Exception ex)
{
if (!CanHandleSubscriptionEvent(subscription))
{
return;
}
try
{
connectionLock.EnterUpgradeableReadLock();
if (CanHandleSubscriptionEvent(subscription))
{
if (reason == SubscriptionDropReason.ConnectionClosed)
{
var utcNow = DateTime.UtcNow;
if (CanReconnect(utcNow))
{
RegisterReconnectTime(utcNow);
try
{
connectionLock.EnterWriteLock();
internalSubscription.Stop();
internalSubscription = null;
internalSubscription = SubscribeToEventStore();
}
finally
{
connectionLock.ExitWriteLock();
}
DelayForReconnect().Wait();
if (!CanHandleSubscriptionEvent(subscription))
{
return;
}
try
{
connectionLock.EnterWriteLock();
if (CanHandleSubscriptionEvent(subscription))
{
internalSubscription = SubscribeToEventStore();
}
}
finally
{
if (reason != SubscriptionDropReason.UserInitiated &&
reason != SubscriptionDropReason.ConnectionClosed)
connectionLock.ExitWriteLock();
}
return;
}
}
if (reason != SubscriptionDropReason.UserInitiated && reason != SubscriptionDropReason.EventHandlerException)
{
var exception = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}.");
onError?.Invoke(exception);
publishError?.Invoke(exception);
}
else
}
}
finally
{
IsDropped = true;
connectionLock.ExitUpgradeableReadLock();
}
}
private bool CanHandleSubscriptionEvent(EventStoreCatchUpSubscription subscription)
{
return !disposeToken.IsCancellationRequested && subscription == internalSubscription;
}
private bool CanReconnect(DateTime utcNow)
{
return reconnectTimes.Count < ReconnectWindowMax && (reconnectTimes.Count == 0 || (utcNow - reconnectTimes.Peek()) > TimeBetweenReconnects);
}
private async Task PublishAsync(StoredEvent storedEvent)
{
await publishNext(storedEvent).ConfigureAwait(false);
}
private static long? ParsePosition(string position)
{
return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null;
}
private void RegisterReconnectTime(DateTime utcNow)
{
reconnectTimes.Enqueue(utcNow);
while (reconnectTimes.Count >= ReconnectWindowMax)
{
reconnectTimes.Dequeue();
}
}
private async Task DelayForReconnect()
{
try
{
await Task.Delay(ReconnectWaitMs, disposeToken.Token).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
// Just ignore.
}
});
}
private async Task CreateProjectionAsync()
@ -135,6 +288,7 @@ namespace Squidex.Infrastructure.CQRS.Events
new ProjectionsManager(
connection.Settings.Log, endpoint,
connection.Settings.OperationTimeout);
return projectionsManager;
}
}

4
src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfoRepository.cs

@ -72,7 +72,7 @@ namespace Squidex.Infrastructure.CQRS.Events
{
var filter = Filter.Eq(NameField, consumerName);
return Collection.UpdateOneAsync(filter, Update.Unset(IsStoppedField));
return Collection.UpdateOneAsync(filter, Update.Unset(IsStoppedField).Unset(ErrorField));
}
public Task StopAsync(string consumerName, string error = null)
@ -86,7 +86,7 @@ namespace Squidex.Infrastructure.CQRS.Events
{
var filter = Filter.Eq(NameField, consumerName);
return Collection.UpdateOneAsync(filter, Update.Set(IsResettingField, true));
return Collection.UpdateOneAsync(filter, Update.Set(IsResettingField, true).Unset(ErrorField));
}
public Task SetPositionAsync(string consumerName, string position, bool reset)

8
src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html

@ -3,6 +3,14 @@
<sqx-panel theme="light" panelWidth="50rem">
<div class="panel-header">
<div class="panel-title-row">
<div class="float-right">
<button class="btn btn-link btn-decent" (click)="load(true)" title="Refresh Event Consumers (CTRL + SHIFT + R)">
<i class="icon-reset"></i> Refresh
</button>
<sqx-shortcut keys="ctrl+shift+r" (trigger)="load(true)"></sqx-shortcut>
</div>
<h3 class="panel-title">Event Consumers</h3>
</div>

23
src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.ts

@ -40,11 +40,11 @@ export class EventConsumersPageComponent extends ComponentBase implements OnInit
}
public ngOnInit() {
this.load(false, true);
this.subscription =
Observable.timer(0, 4000)
.switchMap(() => this.eventConsumersService.getEventConsumers())
.subscribe(dtos => {
this.eventConsumers = ImmutableArray.of(dtos);
Observable.timer(4000, 4000).subscribe(() => {
this.load();
});
}
@ -52,6 +52,21 @@ export class EventConsumersPageComponent extends ComponentBase implements OnInit
this.subscription.unsubscribe();
}
public load(showInfo = false, showError = false) {
this.eventConsumersService.getEventConsumers()
.subscribe(dtos => {
this.eventConsumers = ImmutableArray.of(dtos);
if (showInfo) {
this.notifyInfo('Event Consumers reloaded.');
}
}, error => {
if (showError) {
this.notifyError(error);
}
});
}
public start(consumer: EventConsumerDto) {
this.eventConsumersService.startEventConsumer(consumer.name)
.subscribe(() => {

2
src/Squidex/appsettings.json

@ -89,7 +89,7 @@
*
* Read Mode: http://docs.geteventstore.com/dotnet-api/4.0.0/connecting-to-a-server/
*/
"configuration": "ConnectTo=tcp://admin:changeit@localhost:1113; HeartBeatTimeout=500",
"configuration": "ConnectTo=tcp://admin:changeit@localhost:1113; HeartBeatTimeout=500; MaxReconnections=-1",
/*
* The host name of your EventStore where projection requests will be sent to.
*/

Loading…
Cancel
Save