From b1d79a95d0a3c474540d51d91830dea7e59a6469 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Wed, 3 Jan 2018 17:46:34 +0100 Subject: [PATCH] Fix --- .../Events/GetEventStoreSubscription.cs | 152 ------------------ .../EventSourcing/{Events => }/Formatter.cs | 0 .../{Events => }/GetEventStore.cs | 0 .../GetEventStoreSubscription.cs | 78 +++++++++ .../EventSourcing/ProjectionHelper.cs | 88 ++++++++++ 5 files changed, 166 insertions(+), 152 deletions(-) delete mode 100644 src/Squidex.Infrastructure.GetEventStore/EventSourcing/Events/GetEventStoreSubscription.cs rename src/Squidex.Infrastructure.GetEventStore/EventSourcing/{Events => }/Formatter.cs (100%) rename src/Squidex.Infrastructure.GetEventStore/EventSourcing/{Events => }/GetEventStore.cs (100%) create mode 100644 src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs create mode 100644 src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Events/GetEventStoreSubscription.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Events/GetEventStoreSubscription.cs deleted file mode 100644 index 98c0f1067..000000000 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Events/GetEventStoreSubscription.cs +++ /dev/null @@ -1,152 +0,0 @@ -// ========================================================================== -// GetEventStoreSubscription.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Collections.Concurrent; -using System.Globalization; -using System.Linq; -using System.Net; -using System.Net.Sockets; -using System.Threading.Tasks; -using EventStore.ClientAPI; -using EventStore.ClientAPI.Exceptions; -using EventStore.ClientAPI.Projections; -using Squidex.Infrastructure.Tasks; - -namespace Squidex.Infrastructure.EventSourcing -{ - internal sealed class GetEventStoreSubscription : IEventSubscription - { - private const string ProjectionName = "by-{0}-{1}"; - private static readonly ConcurrentDictionary SubscriptionsCreated = new ConcurrentDictionary(); - private readonly IEventStoreConnection eventStoreConnection; - private readonly IEventSubscriber eventSubscriber; - private readonly string prefix; - private readonly string streamFilter; - private readonly string projectionHost; - private readonly EventStoreCatchUpSubscription subscription; - private readonly long? position; - - public GetEventStoreSubscription( - IEventStoreConnection eventStoreConnection, - IEventSubscriber eventSubscriber, - string projectionHost, - string prefix, - string position, - string streamFilter) - { - Guard.NotNull(eventSubscriber, nameof(eventSubscriber)); - Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); - - this.eventStoreConnection = eventStoreConnection; - this.eventSubscriber = eventSubscriber; - this.position = ParsePosition(position); - this.prefix = prefix; - this.projectionHost = projectionHost; - this.streamFilter = streamFilter; - - var streamName = ParseFilter(prefix, streamFilter); - - InitializeAsync(streamName).Wait(); - - subscription = SubscribeToStream(streamName); - } - - public Task StopAsync() - { - subscription.Stop(); - - return TaskHelper.Done; - } - - private EventStoreCatchUpSubscription SubscribeToStream(string streamName) - { - var settings = CatchUpSubscriptionSettings.Default; - - return eventStoreConnection.SubscribeToStreamFrom(streamName, position, settings, - (s, e) => - { - var storedEvent = Formatter.Read(e); - - eventSubscriber.OnEventAsync(this, storedEvent).Wait(); - }, null, - (s, reason, ex) => - { - if (reason != SubscriptionDropReason.ConnectionClosed && - reason != SubscriptionDropReason.UserInitiated) - { - ex = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}."); - - eventSubscriber.OnErrorAsync(this, ex); - } - }); - } - - private async Task InitializeAsync(string streamName) - { - if (SubscriptionsCreated.TryAdd(streamName, true)) - { - var projectsManager = await ConnectToProjections(); - - var projectionConfig = - $@"fromAll() - .when({{ - $any: function (s, e) {{ - if (e.streamId.indexOf('{prefix}') === 0 && /{streamFilter}/.test(e.streamId.substring({prefix.Length + 1}))) {{ - linkTo('{streamName}', e); - }} - }} - }});"; - - try - { - var credentials = eventStoreConnection.Settings.DefaultUserCredentials; - - await projectsManager.CreateContinuousAsync($"${streamName}", projectionConfig, credentials); - } - catch (Exception ex) - { - if (!ex.Is()) - { - throw; - } - } - } - } - - private async Task ConnectToProjections() - { - var addressParts = projectionHost.Split(':'); - - if (addressParts.Length < 2 || !int.TryParse(addressParts[1], out var port)) - { - port = 2113; - } - - var endpoints = await Dns.GetHostAddressesAsync(addressParts[0]); - var endpoint = new IPEndPoint(endpoints.First(x => x.AddressFamily == AddressFamily.InterNetwork), port); - - var projectionsManager = - new ProjectionsManager( - eventStoreConnection.Settings.Log, endpoint, - eventStoreConnection.Settings.OperationTimeout); - - return projectionsManager; - } - - private static string ParseFilter(string prefix, string filter) - { - return string.Format(CultureInfo.InvariantCulture, ProjectionName, prefix.Simplify(), filter.Simplify()); - } - - private static long? ParsePosition(string position) - { - return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null; - } - } -} diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Events/Formatter.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs similarity index 100% rename from src/Squidex.Infrastructure.GetEventStore/EventSourcing/Events/Formatter.cs rename to src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Events/GetEventStore.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs similarity index 100% rename from src/Squidex.Infrastructure.GetEventStore/EventSourcing/Events/GetEventStore.cs rename to src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs new file mode 100644 index 000000000..a7dfdd025 --- /dev/null +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs @@ -0,0 +1,78 @@ +// ========================================================================== +// GetEventStoreSubscription.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Threading.Tasks; +using EventStore.ClientAPI; +using EventStore.ClientAPI.Exceptions; +using Squidex.Infrastructure.Tasks; + +namespace Squidex.Infrastructure.EventSourcing +{ + internal sealed class GetEventStoreSubscription : IEventSubscription + { + private readonly IEventStoreConnection eventStoreConnection; + private readonly IEventSubscriber eventSubscriber; + private readonly EventStoreCatchUpSubscription subscription; + private readonly long? position; + + public GetEventStoreSubscription( + IEventStoreConnection eventStoreConnection, + IEventSubscriber eventSubscriber, + string projectionHost, + string prefix, + string position, + string streamFilter) + { + Guard.NotNull(eventSubscriber, nameof(eventSubscriber)); + Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); + + this.eventStoreConnection = eventStoreConnection; + this.eventSubscriber = eventSubscriber; + this.position = ParsePosition(position); + + var streamName = eventStoreConnection.CreateProjectionAsync(projectionHost, prefix, streamFilter).Result; + + subscription = SubscribeToStream(streamName); + } + + public Task StopAsync() + { + subscription.Stop(); + + return TaskHelper.Done; + } + + private EventStoreCatchUpSubscription SubscribeToStream(string streamName) + { + var settings = CatchUpSubscriptionSettings.Default; + + return eventStoreConnection.SubscribeToStreamFrom(streamName, position, settings, + (s, e) => + { + var storedEvent = Formatter.Read(e); + + eventSubscriber.OnEventAsync(this, storedEvent).Wait(); + }, null, + (s, reason, ex) => + { + if (reason != SubscriptionDropReason.ConnectionClosed && + reason != SubscriptionDropReason.UserInitiated) + { + ex = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}."); + + eventSubscriber.OnErrorAsync(this, ex); + } + }); + } + + private static long? ParsePosition(string position) + { + return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null; + } + } +} diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs new file mode 100644 index 000000000..7ad93b8c7 --- /dev/null +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs @@ -0,0 +1,88 @@ +// ========================================================================== +// ProjectionHelper.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Collections.Concurrent; +using System.Globalization; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; +using EventStore.ClientAPI; +using EventStore.ClientAPI.Exceptions; +using EventStore.ClientAPI.Projections; + +namespace Squidex.Infrastructure.EventSourcing +{ + public static class ProjectionHelper + { + private const string ProjectionName = "by-{0}-{1}"; + private static readonly ConcurrentDictionary SubscriptionsCreated = new ConcurrentDictionary(); + + private static string ParseFilter(string prefix, string filter) + { + return string.Format(CultureInfo.InvariantCulture, ProjectionName, prefix.Simplify(), filter.Simplify()); + } + + public static async Task CreateProjectionAsync(this IEventStoreConnection connection, string projectionHost, string prefix, string streamFilter = null) + { + var streamName = ParseFilter(prefix, streamFilter); + + if (SubscriptionsCreated.TryAdd(streamName, true)) + { + var projectsManager = await ConnectToProjections(connection, projectionHost); + + var projectionConfig = + $@"fromAll() + .when({{ + $any: function (s, e) {{ + if (e.streamId.indexOf('{prefix}') === 0 && /{streamFilter}/.test(e.streamId.substring({prefix.Length + 1}))) {{ + linkTo('{streamName}', e); + }} + }} + }});"; + + try + { + var credentials = connection.Settings.DefaultUserCredentials; + + await projectsManager.CreateContinuousAsync($"${streamName}", projectionConfig, credentials); + } + catch (Exception ex) + { + if (!ex.Is()) + { + throw; + } + } + } + + return streamName; + } + + private static async Task ConnectToProjections(IEventStoreConnection connection, string projectionHost) + { + var addressParts = projectionHost.Split(':'); + + if (addressParts.Length < 2 || !int.TryParse(addressParts[1], out var port)) + { + port = 2113; + } + + var endpoints = await Dns.GetHostAddressesAsync(addressParts[0]); + var endpoint = new IPEndPoint(endpoints.First(x => x.AddressFamily == AddressFamily.InterNetwork), port); + + var projectionsManager = + new ProjectionsManager( + connection.Settings.Log, endpoint, + connection.Settings.OperationTimeout); + + return projectionsManager; + } + } +}