From 397de74a9836c94517ada794b51f9ff012fa7af2 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Wed, 3 Jan 2018 17:50:26 +0100 Subject: [PATCH] Fix finished --- .../EventSourcing/GetEventStore.cs | 25 +++++++++++++++++-- .../GetEventStoreSubscription.cs | 7 +----- .../EventSourcing/ProjectionHelper.cs | 5 ++++ 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs index cd6659044..12cbd708b 100644 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs @@ -54,9 +54,30 @@ namespace Squidex.Infrastructure.EventSourcing return new GetEventStoreSubscription(connection, subscriber, projectionHost, prefix, position, streamFilter); } - public Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, string position = null) + public async Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, string position = null) { - throw new NotSupportedException(); + var streamName = await connection.CreateProjectionAsync(projectionHost, prefix, streamFilter); + + var sliceStart = ProjectionHelper.ParsePosition(position) ?? -1; + + StreamEventsSlice currentSlice; + do + { + currentSlice = await connection.ReadStreamEventsForwardAsync(GetStreamName(streamName), sliceStart, ReadPageSize, false); + + if (currentSlice.Status == SliceReadStatus.Success) + { + sliceStart = currentSlice.NextEventNumber; + + foreach (var resolved in currentSlice.Events) + { + var storedEvent = Formatter.Read(resolved); + + await callback(storedEvent); + } + } + } + while (!currentSlice.IsEndOfStream && !cancellationToken.IsCancellationRequested); } public async Task> GetEventsAsync(string streamName, long streamPosition = 0) diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs index a7dfdd025..b2fc0c24f 100644 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs @@ -33,7 +33,7 @@ namespace Squidex.Infrastructure.EventSourcing this.eventStoreConnection = eventStoreConnection; this.eventSubscriber = eventSubscriber; - this.position = ParsePosition(position); + this.position = ProjectionHelper.ParsePosition(position); var streamName = eventStoreConnection.CreateProjectionAsync(projectionHost, prefix, streamFilter).Result; @@ -69,10 +69,5 @@ namespace Squidex.Infrastructure.EventSourcing } }); } - - private static long? ParsePosition(string position) - { - return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null; - } } } diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs index 7ad93b8c7..3c35b5aac 100644 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs @@ -84,5 +84,10 @@ namespace Squidex.Infrastructure.EventSourcing return projectionsManager; } + + public static long? ParsePosition(string position) + { + return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null; + } } }