diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs index 70036d8ee..7f95c5e92 100644 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs @@ -12,6 +12,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using EventStore.ClientAPI; +using EventStore.ClientAPI.Projections; namespace Squidex.Infrastructure.EventSourcing { @@ -22,6 +23,7 @@ namespace Squidex.Infrastructure.EventSourcing private readonly IEventStoreConnection connection; private readonly string projectionHost; private readonly string prefix; + private ProjectionsManager projectionsManager; public GetEventStore(IEventStoreConnection connection, string prefix, string projectionHost) { @@ -33,10 +35,6 @@ namespace Squidex.Infrastructure.EventSourcing this.prefix = prefix?.Trim(' ', '-').WithFallback("squidex"); } - public GetEventStore() - { - } - public void Initialize() { try @@ -47,23 +45,34 @@ namespace Squidex.Infrastructure.EventSourcing { throw new ConfigurationException("Cannot connect to event store.", ex); } + + try + { + projectionsManager = connection.GetProjectionsManagerAsync(projectionHost).Result; + + projectionsManager.ListAllAsync(connection.Settings.DefaultUserCredentials).Wait(); + } + catch (Exception ex) + { + throw new ConfigurationException($"Cannot connect to event store projections: {projectionHost}.", ex); + } } public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null) { - return new GetEventStoreSubscription(connection, subscriber, projectionHost, prefix, position, streamFilter); + return new GetEventStoreSubscription(connection, subscriber, projectionsManager, prefix, position, streamFilter); } public async Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, string position = null) { - var streamName = await connection.CreateProjectionAsync(projectionHost, prefix, streamFilter); + var streamName = await connection.CreateProjectionAsync(projectionsManager, prefix, streamFilter); var sliceStart = ProjectionHelper.ParsePosition(position); StreamEventsSlice currentSlice; do { - currentSlice = await connection.ReadStreamEventsForwardAsync(GetStreamName(streamName), sliceStart, ReadPageSize, false); + currentSlice = await connection.ReadStreamEventsForwardAsync(streamName, sliceStart, ReadPageSize, false); if (currentSlice.Status == SliceReadStatus.Success) { diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs index 607089bb1..0739cfb6c 100644 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs @@ -9,6 +9,7 @@ using System.Threading.Tasks; using EventStore.ClientAPI; using EventStore.ClientAPI.Exceptions; +using EventStore.ClientAPI.Projections; using Squidex.Infrastructure.Tasks; namespace Squidex.Infrastructure.EventSourcing @@ -23,7 +24,7 @@ namespace Squidex.Infrastructure.EventSourcing public GetEventStoreSubscription( IEventStoreConnection eventStoreConnection, IEventSubscriber eventSubscriber, - string projectionHost, + ProjectionsManager projectionsManager, string prefix, string position, string streamFilter) @@ -35,7 +36,7 @@ namespace Squidex.Infrastructure.EventSourcing this.eventSubscriber = eventSubscriber; this.position = ProjectionHelper.ParsePositionOrNull(position); - var streamName = eventStoreConnection.CreateProjectionAsync(projectionHost, prefix, streamFilter).Result; + var streamName = eventStoreConnection.CreateProjectionAsync(projectionsManager, prefix, streamFilter).Result; subscription = SubscribeToStream(streamName); } diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs index ee9905dd5..4cebf8f5d 100644 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs @@ -29,14 +29,12 @@ namespace Squidex.Infrastructure.EventSourcing return string.Format(CultureInfo.InvariantCulture, ProjectionName, prefix.Simplify(), filter.Simplify()); } - public static async Task CreateProjectionAsync(this IEventStoreConnection connection, string projectionHost, string prefix, string streamFilter = null) + public static async Task CreateProjectionAsync(this IEventStoreConnection connection, ProjectionsManager projectionsManager, string prefix, string streamFilter = null) { var streamName = ParseFilter(prefix, streamFilter); if (SubscriptionsCreated.TryAdd(streamName, true)) { - var projectsManager = await ConnectToProjections(connection, projectionHost); - var projectionConfig = $@"fromAll() .when({{ @@ -51,7 +49,7 @@ namespace Squidex.Infrastructure.EventSourcing { var credentials = connection.Settings.DefaultUserCredentials; - await projectsManager.CreateContinuousAsync($"${streamName}", projectionConfig, credentials); + await projectionsManager.CreateContinuousAsync($"${streamName}", projectionConfig, credentials); } catch (Exception ex) { @@ -65,7 +63,7 @@ namespace Squidex.Infrastructure.EventSourcing return streamName; } - private static async Task ConnectToProjections(IEventStoreConnection connection, string projectionHost) + public static async Task GetProjectionsManagerAsync(this IEventStoreConnection connection, string projectionHost) { var addressParts = projectionHost.Split(':');