Browse Source

A lot of improvements

pull/131/head
Sebastian Stehle 9 years ago
parent
commit
735ede590d
  1. 4
      src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs
  2. 34
      src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs
  3. 6
      src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs
  4. 60
      src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs
  5. 1
      src/Squidex.Infrastructure/Actors/Actor.cs
  6. 6
      src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs
  7. 1
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs
  8. 1
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetReceiverMessage.cs
  9. 1
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartReceiverMessage.cs
  10. 1
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopReceiverMessage.cs
  11. 5
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs
  12. 16
      src/Squidex.Infrastructure/CQRS/Events/DefaultEventNotifier.cs
  13. 249
      src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs
  14. 4
      src/Squidex.Infrastructure/CQRS/Events/IEventNotifier.cs
  15. 2
      src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs
  16. 2
      src/Squidex.Infrastructure/Squidex.Infrastructure.csproj

4
src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs

@ -44,9 +44,9 @@ namespace Squidex.Infrastructure.CQRS.Events
}
}
public IEventSubscription CreateSubscription(string streamFilter = null, string position = null)
public IEventSubscription CreateSubscription()
{
return new GetEventStoreSubscription(connection, streamFilter, position, prefix, projectionHost);
return new GetEventStoreSubscription(connection, prefix, projectionHost);
}
public async Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName)

34
src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs

@ -31,12 +31,12 @@ namespace Squidex.Infrastructure.CQRS.Events
private static readonly TimeSpan TimeBetweenReconnects = TimeSpan.FromMinutes(5);
private static readonly ConcurrentDictionary<string, bool> SubscriptionsCreated = new ConcurrentDictionary<string, bool>();
private readonly IEventStoreConnection connection;
private readonly string streamFilter;
private readonly string streamName;
private readonly string prefix;
private readonly string projectionHost;
private readonly Queue<DateTime> reconnectTimes = new Queue<DateTime>();
private EventStoreCatchUpSubscription subscription;
private string streamFilter;
private string streamName;
private long? position;
private IActor parent;
@ -56,15 +56,11 @@ namespace Squidex.Infrastructure.CQRS.Events
public EventStoreCatchUpSubscription Subscription;
}
public GetEventStoreSubscription(IEventStoreConnection connection, string streamFilter, string position, string prefix, string projectionHost)
public GetEventStoreSubscription(IEventStoreConnection connection, string prefix, string projectionHost)
{
this.prefix = prefix;
this.position = ParsePosition(position);
this.connection = connection;
this.streamFilter = streamFilter;
this.projectionHost = projectionHost;
streamName = $"by-{prefix.Simplify()}-{streamFilter.Simplify()}";
}
protected override Task OnStop()
@ -91,6 +87,10 @@ namespace Squidex.Infrastructure.CQRS.Events
case SubscribeMessage subscribe when parent == null:
{
parent = subscribe.Parent;
position = ParsePosition(subscribe.Position);
streamFilter = subscribe.StreamFilter;
streamName = $"by-{prefix.Simplify()}-{streamFilter.Simplify()}";
await CreateProjectionAsync();
@ -104,10 +104,7 @@ namespace Squidex.Infrastructure.CQRS.Events
if (CanReconnect(DateTime.UtcNow))
{
Task.Delay(ReconnectWaitMs).ContinueWith(t =>
{
SendAsync(new ConnectMessage());
}).Forget();
Task.Delay(ReconnectWaitMs).ContinueWith(t => SendAsync(new ConnectMessage())).Forget();
}
else
{
@ -124,13 +121,16 @@ namespace Squidex.Infrastructure.CQRS.Events
break;
}
case ReceiveESEventMessage receiveEvent when receiveEvent.Subscription == subscription && parent != null:
case ReceiveESEventMessage receiveEvent when parent != null:
{
var storedEvent = Formatter.Read(receiveEvent.Event);
if (receiveEvent.Subscription == subscription)
{
var storedEvent = Formatter.Read(receiveEvent.Event);
await parent.SendAsync(new ReceiveEventMessage { Event = storedEvent });
await parent.SendAsync(new ReceiveEventMessage { Event = storedEvent });
position = receiveEvent.Event.OriginalEventNumber;
position = receiveEvent.Event.OriginalEventNumber;
}
break;
}
@ -144,11 +144,11 @@ namespace Squidex.Infrastructure.CQRS.Events
private void HandleError(EventStoreCatchUpSubscription s, SubscriptionDropReason reason, Exception ex)
{
if (reason == SubscriptionDropReason.ConnectionClosed)
if (reason == SubscriptionDropReason.ConnectionClosed && subscription == s)
{
SendAsync(new ConnectionFailedMessage { Exception = ex });
}
else if (reason != SubscriptionDropReason.UserInitiated)
else if (reason != SubscriptionDropReason.UserInitiated && subscription == s)
{
var exception = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}.");

6
src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs

@ -55,9 +55,9 @@ namespace Squidex.Infrastructure.CQRS.Events
collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStream).Descending(x => x.EventStreamOffset), new CreateIndexOptions { Unique = true }));
}
public IEventSubscription CreateSubscription(string streamFilter = null, string position = null)
public IEventSubscription CreateSubscription()
{
return new PollingSubscription(this, notifier, streamFilter, position);
return new PollingSubscription(this, notifier);
}
public async Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName)
@ -144,7 +144,7 @@ namespace Squidex.Infrastructure.CQRS.Events
{
await Collection.InsertOneAsync(commit);
notifier.NotifyEventsStored();
notifier.NotifyEventsStored(streamName);
return;
}

