|
|
|
@ -7,11 +7,11 @@ |
|
|
|
|
|
|
|
using System; |
|
|
|
using System.Collections.Generic; |
|
|
|
using System.Linq; |
|
|
|
using System.Threading; |
|
|
|
using System.Threading.Tasks; |
|
|
|
using Microsoft.Azure.Documents.Linq; |
|
|
|
using Microsoft.Azure.Documents; |
|
|
|
using Squidex.Infrastructure.Log; |
|
|
|
using Squidex.Infrastructure.Tasks; |
|
|
|
|
|
|
|
namespace Squidex.Infrastructure.EventSourcing |
|
|
|
{ |
|
|
|
@ -24,7 +24,7 @@ namespace Squidex.Infrastructure.EventSourcing |
|
|
|
Guard.NotNull(subscriber, nameof(subscriber)); |
|
|
|
Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); |
|
|
|
|
|
|
|
return new PollingSubscription(this, subscriber, streamFilter, position); |
|
|
|
throw new NotSupportedException(); |
|
|
|
} |
|
|
|
|
|
|
|
public Task CreateIndexAsync(string property) |
|
|
|
@ -36,39 +36,32 @@ namespace Squidex.Infrastructure.EventSourcing |
|
|
|
{ |
|
|
|
using (Profiler.TraceMethod<CosmosDbEventStore>()) |
|
|
|
{ |
|
|
|
var query = |
|
|
|
documentClient.CreateDocumentQuery<CosmosDbEventCommit>(collectionUri, |
|
|
|
FilterBuilder.ByStreamName(streamName, streamPosition)); |
|
|
|
|
|
|
|
var documentQuery = query.AsDocumentQuery(); |
|
|
|
var query = FilterBuilder.ByStreamName(streamName, streamPosition - MaxCommitSize); |
|
|
|
|
|
|
|
var result = new List<StoredEvent>(); |
|
|
|
|
|
|
|
while (documentQuery.HasMoreResults) |
|
|
|
await documentClient.QueryAsync(collectionUri, query, commit => |
|
|
|
{ |
|
|
|
var commits = await documentQuery.ExecuteNextAsync<CosmosDbEventCommit>(); |
|
|
|
var eventStreamOffset = (int)commit.EventStreamOffset; |
|
|
|
|
|
|
|
foreach (var commit in commits) |
|
|
|
{ |
|
|
|
var eventStreamOffset = (int)commit.EventStreamOffset; |
|
|
|
var commitTimestamp = commit.Timestamp; |
|
|
|
var commitOffset = 0; |
|
|
|
|
|
|
|
var commitTimestamp = commit.Timestamp; |
|
|
|
var commitOffset = 0; |
|
|
|
foreach (var e in commit.Events) |
|
|
|
{ |
|
|
|
eventStreamOffset++; |
|
|
|
|
|
|
|
foreach (var e in commit.Events) |
|
|
|
if (eventStreamOffset >= streamPosition) |
|
|
|
{ |
|
|
|
eventStreamOffset++; |
|
|
|
var eventData = e.ToEventData(); |
|
|
|
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); |
|
|
|
|
|
|
|
if (eventStreamOffset >= streamPosition) |
|
|
|
{ |
|
|
|
var eventData = e.ToEventData(); |
|
|
|
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); |
|
|
|
|
|
|
|
result.Add(new StoredEvent(streamName, eventToken, eventStreamOffset, eventData)); |
|
|
|
} |
|
|
|
result.Add(new StoredEvent(streamName, eventToken, eventStreamOffset, eventData)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return TaskHelper.Done; |
|
|
|
}); |
|
|
|
|
|
|
|
return result; |
|
|
|
} |
|
|
|
@ -80,8 +73,8 @@ namespace Squidex.Infrastructure.EventSourcing |
|
|
|
|
|
|
|
StreamPosition lastPosition = position; |
|
|
|
|
|
|
|
var filterDefinition = CreateFilter(property, value, lastPosition); |
|
|
|
var filterExpression = CreateFilterExpression(property, value); |
|
|
|
var filterDefinition = FilterBuilder.CreateByProperty(property, value, lastPosition); |
|
|
|
var filterExpression = FilterBuilder.CreateExpression(property, value); |
|
|
|
|
|
|
|
return QueryAsync(callback, lastPosition, filterDefinition, filterExpression, ct); |
|
|
|
} |
|
|
|
@ -92,69 +85,43 @@ namespace Squidex.Infrastructure.EventSourcing |
|
|
|
|
|
|
|
StreamPosition lastPosition = position; |
|
|
|
|
|
|
|
var filterDefinition = CreateFilter(streamFilter, lastPosition); |
|
|
|
var filterExpression = CreateFilterExpression(null, null); |
|
|
|
var filterDefinition = FilterBuilder.CreateByFilter(streamFilter, lastPosition); |
|
|
|
var filterExpression = FilterBuilder.CreateExpression(null, null); |
|
|
|
|
|
|
|
return QueryAsync(callback, lastPosition, filterDefinition, filterExpression, ct); |
|
|
|
} |
|
|
|
|
|
|
|
private async Task QueryAsync(Func<StoredEvent, Task> callback, StreamPosition lastPosition, IQueryable<CosmosDbEventCommit> query, EventPredicate filterExpression, CancellationToken ct = default) |
|
|
|
private async Task QueryAsync(Func<StoredEvent, Task> callback, StreamPosition lastPosition, SqlQuerySpec query, EventPredicate filterExpression, CancellationToken ct = default) |
|
|
|
{ |
|
|
|
using (Profiler.TraceMethod<CosmosDbEventStore>()) |
|
|
|
{ |
|
|
|
var documentQuery = query.AsDocumentQuery(); |
|
|
|
|
|
|
|
while (documentQuery.HasMoreResults && !ct.IsCancellationRequested) |
|
|
|
await documentClient.QueryAsync(collectionUri, query, async commit => |
|
|
|
{ |
|
|
|
var commits = await documentQuery.ExecuteNextAsync<CosmosDbEventCommit>(ct); |
|
|
|
var eventStreamOffset = (int)commit.EventStreamOffset; |
|
|
|
|
|
|
|
foreach (var commit in commits) |
|
|
|
{ |
|
|
|
var eventStreamOffset = (int)commit.EventStreamOffset; |
|
|
|
var commitTimestamp = commit.Timestamp; |
|
|
|
var commitOffset = 0; |
|
|
|
|
|
|
|
var commitTimestamp = commit.Timestamp; |
|
|
|
var commitOffset = 0; |
|
|
|
foreach (var e in commit.Events) |
|
|
|
{ |
|
|
|
eventStreamOffset++; |
|
|
|
|
|
|
|
foreach (var e in commit.Events) |
|
|
|
if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp) |
|
|
|
{ |
|
|
|
eventStreamOffset++; |
|
|
|
var eventData = e.ToEventData(); |
|
|
|
|
|
|
|
if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp) |
|
|
|
if (filterExpression(eventData)) |
|
|
|
{ |
|
|
|
var eventData = e.ToEventData(); |
|
|
|
|
|
|
|
if (filterExpression(eventData)) |
|
|
|
{ |
|
|
|
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); |
|
|
|
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); |
|
|
|
|
|
|
|
await callback(new StoredEvent(commit.EventStream, eventToken, eventStreamOffset, eventData)); |
|
|
|
} |
|
|
|
await callback(new StoredEvent(commit.EventStream, eventToken, eventStreamOffset, eventData)); |
|
|
|
} |
|
|
|
|
|
|
|
commitOffset++; |
|
|
|
} |
|
|
|
|
|
|
|
commitOffset++; |
|
|
|
} |
|
|
|
} |
|
|
|
}, ct); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private IQueryable<CosmosDbEventCommit> CreateFilter(string property, object value, StreamPosition streamPosition) |
|
|
|
{ |
|
|
|
var query = FilterBuilder.ByProperty(property, value, streamPosition); |
|
|
|
|
|
|
|
return documentClient.CreateDocumentQuery<CosmosDbEventCommit>(collectionUri, query); |
|
|
|
} |
|
|
|
|
|
|
|
private IQueryable<CosmosDbEventCommit> CreateFilter(string streamFilter, StreamPosition streamPosition) |
|
|
|
{ |
|
|
|
var query = FilterBuilder.ByFilter(streamFilter, streamPosition); |
|
|
|
|
|
|
|
return documentClient.CreateDocumentQuery<CosmosDbEventCommit>(collectionUri, query); |
|
|
|
} |
|
|
|
|
|
|
|
private static EventPredicate CreateFilterExpression(string property, object value) |
|
|
|
{ |
|
|
|
return FilterBuilder.CreateFilterExpression(property, value); |
|
|
|
} |
|
|
|
} |
|
|
|
} |