@ -78,11 +78,11 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
} ) ;
}
public IObservable < StoredEvent > GetEventsAsync ( long lastReceivedPosition = - 1 )
public IObservable < StoredEvent > GetEventsAsync ( long lastReceivedEventNumber = - 1 )
{
return Observable . Create < StoredEvent > ( async ( observer , ct ) = >
{
var commitOffset = await GetPreviousOffset ( lastReceivedPosition ) ;
var commitOffset = await GetPreviousOffset ( lastReceivedEventNumber ) ;
await Collection . Find ( x = > x . EventsOffset > = commitOffset ) . SortBy ( x = > x . EventsOffset ) . ForEachAsync ( commit = >
{
@ -92,7 +92,7 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
{
eventNumber + + ;
if ( eventNumber > lastReceivedPosition )
if ( eventNumber > lastReceivedEventNumber )
{
var eventData = SimpleMapper . Map ( @event , new EventData ( ) ) ;
@ -144,13 +144,13 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
return ;
}
catch ( MongoWriteException e )
catch ( MongoWriteException ex )
{
if ( e . Message . IndexOf ( eventsOffsetIndex , StringComparison . OrdinalIgnoreCase ) > = 0 )
if ( ex . Message . IndexOf ( eventsOffsetIndex , StringComparison . OrdinalIgnoreCase ) > = 0 )
{
commit . EventsOffset = await GetEventOffset ( ) ;
}
else if ( e . WriteError ? . Category = = ServerErrorCategory . DuplicateKey )
else if ( ex . WriteError ? . Category = = ServerErrorCategory . DuplicateKey )
{
currentVersion = await GetEventStreamOffset ( streamName ) ;
@ -165,10 +165,10 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
}
}
private async Task < long > GetPreviousOffset ( long startPosition )
private async Task < long > GetPreviousOffset ( long startEventNumber )
{
var document =
await Collection . Find ( x = > x . EventsOffset < = startPosition )
await Collection . Find ( x = > x . EventsOffset < = startEventNumber )
. Project < BsonDocument > ( Projection
. Include ( x = > x . EventStreamOffset )
. Include ( x = > x . EventsCount ) )