60
src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs

@ -7,12 +7,12 @@
// ==========================================================================
using System;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Infrastructure.Actors;
using Squidex.Infrastructure.CQRS.Events.Actors.Messages;
using Squidex.Infrastructure.Tasks;
using Squidex.Infrastructure.Timers;
#pragma warning disable SA1401 // Fields must be private
@ -22,11 +22,12 @@ namespace Squidex.Infrastructure.CQRS.Events
{
private readonly IEventNotifier eventNotifier;
private readonly MongoEventStore eventStore;
private readonly string streamFilter;
private CancellationTokenSource ct;
private CancellationTokenSource cancelPolling;
private Timer pollTimer;
private Regex streamRegex;
private Guid subscription;
private string streamFilter;
private string position;
private bool isStopped;
private IDisposable pollSubscription;
private IActor parent;
@ -34,17 +35,22 @@ namespace Squidex.Infrastructure.CQRS.Events
{
}
public PollingSubscription(MongoEventStore eventStore, IEventNotifier eventNotifier, string streamFilter, string position)
private sealed class ReceiveMongoEventMessage : IMessage
{
public StoredEvent Event;
public Guid Subscription;
}
public PollingSubscription(MongoEventStore eventStore, IEventNotifier eventNotifier)
{
this.position = position;
this.eventStore = eventStore;
this.eventNotifier = eventNotifier;
this.streamFilter = streamFilter;
}
protected override Task OnStop()
{
ct?.Cancel();
cancelPolling?.Cancel();
pollTimer?.Dispose();
pollSubscription?.Dispose();
@ -71,10 +77,17 @@ namespace Squidex.Infrastructure.CQRS.Events
case SubscribeMessage subscribe when parent == null:
{
parent = subscribe.Parent;
position = subscribe.Position;
streamFilter = subscribe.StreamFilter;
streamRegex = new Regex(streamFilter);
pollSubscription = eventNotifier.Subscribe(() =>
pollSubscription = eventNotifier.Subscribe(streamName =>
{
SendAsync(new PollMessage()).Forget();
if (streamRegex.IsMatch(streamName))
{
SendAsync(new PollMessage()).Forget();
}
});
pollTimer = new Timer(d =>
@ -89,30 +102,41 @@ namespace Squidex.Infrastructure.CQRS.Events
case PollMessage poll when parent != null:
{
ct?.Cancel();
ct = new CancellationTokenSource();
cancelPolling?.Cancel();
cancelPolling = new CancellationTokenSource();
subscription = Guid.NewGuid();
PollAsync().Forget();
PollAsync(subscription, cancelPolling.Token).Forget();
break;
}
case ReceiveEventMessage receiveEvent when parent != null:
case ReceiveMongoEventMessage receiveEvent when parent != null:
{
await parent.SendAsync(receiveEvent);
if (receiveEvent.Subscription == subscription)
{
await parent.SendAsync(new ReceiveEventMessage { Event = receiveEvent.Event });
position = receiveEvent.Event.EventPosition;
position = receiveEvent.Event.EventPosition;
}
break;
}
}
}
private async Task PollAsync()
private async Task PollAsync(Guid subscriptionId, CancellationToken ct)
{
try
{
await eventStore.GetEventsAsync(e => SendAsync(new ReceiveEventMessage { Event = e }), ct.Token, streamFilter, position);
await eventStore.GetEventsAsync(async e =>
{
if (ct.IsCancellationRequested == true)
{
await SendAsync(new ReceiveMongoEventMessage { Event = e, Subscription = subscriptionId });
}
}, ct, streamFilter, position);
}
catch (Exception ex) when (!(ex is OperationCanceledException))
{

1
src/Squidex.Infrastructure/Actors/Actor.cs

@ -7,7 +7,6 @@
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Tasks;

6
src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs

@ -13,7 +13,7 @@ using Squidex.Infrastructure.CQRS.Events.Actors.Messages;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.CQRS.Events.Receivers
namespace Squidex.Infrastructure.CQRS.Events.Actors
{
public sealed class EventConsumerActor : Actor
{
@ -105,8 +105,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Receivers
position = (await eventConsumerInfoRepository.FindAsync(eventConsumer.Name)).Position;
eventSubscription = eventStore.CreateSubscription(eventConsumer.EventsFilter, position);
eventSubscription.SendAsync(new SubscribeMessage { Parent = this }).Forget();
eventSubscription = eventStore.CreateSubscription();
eventSubscription.SendAsync(new SubscribeMessage { Parent = this, StreamFilter = eventConsumer.EventsFilter, Position = position }).Forget();
}
private async Task StopAsync(Exception exception = null)

1
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs

@ -2,6 +2,7 @@
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(ReceiveEventMessage))]
public sealed class ReceiveEventMessage : IMessage
{
public StoredEvent Event { get; set; }

1
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetReceiverMessage.cs

@ -2,6 +2,7 @@
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(ResetReceiverMessage))]
public sealed class ResetReceiverMessage : IMessage
{
}

1
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartReceiverMessage.cs

@ -2,6 +2,7 @@
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(StartReceiverMessage))]
public sealed class StartReceiverMessage : IMessage
{
}

1
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopReceiverMessage.cs

@ -3,6 +3,7 @@ using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(StopReceiverMessage))]
public sealed class StopReceiverMessage : IMessage
{
public Exception Exception { get; set; }

5
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs

@ -2,8 +2,13 @@
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(SubscribeMessage))]
public sealed class SubscribeMessage : IMessage
{
public string StreamFilter { get; set; }
public string Position { get; set; }
public IActor Parent { get; set; }
}
}

