Browse Source

Fix finished

pull/214/head
Sebastian Stehle 8 years ago
parent
commit
397de74a98
  1. 25
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs
  2. 7
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs
  3. 5
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs

25
src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs

@ -54,9 +54,30 @@ namespace Squidex.Infrastructure.EventSourcing
return new GetEventStoreSubscription(connection, subscriber, projectionHost, prefix, position, streamFilter);
}
public Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, string position = null)
public async Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, string position = null)
{
throw new NotSupportedException();
var streamName = await connection.CreateProjectionAsync(projectionHost, prefix, streamFilter);
var sliceStart = ProjectionHelper.ParsePosition(position) ?? -1;
StreamEventsSlice currentSlice;
do
{
currentSlice = await connection.ReadStreamEventsForwardAsync(GetStreamName(streamName), sliceStart, ReadPageSize, false);
if (currentSlice.Status == SliceReadStatus.Success)
{
sliceStart = currentSlice.NextEventNumber;
foreach (var resolved in currentSlice.Events)
{
var storedEvent = Formatter.Read(resolved);
await callback(storedEvent);
}
}
}
while (!currentSlice.IsEndOfStream && !cancellationToken.IsCancellationRequested);
}
public async Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName, long streamPosition = 0)

7
src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs

@ -33,7 +33,7 @@ namespace Squidex.Infrastructure.EventSourcing
this.eventStoreConnection = eventStoreConnection;
this.eventSubscriber = eventSubscriber;
this.position = ParsePosition(position);
this.position = ProjectionHelper.ParsePosition(position);
var streamName = eventStoreConnection.CreateProjectionAsync(projectionHost, prefix, streamFilter).Result;
@ -69,10 +69,5 @@ namespace Squidex.Infrastructure.EventSourcing
}
});
}
private static long? ParsePosition(string position)
{
return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null;
}
}
}

5
src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs

@ -84,5 +84,10 @@ namespace Squidex.Infrastructure.EventSourcing
return projectionsManager;
}
public static long? ParsePosition(string position)
{
return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null;
}
}
}

Loading…
Cancel
Save