From 735ede590d1a48bbc99deb06d6b4cbcd3dfd2134 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Sun, 24 Sep 2017 14:18:37 +0200 Subject: [PATCH] A lot of improvements --- .../CQRS/Events/GetEventStore.cs | 4 +- .../CQRS/Events/GetEventStoreSubscription.cs | 34 +-- .../CQRS/Events/MongoEventStore.cs | 6 +- .../CQRS/Events/PollingSubscription.cs | 60 +++-- src/Squidex.Infrastructure/Actors/Actor.cs | 1 - .../CQRS/Events/Actors/EventConsumerActor.cs | 6 +- .../Actors/Messages/ReceiveEventMessage.cs | 1 + .../Actors/Messages/ResetReceiverMessage.cs | 1 + .../Actors/Messages/StartReceiverMessage.cs | 1 + .../Actors/Messages/StopReceiverMessage.cs | 1 + .../Actors/Messages/SubscribeMessage.cs | 5 + .../CQRS/Events/DefaultEventNotifier.cs | 16 +- .../CQRS/Events/EventReceiver.cs | 249 ------------------ .../CQRS/Events/IEventNotifier.cs | 4 +- .../CQRS/Events/IEventStore.cs | 2 +- .../Squidex.Infrastructure.csproj | 2 +- 16 files changed, 88 insertions(+), 305 deletions(-) delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs diff --git a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs index b7b00a998..bb60f41d1 100644 --- a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs +++ b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs @@ -44,9 +44,9 @@ namespace Squidex.Infrastructure.CQRS.Events } } - public IEventSubscription CreateSubscription(string streamFilter = null, string position = null) + public IEventSubscription CreateSubscription() { - return new GetEventStoreSubscription(connection, streamFilter, position, prefix, projectionHost); + return new GetEventStoreSubscription(connection, prefix, projectionHost); } public async Task> GetEventsAsync(string streamName) diff --git a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs index 4982ca9f4..3651109a6 100644 --- a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs +++ b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs @@ -31,12 +31,12 @@ namespace Squidex.Infrastructure.CQRS.Events private static readonly TimeSpan TimeBetweenReconnects = TimeSpan.FromMinutes(5); private static readonly ConcurrentDictionary SubscriptionsCreated = new ConcurrentDictionary(); private readonly IEventStoreConnection connection; - private readonly string streamFilter; - private readonly string streamName; private readonly string prefix; private readonly string projectionHost; private readonly Queue reconnectTimes = new Queue(); private EventStoreCatchUpSubscription subscription; + private string streamFilter; + private string streamName; private long? position; private IActor parent; @@ -56,15 +56,11 @@ namespace Squidex.Infrastructure.CQRS.Events public EventStoreCatchUpSubscription Subscription; } - public GetEventStoreSubscription(IEventStoreConnection connection, string streamFilter, string position, string prefix, string projectionHost) + public GetEventStoreSubscription(IEventStoreConnection connection, string prefix, string projectionHost) { this.prefix = prefix; - this.position = ParsePosition(position); this.connection = connection; - this.streamFilter = streamFilter; this.projectionHost = projectionHost; - - streamName = $"by-{prefix.Simplify()}-{streamFilter.Simplify()}"; } protected override Task OnStop() @@ -91,6 +87,10 @@ namespace Squidex.Infrastructure.CQRS.Events case SubscribeMessage subscribe when parent == null: { parent = subscribe.Parent; + position = ParsePosition(subscribe.Position); + + streamFilter = subscribe.StreamFilter; + streamName = $"by-{prefix.Simplify()}-{streamFilter.Simplify()}"; await CreateProjectionAsync(); @@ -104,10 +104,7 @@ namespace Squidex.Infrastructure.CQRS.Events if (CanReconnect(DateTime.UtcNow)) { - Task.Delay(ReconnectWaitMs).ContinueWith(t => - { - SendAsync(new ConnectMessage()); - }).Forget(); + Task.Delay(ReconnectWaitMs).ContinueWith(t => SendAsync(new ConnectMessage())).Forget(); } else { @@ -124,13 +121,16 @@ namespace Squidex.Infrastructure.CQRS.Events break; } - case ReceiveESEventMessage receiveEvent when receiveEvent.Subscription == subscription && parent != null: + case ReceiveESEventMessage receiveEvent when parent != null: { - var storedEvent = Formatter.Read(receiveEvent.Event); + if (receiveEvent.Subscription == subscription) + { + var storedEvent = Formatter.Read(receiveEvent.Event); - await parent.SendAsync(new ReceiveEventMessage { Event = storedEvent }); + await parent.SendAsync(new ReceiveEventMessage { Event = storedEvent }); - position = receiveEvent.Event.OriginalEventNumber; + position = receiveEvent.Event.OriginalEventNumber; + } break; } @@ -144,11 +144,11 @@ namespace Squidex.Infrastructure.CQRS.Events private void HandleError(EventStoreCatchUpSubscription s, SubscriptionDropReason reason, Exception ex) { - if (reason == SubscriptionDropReason.ConnectionClosed) + if (reason == SubscriptionDropReason.ConnectionClosed && subscription == s) { SendAsync(new ConnectionFailedMessage { Exception = ex }); } - else if (reason != SubscriptionDropReason.UserInitiated) + else if (reason != SubscriptionDropReason.UserInitiated && subscription == s) { var exception = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}."); diff --git a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs index 5ec8cfbce..bd02be77a 100644 --- a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs @@ -55,9 +55,9 @@ namespace Squidex.Infrastructure.CQRS.Events collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStream).Descending(x => x.EventStreamOffset), new CreateIndexOptions { Unique = true })); } - public IEventSubscription CreateSubscription(string streamFilter = null, string position = null) + public IEventSubscription CreateSubscription() { - return new PollingSubscription(this, notifier, streamFilter, position); + return new PollingSubscription(this, notifier); } public async Task> GetEventsAsync(string streamName) @@ -144,7 +144,7 @@ namespace Squidex.Infrastructure.CQRS.Events { await Collection.InsertOneAsync(commit); - notifier.NotifyEventsStored(); + notifier.NotifyEventsStored(streamName); return; } diff --git a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs index ab77ea289..4d23b8a1b 100644 --- a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs +++ b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs @@ -7,12 +7,12 @@ // ========================================================================== using System; +using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using Squidex.Infrastructure.Actors; using Squidex.Infrastructure.CQRS.Events.Actors.Messages; using Squidex.Infrastructure.Tasks; -using Squidex.Infrastructure.Timers; #pragma warning disable SA1401 // Fields must be private @@ -22,11 +22,12 @@ namespace Squidex.Infrastructure.CQRS.Events { private readonly IEventNotifier eventNotifier; private readonly MongoEventStore eventStore; - private readonly string streamFilter; - private CancellationTokenSource ct; + private CancellationTokenSource cancelPolling; private Timer pollTimer; + private Regex streamRegex; + private Guid subscription; + private string streamFilter; private string position; - private bool isStopped; private IDisposable pollSubscription; private IActor parent; @@ -34,17 +35,22 @@ namespace Squidex.Infrastructure.CQRS.Events { } - public PollingSubscription(MongoEventStore eventStore, IEventNotifier eventNotifier, string streamFilter, string position) + private sealed class ReceiveMongoEventMessage : IMessage + { + public StoredEvent Event; + + public Guid Subscription; + } + + public PollingSubscription(MongoEventStore eventStore, IEventNotifier eventNotifier) { - this.position = position; this.eventStore = eventStore; this.eventNotifier = eventNotifier; - this.streamFilter = streamFilter; } protected override Task OnStop() { - ct?.Cancel(); + cancelPolling?.Cancel(); pollTimer?.Dispose(); pollSubscription?.Dispose(); @@ -71,10 +77,17 @@ namespace Squidex.Infrastructure.CQRS.Events case SubscribeMessage subscribe when parent == null: { parent = subscribe.Parent; + position = subscribe.Position; + + streamFilter = subscribe.StreamFilter; + streamRegex = new Regex(streamFilter); - pollSubscription = eventNotifier.Subscribe(() => + pollSubscription = eventNotifier.Subscribe(streamName => { - SendAsync(new PollMessage()).Forget(); + if (streamRegex.IsMatch(streamName)) + { + SendAsync(new PollMessage()).Forget(); + } }); pollTimer = new Timer(d => @@ -89,30 +102,41 @@ namespace Squidex.Infrastructure.CQRS.Events case PollMessage poll when parent != null: { - ct?.Cancel(); - ct = new CancellationTokenSource(); + cancelPolling?.Cancel(); + cancelPolling = new CancellationTokenSource(); + + subscription = Guid.NewGuid(); - PollAsync().Forget(); + PollAsync(subscription, cancelPolling.Token).Forget(); break; } - case ReceiveEventMessage receiveEvent when parent != null: + case ReceiveMongoEventMessage receiveEvent when parent != null: { - await parent.SendAsync(receiveEvent); + if (receiveEvent.Subscription == subscription) + { + await parent.SendAsync(new ReceiveEventMessage { Event = receiveEvent.Event }); - position = receiveEvent.Event.EventPosition; + position = receiveEvent.Event.EventPosition; + } break; } } } - private async Task PollAsync() + private async Task PollAsync(Guid subscriptionId, CancellationToken ct) { try { - await eventStore.GetEventsAsync(e => SendAsync(new ReceiveEventMessage { Event = e }), ct.Token, streamFilter, position); + await eventStore.GetEventsAsync(async e => + { + if (ct.IsCancellationRequested == true) + { + await SendAsync(new ReceiveMongoEventMessage { Event = e, Subscription = subscriptionId }); + } + }, ct, streamFilter, position); } catch (Exception ex) when (!(ex is OperationCanceledException)) { diff --git a/src/Squidex.Infrastructure/Actors/Actor.cs b/src/Squidex.Infrastructure/Actors/Actor.cs index 90fb963c6..67788d75b 100644 --- a/src/Squidex.Infrastructure/Actors/Actor.cs +++ b/src/Squidex.Infrastructure/Actors/Actor.cs @@ -7,7 +7,6 @@ // ========================================================================== using System; -using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using Squidex.Infrastructure.Tasks; diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs index 3d25f169e..62f7ed754 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs @@ -13,7 +13,7 @@ using Squidex.Infrastructure.CQRS.Events.Actors.Messages; using Squidex.Infrastructure.Log; using Squidex.Infrastructure.Tasks; -namespace Squidex.Infrastructure.CQRS.Events.Receivers +namespace Squidex.Infrastructure.CQRS.Events.Actors { public sealed class EventConsumerActor : Actor { @@ -105,8 +105,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Receivers position = (await eventConsumerInfoRepository.FindAsync(eventConsumer.Name)).Position; - eventSubscription = eventStore.CreateSubscription(eventConsumer.EventsFilter, position); - eventSubscription.SendAsync(new SubscribeMessage { Parent = this }).Forget(); + eventSubscription = eventStore.CreateSubscription(); + eventSubscription.SendAsync(new SubscribeMessage { Parent = this, StreamFilter = eventConsumer.EventsFilter, Position = position }).Forget(); } private async Task StopAsync(Exception exception = null) diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs index dde227a03..53289ddc2 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs @@ -2,6 +2,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages { + [TypeName(nameof(ReceiveEventMessage))] public sealed class ReceiveEventMessage : IMessage { public StoredEvent Event { get; set; } diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetReceiverMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetReceiverMessage.cs index 68b83f267..292680cd3 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetReceiverMessage.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetReceiverMessage.cs @@ -2,6 +2,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages { + [TypeName(nameof(ResetReceiverMessage))] public sealed class ResetReceiverMessage : IMessage { } diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartReceiverMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartReceiverMessage.cs index f94971c28..b6851589e 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartReceiverMessage.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartReceiverMessage.cs @@ -2,6 +2,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages { + [TypeName(nameof(StartReceiverMessage))] public sealed class StartReceiverMessage : IMessage { } diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopReceiverMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopReceiverMessage.cs index ed3ebd85d..52937d2bf 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopReceiverMessage.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopReceiverMessage.cs @@ -3,6 +3,7 @@ using Squidex.Infrastructure.Actors; namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages { + [TypeName(nameof(StopReceiverMessage))] public sealed class StopReceiverMessage : IMessage { public Exception Exception { get; set; } diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs index d26c94705..003a255de 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs @@ -2,8 +2,13 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages { + [TypeName(nameof(SubscribeMessage))] public sealed class SubscribeMessage : IMessage { + public string StreamFilter { get; set; } + + public string Position { get; set; } + public IActor Parent { get; set; } } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/DefaultEventNotifier.cs b/src/Squidex.Infrastructure/CQRS/Events/DefaultEventNotifier.cs index 0513a3a3d..69ab25e88 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/DefaultEventNotifier.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/DefaultEventNotifier.cs @@ -14,23 +14,23 @@ namespace Squidex.Infrastructure.CQRS.Events { private static readonly string ChannelName = typeof(DefaultEventNotifier).Name; - private readonly IPubSub invalidator; + private readonly IPubSub pubsub; - public DefaultEventNotifier(IPubSub invalidator) + public DefaultEventNotifier(IPubSub pubsub) { - Guard.NotNull(invalidator, nameof(invalidator)); + Guard.NotNull(pubsub, nameof(pubsub)); - this.invalidator = invalidator; + this.pubsub = pubsub; } - public void NotifyEventsStored() + public void NotifyEventsStored(string streamName) { - invalidator.Publish(ChannelName, string.Empty, true); + pubsub.Publish(ChannelName, streamName, true); } - public IDisposable Subscribe(Action handler) + public IDisposable Subscribe(Action handler) { - return invalidator.Subscribe(ChannelName, x => handler()); + return pubsub.Subscribe(ChannelName, x => handler?.Invoke(x)); } } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs deleted file mode 100644 index 38239535b..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs +++ /dev/null @@ -1,249 +0,0 @@ -// ========================================================================== -// EventReceiver.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Threading.Tasks; -using Squidex.Infrastructure.Log; -using Squidex.Infrastructure.Timers; - -namespace Squidex.Infrastructure.CQRS.Events -{ - public sealed class EventReceiver : DisposableObjectBase - { - private readonly EventDataFormatter formatter; - private readonly IEventStore eventStore; - private readonly IEventConsumerInfoRepository eventConsumerInfoRepository; - private readonly ISemanticLog log; - private IEventSubscription currentSubscription; - private CompletionTimer timer; - - public EventReceiver( - EventDataFormatter formatter, - IEventStore eventStore, - IEventConsumerInfoRepository eventConsumerInfoRepository, - ISemanticLog log) - { - Guard.NotNull(log, nameof(log)); - Guard.NotNull(formatter, nameof(formatter)); - Guard.NotNull(eventStore, nameof(eventStore)); - Guard.NotNull(eventConsumerInfoRepository, nameof(eventConsumerInfoRepository)); - - this.log = log; - this.formatter = formatter; - this.eventStore = eventStore; - this.eventConsumerInfoRepository = eventConsumerInfoRepository; - } - - protected override void DisposeObject(bool disposing) - { - if (disposing) - { - try - { - currentSubscription?.Dispose(); - } - catch (Exception ex) - { - log.LogWarning(ex, w => w - .WriteProperty("action", "DisposeEventReceiver") - .WriteProperty("state", "Failed")); - } - - try - { - timer?.StopAsync().Wait(); - } - catch (Exception ex) - { - log.LogWarning(ex, w => w - .WriteProperty("action", "DisposeEventReceiver") - .WriteProperty("state", "Failed")); - } - } - } - - public void Refresh() - { - ThrowIfDisposed(); - - timer?.SkipCurrentDelay(); - } - - public void Subscribe(IEventConsumer eventConsumer) - { - Guard.NotNull(eventConsumer, nameof(eventConsumer)); - - ThrowIfDisposed(); - - if (timer != null) - { - return; - } - - var consumerName = eventConsumer.Name; - var consumerStarted = false; - - timer = new CompletionTimer(5000, async ct => - { - if (!consumerStarted) - { - await eventConsumerInfoRepository.CreateAsync(consumerName); - - consumerStarted = true; - } - - try - { - var status = await eventConsumerInfoRepository.FindAsync(consumerName); - - var position = status.Position; - - if (status.IsResetting) - { - currentSubscription?.Dispose(); - currentSubscription = null; - - position = null; - - await ResetAsync(eventConsumer); - } - else if (status.IsStopped) - { - currentSubscription?.Dispose(); - currentSubscription = null; - - return; - } - - if (currentSubscription == null) - { - await SubscribeAsync(eventConsumer, position); - } - } - catch (Exception ex) - { - log.LogFatal(ex, w => w.WriteProperty("action", "EventHandlingFailed")); - } - }); - } - - private async Task SubscribeAsync(IEventConsumer eventConsumer, string position) - { - var consumerName = eventConsumer.Name; - - var subscription = eventStore.CreateSubscription(eventConsumer.EventsFilter, position); - - await subscription.SubscribeAsync(async storedEvent => - { - await DispatchConsumerAsync(ParseEvent(storedEvent), eventConsumer, eventConsumer.Name); - - await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, storedEvent.EventPosition, false); - }, async exception => - { - await eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString()); - - subscription.Dispose(); - }); - - currentSubscription = subscription; - } - - private async Task ResetAsync(IEventConsumer eventConsumer) - { - var actionId = Guid.NewGuid().ToString(); - try - { - log.LogInformation(w => w - .WriteProperty("action", "EventConsumerReset") - .WriteProperty("actionId", actionId) - .WriteProperty("state", "Started") - .WriteProperty("eventConsumer", eventConsumer.Name)); - - await eventConsumer.ClearAsync(); - await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, null, true); - - log.LogInformation(w => w - .WriteProperty("action", "EventConsumerReset") - .WriteProperty("actionId", actionId) - .WriteProperty("state", "Completed") - .WriteProperty("eventConsumer", eventConsumer.Name)); - } - catch (Exception ex) - { - log.LogFatal(ex, w => w - .WriteProperty("action", "EventConsumerReset") - .WriteProperty("actionId", actionId) - .WriteProperty("state", "Completed") - .WriteProperty("eventConsumer", eventConsumer.GetType().Name)); - - throw; - } - } - - private async Task DispatchConsumerAsync(Envelope @event, IEventConsumer eventConsumer, string consumerName) - { - var eventId = @event.Headers.EventId().ToString(); - var eventType = @event.Payload.GetType().Name; - try - { - log.LogInformation(w => w - .WriteProperty("action", "HandleEvent") - .WriteProperty("actionId", eventId) - .WriteProperty("state", "Started") - .WriteProperty("eventId", eventId) - .WriteProperty("eventType", eventType) - .WriteProperty("eventConsumer", consumerName)); - - await eventConsumer.On(@event); - - log.LogInformation(w => w - .WriteProperty("action", "HandleEvent") - .WriteProperty("actionId", eventId) - .WriteProperty("state", "Completed") - .WriteProperty("eventId", eventId) - .WriteProperty("eventType", eventType) - .WriteProperty("eventConsumer", consumerName)); - } - catch (Exception ex) - { - log.LogError(ex, w => w - .WriteProperty("action", "HandleEvent") - .WriteProperty("actionId", eventId) - .WriteProperty("state", "Started") - .WriteProperty("eventId", eventId) - .WriteProperty("eventType", eventType) - .WriteProperty("eventConsumer", consumerName)); - - throw; - } - } - - private Envelope ParseEvent(StoredEvent storedEvent) - { - try - { - var @event = formatter.Parse(storedEvent.Data); - - @event.SetEventPosition(storedEvent.EventPosition); - @event.SetEventStreamNumber(storedEvent.EventStreamNumber); - - return @event; - } - catch (Exception ex) - { - log.LogFatal(ex, w => w - .WriteProperty("action", "ParseEvent") - .WriteProperty("state", "Failed") - .WriteProperty("eventId", storedEvent.Data.EventId.ToString()) - .WriteProperty("eventPosition", storedEvent.EventPosition)); - - throw; - } - } - } -} \ No newline at end of file diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventNotifier.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventNotifier.cs index 758bc94f7..72e61b3d9 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventNotifier.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventNotifier.cs @@ -12,8 +12,8 @@ namespace Squidex.Infrastructure.CQRS.Events { public interface IEventNotifier { - void NotifyEventsStored(); + void NotifyEventsStored(string streamName); - IDisposable Subscribe(Action handler); + IDisposable Subscribe(Action handler); } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs index ae6b265d4..470f70913 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs @@ -20,6 +20,6 @@ namespace Squidex.Infrastructure.CQRS.Events Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events); - IEventSubscription CreateSubscription(string streamFilter = null, string position = null); + IEventSubscription CreateSubscription(); } } diff --git a/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj b/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj index 8ce7b1d1e..c67f27aa0 100644 --- a/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj +++ b/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj @@ -11,7 +11,6 @@ - @@ -20,6 +19,7 @@ +