16
src/Squidex.Infrastructure/CQRS/Events/DefaultEventNotifier.cs

@ -14,23 +14,23 @@ namespace Squidex.Infrastructure.CQRS.Events
{
private static readonly string ChannelName = typeof(DefaultEventNotifier).Name;
private readonly IPubSub invalidator;
private readonly IPubSub pubsub;
public DefaultEventNotifier(IPubSub invalidator)
public DefaultEventNotifier(IPubSub pubsub)
{
Guard.NotNull(invalidator, nameof(invalidator));
Guard.NotNull(pubsub, nameof(pubsub));
this.invalidator = invalidator;
this.pubsub = pubsub;
}
public void NotifyEventsStored()
public void NotifyEventsStored(string streamName)
{
invalidator.Publish(ChannelName, string.Empty, true);
pubsub.Publish(ChannelName, streamName, true);
}
public IDisposable Subscribe(Action handler)
public IDisposable Subscribe(Action<string> handler)
{
return invalidator.Subscribe(ChannelName, x => handler());
return pubsub.Subscribe(ChannelName, x => handler?.Invoke(x));
}
}
}

249
src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs

@ -1,249 +0,0 @@
// ==========================================================================
// EventReceiver.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Timers;
namespace Squidex.Infrastructure.CQRS.Events
{
public sealed class EventReceiver : DisposableObjectBase
{
private readonly EventDataFormatter formatter;
private readonly IEventStore eventStore;
private readonly IEventConsumerInfoRepository eventConsumerInfoRepository;
private readonly ISemanticLog log;
private IEventSubscription currentSubscription;
private CompletionTimer timer;
public EventReceiver(
EventDataFormatter formatter,
IEventStore eventStore,
IEventConsumerInfoRepository eventConsumerInfoRepository,
ISemanticLog log)
{
Guard.NotNull(log, nameof(log));
Guard.NotNull(formatter, nameof(formatter));
Guard.NotNull(eventStore, nameof(eventStore));
Guard.NotNull(eventConsumerInfoRepository, nameof(eventConsumerInfoRepository));
this.log = log;
this.formatter = formatter;
this.eventStore = eventStore;
this.eventConsumerInfoRepository = eventConsumerInfoRepository;
}
protected override void DisposeObject(bool disposing)
{
if (disposing)
{
try
{
currentSubscription?.Dispose();
}
catch (Exception ex)
{
log.LogWarning(ex, w => w
.WriteProperty("action", "DisposeEventReceiver")
.WriteProperty("state", "Failed"));
}
try
{
timer?.StopAsync().Wait();
}
catch (Exception ex)
{
log.LogWarning(ex, w => w
.WriteProperty("action", "DisposeEventReceiver")
.WriteProperty("state", "Failed"));
}
}
}
public void Refresh()
{
ThrowIfDisposed();
timer?.SkipCurrentDelay();
}
public void Subscribe(IEventConsumer eventConsumer)
{
Guard.NotNull(eventConsumer, nameof(eventConsumer));
ThrowIfDisposed();
if (timer != null)
{
return;
}
var consumerName = eventConsumer.Name;
var consumerStarted = false;
timer = new CompletionTimer(5000, async ct =>
{
if (!consumerStarted)
{
await eventConsumerInfoRepository.CreateAsync(consumerName);
consumerStarted = true;
}
try
{
var status = await eventConsumerInfoRepository.FindAsync(consumerName);
var position = status.Position;
if (status.IsResetting)
{
currentSubscription?.Dispose();
currentSubscription = null;
position = null;
await ResetAsync(eventConsumer);
}
else if (status.IsStopped)
{
currentSubscription?.Dispose();
currentSubscription = null;
return;
}
if (currentSubscription == null)
{
await SubscribeAsync(eventConsumer, position);
}
}
catch (Exception ex)
{
log.LogFatal(ex, w => w.WriteProperty("action", "EventHandlingFailed"));
}
});
}
private async Task SubscribeAsync(IEventConsumer eventConsumer, string position)
{
var consumerName = eventConsumer.Name;
var subscription = eventStore.CreateSubscription(eventConsumer.EventsFilter, position);
await subscription.SubscribeAsync(async storedEvent =>
{
await DispatchConsumerAsync(ParseEvent(storedEvent), eventConsumer, eventConsumer.Name);
await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, storedEvent.EventPosition, false);
}, async exception =>
{
await eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString());
subscription.Dispose();
});
currentSubscription = subscription;
}
private async Task ResetAsync(IEventConsumer eventConsumer)
{
var actionId = Guid.NewGuid().ToString();
try
{
log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Started")
.WriteProperty("eventConsumer", eventConsumer.Name));
await eventConsumer.ClearAsync();
await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, null, true);
log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Completed")
.WriteProperty("eventConsumer", eventConsumer.Name));
}
catch (Exception ex)
{
log.LogFatal(ex, w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Completed")
.WriteProperty("eventConsumer", eventConsumer.GetType().Name));
throw;
}
}
private async Task DispatchConsumerAsync(Envelope<IEvent> @event, IEventConsumer eventConsumer, string consumerName)
{
var eventId = @event.Headers.EventId().ToString();
var eventType = @event.Payload.GetType().Name;
try
{
log.LogInformation(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Started")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", consumerName));
await eventConsumer.On(@event);
log.LogInformation(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Completed")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", consumerName));
}
catch (Exception ex)
{
log.LogError(ex, w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Started")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", consumerName));
throw;
}
}
private Envelope<IEvent> ParseEvent(StoredEvent storedEvent)
{
try
{
var @event = formatter.Parse(storedEvent.Data);
@event.SetEventPosition(storedEvent.EventPosition);
@event.SetEventStreamNumber(storedEvent.EventStreamNumber);
return @event;
}
catch (Exception ex)
{
log.LogFatal(ex, w => w
.WriteProperty("action", "ParseEvent")
.WriteProperty("state", "Failed")
.WriteProperty("eventId", storedEvent.Data.EventId.ToString())
.WriteProperty("eventPosition", storedEvent.EventPosition));
throw;
}
}
}
}

