diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEvent.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEvent.cs index ce1beb819..c12c6548f 100644 --- a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEvent.cs +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEvent.cs @@ -11,13 +11,13 @@ namespace Squidex.Infrastructure.EventSourcing { internal sealed class CosmosDbEvent { - [JsonProperty] + [JsonProperty("type")] public string Type { get; set; } - [JsonProperty] + [JsonProperty("payload")] public string Payload { get; set; } - [JsonProperty] + [JsonProperty("header")] public EnvelopeHeaders Headers { get; set; } public static CosmosDbEvent FromEventData(EventData data) diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventCommit.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventCommit.cs index 7e104f2c9..6a5dca9b3 100644 --- a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventCommit.cs +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventCommit.cs @@ -12,22 +12,22 @@ namespace Squidex.Infrastructure.EventSourcing { internal sealed class CosmosDbEventCommit { - [JsonProperty] + [JsonProperty("id")] public Guid Id { get; set; } - [JsonProperty] + [JsonProperty("events")] public CosmosDbEvent[] Events { get; set; } - [JsonProperty] + [JsonProperty("eventStreamOffset")] public long EventStreamOffset { get; set; } - [JsonProperty] + [JsonProperty("eventsCount")] public long EventsCount { get; set; } - [JsonProperty] + [JsonProperty("eventStream")] public string EventStream { get; set; } - [JsonProperty] + [JsonProperty("timestamp")] public long Timestamp { get; set; } } } diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore.cs index d25052b78..c28984b0f 100644 --- a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore.cs +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore.cs @@ -19,7 +19,8 @@ namespace Squidex.Infrastructure.EventSourcing private readonly DocumentClient documentClient; private readonly Uri databaseUri; private readonly Uri collectionUri; - private readonly string database; + private readonly string databaseId; + private readonly string collectionId; public CosmosDbEventStore(DocumentClient documentClient, string database) { @@ -28,15 +29,16 @@ namespace Squidex.Infrastructure.EventSourcing this.documentClient = documentClient; - this.databaseUri = UriFactory.CreateDatabaseUri(database); - this.database = database; + databaseUri = UriFactory.CreateDatabaseUri(database); + databaseId = database; collectionUri = UriFactory.CreateDocumentCollectionUri(database, FilterBuilder.Collection); + collectionId = FilterBuilder.Collection; } public async Task InitializeAsync(CancellationToken ct = default) { - await documentClient.CreateDatabaseIfNotExistsAsync(new Database { Id = database }); + await documentClient.CreateDatabaseIfNotExistsAsync(new Database { Id = databaseId }); await documentClient.CreateDocumentCollectionIfNotExistsAsync(databaseUri, new DocumentCollection diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Reader.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Reader.cs index f2d2e2194..adaef8785 100644 --- a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Reader.cs +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Reader.cs @@ -7,11 +7,11 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; -using Microsoft.Azure.Documents.Linq; +using Microsoft.Azure.Documents; using Squidex.Infrastructure.Log; +using Squidex.Infrastructure.Tasks; namespace Squidex.Infrastructure.EventSourcing { @@ -24,7 +24,7 @@ namespace Squidex.Infrastructure.EventSourcing Guard.NotNull(subscriber, nameof(subscriber)); Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); - return new PollingSubscription(this, subscriber, streamFilter, position); + throw new NotSupportedException(); } public Task CreateIndexAsync(string property) @@ -36,39 +36,32 @@ namespace Squidex.Infrastructure.EventSourcing { using (Profiler.TraceMethod()) { - var query = - documentClient.CreateDocumentQuery(collectionUri, - FilterBuilder.ByStreamName(streamName, streamPosition)); - - var documentQuery = query.AsDocumentQuery(); + var query = FilterBuilder.ByStreamName(streamName, streamPosition - MaxCommitSize); var result = new List(); - while (documentQuery.HasMoreResults) + await documentClient.QueryAsync(collectionUri, query, commit => { - var commits = await documentQuery.ExecuteNextAsync(); + var eventStreamOffset = (int)commit.EventStreamOffset; - foreach (var commit in commits) - { - var eventStreamOffset = (int)commit.EventStreamOffset; + var commitTimestamp = commit.Timestamp; + var commitOffset = 0; - var commitTimestamp = commit.Timestamp; - var commitOffset = 0; + foreach (var e in commit.Events) + { + eventStreamOffset++; - foreach (var e in commit.Events) + if (eventStreamOffset >= streamPosition) { - eventStreamOffset++; + var eventData = e.ToEventData(); + var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); - if (eventStreamOffset >= streamPosition) - { - var eventData = e.ToEventData(); - var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); - - result.Add(new StoredEvent(streamName, eventToken, eventStreamOffset, eventData)); - } + result.Add(new StoredEvent(streamName, eventToken, eventStreamOffset, eventData)); } } - } + + return TaskHelper.Done; + }); return result; } @@ -80,8 +73,8 @@ namespace Squidex.Infrastructure.EventSourcing StreamPosition lastPosition = position; - var filterDefinition = CreateFilter(property, value, lastPosition); - var filterExpression = CreateFilterExpression(property, value); + var filterDefinition = FilterBuilder.CreateByProperty(property, value, lastPosition); + var filterExpression = FilterBuilder.CreateExpression(property, value); return QueryAsync(callback, lastPosition, filterDefinition, filterExpression, ct); } @@ -92,69 +85,43 @@ namespace Squidex.Infrastructure.EventSourcing StreamPosition lastPosition = position; - var filterDefinition = CreateFilter(streamFilter, lastPosition); - var filterExpression = CreateFilterExpression(null, null); + var filterDefinition = FilterBuilder.CreateByFilter(streamFilter, lastPosition); + var filterExpression = FilterBuilder.CreateExpression(null, null); return QueryAsync(callback, lastPosition, filterDefinition, filterExpression, ct); } - private async Task QueryAsync(Func callback, StreamPosition lastPosition, IQueryable query, EventPredicate filterExpression, CancellationToken ct = default) + private async Task QueryAsync(Func callback, StreamPosition lastPosition, SqlQuerySpec query, EventPredicate filterExpression, CancellationToken ct = default) { using (Profiler.TraceMethod()) { - var documentQuery = query.AsDocumentQuery(); - - while (documentQuery.HasMoreResults && !ct.IsCancellationRequested) + await documentClient.QueryAsync(collectionUri, query, async commit => { - var commits = await documentQuery.ExecuteNextAsync(ct); + var eventStreamOffset = (int)commit.EventStreamOffset; - foreach (var commit in commits) - { - var eventStreamOffset = (int)commit.EventStreamOffset; + var commitTimestamp = commit.Timestamp; + var commitOffset = 0; - var commitTimestamp = commit.Timestamp; - var commitOffset = 0; + foreach (var e in commit.Events) + { + eventStreamOffset++; - foreach (var e in commit.Events) + if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp) { - eventStreamOffset++; + var eventData = e.ToEventData(); - if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp) + if (filterExpression(eventData)) { - var eventData = e.ToEventData(); - - if (filterExpression(eventData)) - { - var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); + var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); - await callback(new StoredEvent(commit.EventStream, eventToken, eventStreamOffset, eventData)); - } + await callback(new StoredEvent(commit.EventStream, eventToken, eventStreamOffset, eventData)); } - - commitOffset++; } + + commitOffset++; } - } + }, ct); } } - - private IQueryable CreateFilter(string property, object value, StreamPosition streamPosition) - { - var query = FilterBuilder.ByProperty(property, value, streamPosition); - - return documentClient.CreateDocumentQuery(collectionUri, query); - } - - private IQueryable CreateFilter(string streamFilter, StreamPosition streamPosition) - { - var query = FilterBuilder.ByFilter(streamFilter, streamPosition); - - return documentClient.CreateDocumentQuery(collectionUri, query); - } - - private static EventPredicate CreateFilterExpression(string property, object value) - { - return FilterBuilder.CreateFilterExpression(property, value); - } } } \ No newline at end of file diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Writer.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Writer.cs index 912713204..89650117d 100644 --- a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Writer.cs +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Writer.cs @@ -25,7 +25,12 @@ namespace Squidex.Infrastructure.EventSourcing public Task DeleteStreamAsync(string streamName) { - return Task.CompletedTask; + var query = FilterBuilder.AllIds(streamName); + + return documentClient.QueryAsync(collectionUri, query, commit => + { + return documentClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(databaseId, collectionId, commit.Id.ToString())); + }); } public Task AppendAsync(Guid commitId, string streamName, ICollection events) diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/FilterBuilder.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/FilterBuilder.cs index 0de29581f..821c53092 100644 --- a/src/Squidex.Infrastructure.Azure/EventSourcing/FilterBuilder.cs +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/FilterBuilder.cs @@ -7,7 +7,11 @@ using System; using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.Linq; using Squidex.Infrastructure.Json.Objects; namespace Squidex.Infrastructure.EventSourcing @@ -16,11 +20,53 @@ namespace Squidex.Infrastructure.EventSourcing { public const string Collection = "Events"; + public static readonly string CommitId = nameof(CosmosDbEventCommit.Id).ToCamelCase(); public static readonly string EventsCountField = nameof(CosmosDbEventCommit.EventsCount).ToCamelCase(); public static readonly string EventStreamOffsetField = nameof(CosmosDbEventCommit.EventStreamOffset).ToCamelCase(); public static readonly string EventStreamField = nameof(CosmosDbEventCommit.EventStream).ToCamelCase(); public static readonly string TimestampField = nameof(CosmosDbEventCommit.Timestamp).ToCamelCase(); + public static async Task QueryAsync(this DocumentClient documentClient, Uri collectionUri, SqlQuerySpec querySpec, Func handler, CancellationToken ct = default) + { + var query = + documentClient.CreateDocumentQuery(collectionUri, querySpec) + .AsDocumentQuery(); + + using (query) + { + var result = new List(); + + while (query.HasMoreResults && !ct.IsCancellationRequested) + { + var commits = await query.ExecuteNextAsync(ct); + + foreach (var commit in commits) + { + await handler(commit); + } + } + } + } + + public static SqlQuerySpec AllIds(string streamName) + { + var query = + $"SELECT TOP 1 " + + $" e.{CommitId}," + + $" e.{EventsCountField} " + + $"FROM {Collection} e " + + $"WHERE " + + $" e.{EventStreamField} = @name " + + $"ORDER BY e.{EventStreamOffsetField} DESC"; + + var parameters = new SqlParameterCollection + { + new SqlParameter("@name", streamName) + }; + + return new SqlQuerySpec(query, parameters); + } + public static SqlQuerySpec LastPosition(string streamName) { var query = @@ -59,7 +105,7 @@ namespace Squidex.Infrastructure.EventSourcing return new SqlQuerySpec(query, parameters); } - public static SqlQuerySpec ByProperty(string property, object value, StreamPosition streamPosition) + public static SqlQuerySpec CreateByProperty(string property, object value, StreamPosition streamPosition) { var filters = new List(); @@ -71,7 +117,7 @@ namespace Squidex.Infrastructure.EventSourcing return BuildQuery(filters, parameters); } - public static SqlQuerySpec ByFilter(string streamFilter, StreamPosition streamPosition) + public static SqlQuerySpec CreateByFilter(string streamFilter, StreamPosition streamPosition) { var filters = new List(); @@ -92,7 +138,7 @@ namespace Squidex.Infrastructure.EventSourcing private static void ForProperty(this List filters, SqlParameterCollection parameters, string property, object value) { - filters.Add($"ARRAY_CONTAINS(e.events, {{ \"headers\": {{ \"{property}\": @value }} }}, true)"); + filters.Add($"ARRAY_CONTAINS(e.events, {{ \"header\": {{ \"{property}\": @value }} }}, true)"); parameters.Add(new SqlParameter("@value", value)); } @@ -128,7 +174,7 @@ namespace Squidex.Infrastructure.EventSourcing parameters.Add(new SqlParameter("@time", streamPosition.Timestamp)); } - public static EventPredicate CreateFilterExpression(string property, object value) + public static EventPredicate CreateExpression(string property, object value) { if (!string.IsNullOrWhiteSpace(property)) {