|
|
|
@ -8,16 +8,14 @@ |
|
|
|
|
|
|
|
using System; |
|
|
|
using System.Collections.Generic; |
|
|
|
using System.Linq; |
|
|
|
using System.Reactive.Linq; |
|
|
|
using System.Threading; |
|
|
|
using System.Threading.Tasks; |
|
|
|
using MongoDB.Bson; |
|
|
|
using MongoDB.Driver; |
|
|
|
using Squidex.Infrastructure.MongoDb; |
|
|
|
using Squidex.Infrastructure.Tasks; |
|
|
|
|
|
|
|
namespace Squidex.Infrastructure.CQRS.Events |
|
|
|
namespace Squidex.Infrastructure.EventSourcing |
|
|
|
{ |
|
|
|
public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore |
|
|
|
{ |
|
|
|
@ -63,9 +61,9 @@ namespace Squidex.Infrastructure.CQRS.Events |
|
|
|
return new PollingSubscription(this, notifier, subscriber, streamFilter, position); |
|
|
|
} |
|
|
|
|
|
|
|
public async Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName, int position = -1) |
|
|
|
public async Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName, long streamPosition = -1) |
|
|
|
{ |
|
|
|
var commits = await Collection.Find(x => x.EventStreamOffset > position).Sort(Sort.Ascending(TimestampField)).ToListAsync(); |
|
|
|
var commits = await Collection.Find(x => x.EventStreamOffset >= streamPosition).Sort(Sort.Ascending(TimestampField)).ToListAsync(); |
|
|
|
|
|
|
|
var result = new List<StoredEvent>(); |
|
|
|
|
|
|
|
@ -80,7 +78,7 @@ namespace Squidex.Infrastructure.CQRS.Events |
|
|
|
{ |
|
|
|
eventStreamOffset++; |
|
|
|
|
|
|
|
if (eventStreamOffset > position) |
|
|
|
if (eventStreamOffset >= streamPosition) |
|
|
|
{ |
|
|
|
var eventData = e.ToEventData(); |
|
|
|
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); |
|
|
|
@ -130,7 +128,7 @@ namespace Squidex.Infrastructure.CQRS.Events |
|
|
|
return AppendEventsInternalAsync(commitId, streamName, AnyVersion, events); |
|
|
|
} |
|
|
|
|
|
|
|
public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events) |
|
|
|
public Task AppendEventsAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events) |
|
|
|
{ |
|
|
|
Guard.GreaterEquals(expectedVersion, -1, nameof(expectedVersion)); |
|
|
|
|