4
src/Squidex.Infrastructure/CQRS/Events/IEventNotifier.cs

@ -12,8 +12,8 @@ namespace Squidex.Infrastructure.CQRS.Events
{
public interface IEventNotifier
{
void NotifyEventsStored();
void NotifyEventsStored(string streamName);
IDisposable Subscribe(Action handler);
IDisposable Subscribe(Action<string> handler);
}
}

2
src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs

@ -20,6 +20,6 @@ namespace Squidex.Infrastructure.CQRS.Events
Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events);
IEventSubscription CreateSubscription(string streamFilter = null, string position = null);
IEventSubscription CreateSubscription();
}
}

2
src/Squidex.Infrastructure/Squidex.Infrastructure.csproj

@ -11,7 +11,6 @@
<PackageReference Include="ImageSharp" Version="1.0.0-alpha9-00191" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.2" />
<PackageReference Include="Microsoft.Tpl.Dataflow" Version="4.5.24" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
<PackageReference Include="NodaTime" Version="2.2.0" />
<PackageReference Include="RefactoringEssentials" Version="5.2.0" />
@ -20,6 +19,7 @@
<PackageReference Include="System.Reactive" Version="3.1.1" />
<PackageReference Include="System.Reflection.TypeExtensions" Version="4.4.0" />
<PackageReference Include="System.Security.Claims" Version="4.3.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.8.0" />
<PackageReference Include="System.ValueTuple" Version="4.4.0" />
</ItemGroup>
<PropertyGroup>

Loading…
Cancel
Save