Browse Source

Projections fixed

pull/221/head
Sebastian Stehle 8 years ago
parent
commit
4b791e8314
  1. 23
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs
  2. 5
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs
  3. 8
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs

23
src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs

@ -12,6 +12,7 @@ using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using EventStore.ClientAPI; using EventStore.ClientAPI;
using EventStore.ClientAPI.Projections;
namespace Squidex.Infrastructure.EventSourcing namespace Squidex.Infrastructure.EventSourcing
{ {
@ -22,6 +23,7 @@ namespace Squidex.Infrastructure.EventSourcing
private readonly IEventStoreConnection connection; private readonly IEventStoreConnection connection;
private readonly string projectionHost; private readonly string projectionHost;
private readonly string prefix; private readonly string prefix;
private ProjectionsManager projectionsManager;
public GetEventStore(IEventStoreConnection connection, string prefix, string projectionHost) public GetEventStore(IEventStoreConnection connection, string prefix, string projectionHost)
{ {
@ -33,10 +35,6 @@ namespace Squidex.Infrastructure.EventSourcing
this.prefix = prefix?.Trim(' ', '-').WithFallback("squidex"); this.prefix = prefix?.Trim(' ', '-').WithFallback("squidex");
} }
public GetEventStore()
{
}
public void Initialize() public void Initialize()
{ {
try try
@ -47,23 +45,34 @@ namespace Squidex.Infrastructure.EventSourcing
{ {
throw new ConfigurationException("Cannot connect to event store.", ex); throw new ConfigurationException("Cannot connect to event store.", ex);
} }
try
{
projectionsManager = connection.GetProjectionsManagerAsync(projectionHost).Result;
projectionsManager.ListAllAsync(connection.Settings.DefaultUserCredentials).Wait();
}
catch (Exception ex)
{
throw new ConfigurationException($"Cannot connect to event store projections: {projectionHost}.", ex);
}
} }
public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null) public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null)
{ {
return new GetEventStoreSubscription(connection, subscriber, projectionHost, prefix, position, streamFilter); return new GetEventStoreSubscription(connection, subscriber, projectionsManager, prefix, position, streamFilter);
} }
public async Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, string position = null) public async Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, string position = null)
{ {
var streamName = await connection.CreateProjectionAsync(projectionHost, prefix, streamFilter); var streamName = await connection.CreateProjectionAsync(projectionsManager, prefix, streamFilter);
var sliceStart = ProjectionHelper.ParsePosition(position); var sliceStart = ProjectionHelper.ParsePosition(position);
StreamEventsSlice currentSlice; StreamEventsSlice currentSlice;
do do
{ {
currentSlice = await connection.ReadStreamEventsForwardAsync(GetStreamName(streamName), sliceStart, ReadPageSize, false); currentSlice = await connection.ReadStreamEventsForwardAsync(streamName, sliceStart, ReadPageSize, false);
if (currentSlice.Status == SliceReadStatus.Success) if (currentSlice.Status == SliceReadStatus.Success)
{ {

5
src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs

@ -9,6 +9,7 @@
using System.Threading.Tasks; using System.Threading.Tasks;
using EventStore.ClientAPI; using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions; using EventStore.ClientAPI.Exceptions;
using EventStore.ClientAPI.Projections;
using Squidex.Infrastructure.Tasks; using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.EventSourcing namespace Squidex.Infrastructure.EventSourcing
@ -23,7 +24,7 @@ namespace Squidex.Infrastructure.EventSourcing
public GetEventStoreSubscription( public GetEventStoreSubscription(
IEventStoreConnection eventStoreConnection, IEventStoreConnection eventStoreConnection,
IEventSubscriber eventSubscriber, IEventSubscriber eventSubscriber,
string projectionHost, ProjectionsManager projectionsManager,
string prefix, string prefix,
string position, string position,
string streamFilter) string streamFilter)
@ -35,7 +36,7 @@ namespace Squidex.Infrastructure.EventSourcing
this.eventSubscriber = eventSubscriber; this.eventSubscriber = eventSubscriber;
this.position = ProjectionHelper.ParsePositionOrNull(position); this.position = ProjectionHelper.ParsePositionOrNull(position);
var streamName = eventStoreConnection.CreateProjectionAsync(projectionHost, prefix, streamFilter).Result; var streamName = eventStoreConnection.CreateProjectionAsync(projectionsManager, prefix, streamFilter).Result;
subscription = SubscribeToStream(streamName); subscription = SubscribeToStream(streamName);
} }

8
src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs

@ -29,14 +29,12 @@ namespace Squidex.Infrastructure.EventSourcing
return string.Format(CultureInfo.InvariantCulture, ProjectionName, prefix.Simplify(), filter.Simplify()); return string.Format(CultureInfo.InvariantCulture, ProjectionName, prefix.Simplify(), filter.Simplify());
} }
public static async Task<string> CreateProjectionAsync(this IEventStoreConnection connection, string projectionHost, string prefix, string streamFilter = null) public static async Task<string> CreateProjectionAsync(this IEventStoreConnection connection, ProjectionsManager projectionsManager, string prefix, string streamFilter = null)
{ {
var streamName = ParseFilter(prefix, streamFilter); var streamName = ParseFilter(prefix, streamFilter);
if (SubscriptionsCreated.TryAdd(streamName, true)) if (SubscriptionsCreated.TryAdd(streamName, true))
{ {
var projectsManager = await ConnectToProjections(connection, projectionHost);
var projectionConfig = var projectionConfig =
$@"fromAll() $@"fromAll()
.when({{ .when({{
@ -51,7 +49,7 @@ namespace Squidex.Infrastructure.EventSourcing
{ {
var credentials = connection.Settings.DefaultUserCredentials; var credentials = connection.Settings.DefaultUserCredentials;
await projectsManager.CreateContinuousAsync($"${streamName}", projectionConfig, credentials); await projectionsManager.CreateContinuousAsync($"${streamName}", projectionConfig, credentials);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -65,7 +63,7 @@ namespace Squidex.Infrastructure.EventSourcing
return streamName; return streamName;
} }
private static async Task<ProjectionsManager> ConnectToProjections(IEventStoreConnection connection, string projectionHost) public static async Task<ProjectionsManager> GetProjectionsManagerAsync(this IEventStoreConnection connection, string projectionHost)
{ {
var addressParts = projectionHost.Split(':'); var addressParts = projectionHost.Split(':');

Loading…
Cancel
Save