From fbe51f102607342af94b20c6b641d3808064f357 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Sun, 31 Oct 2021 11:46:33 +0100 Subject: [PATCH] [WIP] Bring event store back. (#780) * Bring event store back. * Progress with event store3. * Fix offset and position handling. --- backend/Squidex.sln | 15 ++ .../Comments/DomainObject/CommentsGrain.cs | 2 +- .../Diagnostics/GetEventStoreHealthCheck.cs | 12 +- .../EventStoreProjectionClient.cs | 79 +++++++ .../EventSourcing/Formatter.cs | 27 ++- .../EventSourcing/GetEventStore.cs | 213 ++++++++---------- .../GetEventStoreSubscription.cs | 86 +++---- .../EventSourcing/ProjectionClient.cs | 142 ------------ .../EventSourcing/Utils.cs | 83 +++++++ ...quidex.Infrastructure.GetEventStore.csproj | 9 +- .../EventSourcing/MongoEventStore_Reader.cs | 2 +- .../EventSourcing/IEventStore.cs | 2 +- .../Config/Domain/EventSourcingServices.cs | 17 ++ backend/src/Squidex/Squidex.csproj | 2 + .../EventSourcing/EventStoreTests.cs | 39 +++- .../EventSourcing/GetEventStoreFixture.cs | 51 +++++ .../EventSourcing/GetEventStoreTests.cs | 31 +++ .../Squidex.Infrastructure.Tests.csproj | 1 + 18 files changed, 491 insertions(+), 322 deletions(-) create mode 100644 backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/EventStoreProjectionClient.cs delete mode 100644 backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs create mode 100644 backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Utils.cs create mode 100644 backend/tests/Squidex.Infrastructure.Tests/EventSourcing/GetEventStoreFixture.cs create mode 100644 backend/tests/Squidex.Infrastructure.Tests/EventSourcing/GetEventStoreTests.cs diff --git a/backend/Squidex.sln b/backend/Squidex.sln index 9abce5569..94ccedbc5 100644 --- a/backend/Squidex.sln +++ b/backend/Squidex.sln @@ -58,6 +58,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Squidex.Web", "src\Squidex. EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Migrations", "src\Migrations\Migrations.csproj", "{23615A39-F3FB-4575-A91C-535899DFB636}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Squidex.Infrastructure.GetEventStore", "src\Squidex.Infrastructure.GetEventStore\Squidex.Infrastructure.GetEventStore.csproj", "{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -284,6 +286,18 @@ Global {23615A39-F3FB-4575-A91C-535899DFB636}.Release|x64.Build.0 = Release|Any CPU {23615A39-F3FB-4575-A91C-535899DFB636}.Release|x86.ActiveCfg = Release|Any CPU {23615A39-F3FB-4575-A91C-535899DFB636}.Release|x86.Build.0 = Release|Any CPU + {4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Debug|x64.ActiveCfg = Debug|Any CPU + {4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Debug|x64.Build.0 = Debug|Any CPU + {4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Debug|x86.ActiveCfg = Debug|Any CPU + {4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Debug|x86.Build.0 = Debug|Any CPU + {4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Release|Any CPU.Build.0 = Release|Any CPU + {4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Release|x64.ActiveCfg = Release|Any CPU + {4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Release|x64.Build.0 = Release|Any CPU + {4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Release|x86.ActiveCfg = Release|Any CPU + {4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -308,6 +322,7 @@ Global {F3C41B82-6A67-409A-B7FE-54543EE4F38B} = {FB8BC3A2-2010-4C3C-A87D-D4A98C05EE52} {5B2D251F-46E3-486A-AE16-E3FE06B559ED} = {7EDE8CF1-B1E4-4005-B154-834B944E0D7A} {23615A39-F3FB-4575-A91C-535899DFB636} = {94207AA6-4923-4183-A558-E0F8196B8CA3} + {4CFBD9FF-6565-457E-B81C-9FCEFEE854BC} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {02F2E872-3141-44F5-BD6A-33CD84E9FE08} diff --git a/backend/src/Squidex.Domain.Apps.Entities/Comments/DomainObject/CommentsGrain.cs b/backend/src/Squidex.Domain.Apps.Entities/Comments/DomainObject/CommentsGrain.cs index 8931b1812..11392f77b 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Comments/DomainObject/CommentsGrain.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Comments/DomainObject/CommentsGrain.cs @@ -44,7 +44,7 @@ namespace Squidex.Domain.Apps.Entities.Comments.DomainObject { streamName = $"comments-{key}"; - var storedEvents = await eventStore.QueryLatestAsync(streamName, 100); + var storedEvents = await eventStore.QueryReverseAsync(streamName, 100); foreach (var @event in storedEvents) { diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/Diagnostics/GetEventStoreHealthCheck.cs b/backend/src/Squidex.Infrastructure.GetEventStore/Diagnostics/GetEventStoreHealthCheck.cs index fd2f40436..2b8e40ac2 100644 --- a/backend/src/Squidex.Infrastructure.GetEventStore/Diagnostics/GetEventStoreHealthCheck.cs +++ b/backend/src/Squidex.Infrastructure.GetEventStore/Diagnostics/GetEventStoreHealthCheck.cs @@ -5,26 +5,28 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System.Linq; using System.Threading; using System.Threading.Tasks; -using EventStore.ClientAPI; +using EventStore.Client; using Microsoft.Extensions.Diagnostics.HealthChecks; namespace Squidex.Infrastructure.Diagnostics { public sealed class GetEventStoreHealthCheck : IHealthCheck { - private readonly IEventStoreConnection connection; + private readonly EventStoreClient client; - public GetEventStoreHealthCheck(IEventStoreConnection connection) + public GetEventStoreHealthCheck(EventStoreClientSettings settings) { - this.connection = connection; + client = new EventStoreClient(settings); } public async Task CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default) { - await connection.ReadEventAsync("test", 1, false); + await client.ReadStreamAsync(Direction.Forwards, "test", default, cancellationToken: cancellationToken) + .FirstOrDefaultAsync(cancellationToken); return HealthCheckResult.Healthy("Application must query data from EventStore."); } diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/EventStoreProjectionClient.cs b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/EventStoreProjectionClient.cs new file mode 100644 index 000000000..94e288bac --- /dev/null +++ b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/EventStoreProjectionClient.cs @@ -0,0 +1,79 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using EventStore.Client; +using Squidex.Text; + +namespace Squidex.Infrastructure.EventSourcing +{ + public sealed class EventStoreProjectionClient + { + private readonly ConcurrentDictionary projections = new ConcurrentDictionary(); + private readonly string projectionPrefix; + private readonly EventStoreProjectionManagementClient client; + + public EventStoreProjectionClient(EventStoreClientSettings settings, string projectionPrefix) + { + client = new EventStoreProjectionManagementClient(settings); + + this.projectionPrefix = projectionPrefix; + } + + private string CreateFilterProjectionName(string filter) + { + return $"by-{projectionPrefix.Slugify()}-{filter.Slugify()}"; + } + + public async Task CreateProjectionAsync(string? streamFilter = null) + { + if (!string.IsNullOrWhiteSpace(streamFilter) && streamFilter[0] != '^') + { + return $"{projectionPrefix}-{streamFilter}"; + } + + streamFilter ??= ".*"; + + var name = CreateFilterProjectionName(streamFilter); + + var query = + $@"fromAll() + .when({{ + $any: function (s, e) {{ + if (e.streamId.indexOf('{projectionPrefix}') === 0 && /{streamFilter}/.test(e.streamId.substring({projectionPrefix.Length + 1}))) {{ + linkTo('{name}', e); + }} + }} + }});"; + + await CreateProjectionAsync(name, query); + + return name; + } + + private async Task CreateProjectionAsync(string name, string query) + { + if (projections.TryAdd(name, true)) + { + try + { + await client.CreateContinuousAsync(name, "fromAll().when()"); + await client.UpdateAsync(name, query, true); + } + catch (Exception ex) + { + if (!ex.Is()) + { + throw; + } + } + } + } + } +} diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs index 582779182..4a5c202c8 100644 --- a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs +++ b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs @@ -7,11 +7,12 @@ using System; using System.Collections.Generic; +using System.Globalization; using System.Linq; using System.Text; -using EventStore.ClientAPI; +using EventStore.Client; using Squidex.Infrastructure.Json; -using EventStoreData = EventStore.ClientAPI.EventData; +using EventStoreData = EventStore.Client.EventData; namespace Squidex.Infrastructure.EventSourcing { @@ -23,7 +24,7 @@ namespace Squidex.Infrastructure.EventSourcing { var @event = resolvedEvent.Event; - var eventPayload = Encoding.UTF8.GetString(@event.Data); + var eventPayload = Encoding.UTF8.GetString(@event.Data.Span); var eventHeaders = GetHeaders(serializer, @event); var eventData = new EventData(@event.EventType, eventHeaders, eventPayload); @@ -32,12 +33,12 @@ namespace Squidex.Infrastructure.EventSourcing return new StoredEvent( streamName, - resolvedEvent.OriginalEventNumber.ToString(), - resolvedEvent.Event.EventNumber, + resolvedEvent.OriginalEventNumber.ToInt64().ToString(CultureInfo.InvariantCulture), + resolvedEvent.Event.EventNumber.ToInt64(), eventData); } - private static string GetStreamName(string? prefix, RecordedEvent @event) + private static string GetStreamName(string? prefix, EventRecord @event) { var streamName = @event.EventStreamId; @@ -49,10 +50,9 @@ namespace Squidex.Infrastructure.EventSourcing return streamName; } - private static EnvelopeHeaders GetHeaders(IJsonSerializer serializer, RecordedEvent @event) + private static EnvelopeHeaders GetHeaders(IJsonSerializer serializer, EventRecord @event) { - var headersJson = Encoding.UTF8.GetString(@event.Metadata); - var headers = serializer.Deserialize(headersJson); + var headers = Deserialize(serializer, @event.Metadata); foreach (var key in headers.Keys.ToList()) { @@ -65,6 +65,13 @@ namespace Squidex.Infrastructure.EventSourcing return headers; } + private static T Deserialize(IJsonSerializer serializer, ReadOnlyMemory source) + { + var json = Encoding.UTF8.GetString(source.Span); + + return serializer.Deserialize(json); + } + public static EventStoreData Write(EventData eventData, IJsonSerializer serializer) { var payload = Encoding.UTF8.GetBytes(eventData.Payload); @@ -72,7 +79,7 @@ namespace Squidex.Infrastructure.EventSourcing var headersJson = serializer.Serialize(eventData.Headers); var headersBytes = Encoding.UTF8.GetBytes(headersJson); - return new EventStoreData(Guid.NewGuid(), eventData.Type, true, payload, headersBytes); + return new EventStoreData(Uuid.FromGuid(Guid.NewGuid()), eventData.Type, payload, headersBytes); } } } diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs index f7e128785..ba33e12f9 100644 --- a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs +++ b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs @@ -11,44 +11,37 @@ using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; -using EventStore.ClientAPI; -using EventStore.ClientAPI.Exceptions; +using EventStore.Client; +using NodaTime; using Squidex.Hosting; using Squidex.Hosting.Configuration; using Squidex.Infrastructure.Json; -using Squidex.Log; namespace Squidex.Infrastructure.EventSourcing { public sealed class GetEventStore : IEventStore, IInitializable { - private const int WritePageSize = 500; - private const int ReadPageSize = 500; + private const string StreamPrefix = "squidex"; private static readonly IReadOnlyList EmptyEvents = new List(); - private readonly IEventStoreConnection connection; + private readonly EventStoreClient client; + private readonly EventStoreProjectionClient projectionClient; private readonly IJsonSerializer serializer; - private readonly string prefix = "squidex"; - private readonly ProjectionClient projectionClient; - public GetEventStore(IEventStoreConnection connection, IJsonSerializer serializer, string prefix, string projectionHost) + public GetEventStore(EventStoreClientSettings settings, IJsonSerializer serializer) { - this.connection = connection; this.serializer = serializer; - if (!string.IsNullOrWhiteSpace(prefix)) - { - this.prefix = prefix.Trim(' ', '-'); - } + client = new EventStoreClient(settings); - projectionClient = new ProjectionClient(connection, this.prefix, projectionHost); + projectionClient = new EventStoreProjectionClient(settings, StreamPrefix); } public async Task InitializeAsync( - CancellationToken ct = default) + CancellationToken ct) { try { - await connection.ConnectAsync(); + await client.SoftDeleteAsync(Guid.NewGuid().ToString(), StreamState.Any, cancellationToken: ct); } catch (Exception ex) { @@ -56,18 +49,16 @@ namespace Squidex.Infrastructure.EventSourcing throw new ConfigurationException(error, ex); } - - await projectionClient.ConnectAsync(); } public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string? streamFilter = null, string? position = null) { Guard.NotNull(streamFilter, nameof(streamFilter)); - return new GetEventStoreSubscription(connection, subscriber, serializer, projectionClient, position, prefix, streamFilter); + return new GetEventStoreSubscription(subscriber, client, projectionClient, serializer, position, StreamPrefix, streamFilter); } - public async IAsyncEnumerable QueryAllAsync(string? streamFilter = null, string? position = null, long take = long.MaxValue, + public async IAsyncEnumerable QueryAllAsync(string? streamFilter = null, string? position = null, int take = int.MaxValue, [EnumeratorCancellation] CancellationToken ct = default) { if (take <= 0) @@ -77,15 +68,15 @@ namespace Squidex.Infrastructure.EventSourcing var streamName = await projectionClient.CreateProjectionAsync(streamFilter); - var sliceStart = ProjectionClient.ParsePosition(position); + var stream = QueryAsync(streamName, position.ToPosition(false), take, ct); - await foreach (var storedEvent in QueryReverseAsync(streamName, sliceStart, take, ct)) + await foreach (var storedEvent in stream.IgnoreNotFound(ct)) { yield return storedEvent; } } - public async IAsyncEnumerable QueryAllReverseAsync(string? streamFilter = null, string? position = null, long take = long.MaxValue, + public async IAsyncEnumerable QueryAllReverseAsync(string? streamFilter = null, Instant timestamp = default, int take = int.MaxValue, [EnumeratorCancellation] CancellationToken ct = default) { if (take <= 0) @@ -95,15 +86,16 @@ namespace Squidex.Infrastructure.EventSourcing var streamName = await projectionClient.CreateProjectionAsync(streamFilter); - var sliceStart = ProjectionClient.ParsePosition(position); + var stream = QueryReverseAsync(streamName, StreamPosition.End, take, ct); - await foreach (var storedEvent in QueryAsync(streamName, sliceStart, take, ct)) + await foreach (var storedEvent in stream.IgnoreNotFound(ct).TakeWhile(x => x.Data.Headers.Timestamp() >= timestamp)) { yield return storedEvent; } } - public async Task> QueryLatestAsync(string streamName, int count) + public async Task> QueryReverseAsync(string streamName, int count = int.MaxValue, + CancellationToken ct = default) { Guard.NotNullOrEmpty(streamName, nameof(streamName)); @@ -112,11 +104,13 @@ namespace Squidex.Infrastructure.EventSourcing return EmptyEvents; } - using (Profiler.TraceMethod()) + using (Telemetry.Activities.StartActivity("GetEventStore/GetEventStore")) { var result = new List(); - await foreach (var storedEvent in QueryReverseAsync(streamName, StreamPosition.End, default)) + var stream = QueryReverseAsync(GetStreamName(streamName), StreamPosition.End, count, ct); + + await foreach (var storedEvent in stream.IgnoreNotFound(ct)) { result.Add(storedEvent); } @@ -125,15 +119,18 @@ namespace Squidex.Infrastructure.EventSourcing } } - public async Task> QueryAsync(string streamName, long streamPosition = 0) + public async Task> QueryAsync(string streamName, long streamPosition = 0, + CancellationToken ct = default) { Guard.NotNullOrEmpty(streamName, nameof(streamName)); - using (Profiler.TraceMethod()) + using (Telemetry.Activities.StartActivity("GetEventStore/QueryAsync")) { var result = new List(); - await foreach (var storedEvent in QueryAsync(streamName, StreamPosition.End, default)) + var stream = QueryAsync(GetStreamName(streamName), streamPosition.ToPosition(), int.MaxValue, ct); + + await foreach (var storedEvent in stream.IgnoreNotFound(ct)) { result.Add(storedEvent); } @@ -142,95 +139,63 @@ namespace Squidex.Infrastructure.EventSourcing } } - private async IAsyncEnumerable QueryAsync(string streamName, long sliceStart, long take = int.MaxValue, - [EnumeratorCancellation] CancellationToken ct = default) + private IAsyncEnumerable QueryAsync(string streamName, StreamPosition start, long count, + CancellationToken ct = default) { - var taken = take; - - StreamEventsSlice currentSlice; - do - { - currentSlice = await connection.ReadStreamEventsForwardAsync(streamName, sliceStart, ReadPageSize, true); - - if (currentSlice.Status == SliceReadStatus.Success) - { - sliceStart = currentSlice.NextEventNumber; - - foreach (var resolved in currentSlice.Events) - { - var storedEvent = Formatter.Read(resolved, prefix, serializer); - - yield return storedEvent; - - if (taken == take) - { - break; - } - - taken++; - } - } - } - while (!currentSlice.IsEndOfStream && !ct.IsCancellationRequested && taken < take); + var result = client.ReadStreamAsync( + Direction.Forwards, + streamName, + start, + count, + resolveLinkTos: true, + cancellationToken: ct); + + return result.Select(x => Formatter.Read(x, StreamPrefix, serializer)); } - private async IAsyncEnumerable QueryReverseAsync(string streamName, long sliceStart, long take = int.MaxValue, - [EnumeratorCancellation] CancellationToken ct = default) + private IAsyncEnumerable QueryReverseAsync(string streamName, StreamPosition start, long count, + CancellationToken ct = default) { - var taken = take; - - StreamEventsSlice currentSlice; - do - { - currentSlice = await connection.ReadStreamEventsBackwardAsync(streamName, sliceStart, ReadPageSize, true); - - if (currentSlice.Status == SliceReadStatus.Success) - { - sliceStart = currentSlice.NextEventNumber; - - foreach (var resolved in currentSlice.Events.OrderByDescending(x => x.Event.EventNumber)) - { - var storedEvent = Formatter.Read(resolved, prefix, serializer); - - yield return storedEvent; - - if (taken == take) - { - break; - } - - taken++; - } - } - } - while (!currentSlice.IsEndOfStream && !ct.IsCancellationRequested && taken < take); + var result = client.ReadStreamAsync( + Direction.Backwards, + streamName, + start, + count, + resolveLinkTos: true, + cancellationToken: ct); + + return result.Select(x => Formatter.Read(x, StreamPrefix, serializer)); } - public Task DeleteStreamAsync(string streamName) + public async Task DeleteStreamAsync(string streamName, + CancellationToken ct = default) { Guard.NotNullOrEmpty(streamName, nameof(streamName)); - return connection.DeleteStreamAsync(GetStreamName(streamName), ExpectedVersion.Any); + await client.SoftDeleteAsync(GetStreamName(streamName), StreamState.Any, cancellationToken: ct); } - public Task AppendAsync(Guid commitId, string streamName, ICollection events) + public Task AppendAsync(Guid commitId, string streamName, ICollection events, + CancellationToken ct = default) { - return AppendEventsInternalAsync(streamName, EtagVersion.Any, events); + return AppendEventsInternalAsync(streamName, EtagVersion.Any, events, ct); } - public Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection events) + public Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection events, + CancellationToken ct = default) { Guard.GreaterEquals(expectedVersion, -1, nameof(expectedVersion)); - return AppendEventsInternalAsync(streamName, expectedVersion, events); + return AppendEventsInternalAsync(streamName, expectedVersion, events, ct); } - private async Task AppendEventsInternalAsync(string streamName, long expectedVersion, ICollection events) + private async Task AppendEventsInternalAsync(string streamName, long expectedVersion, ICollection events, + CancellationToken ct) { Guard.NotNullOrEmpty(streamName, nameof(streamName)); Guard.NotNull(events, nameof(events)); - using (Profiler.TraceMethod(nameof(AppendAsync))) + using (Telemetry.Activities.StartActivity("GetEventStore/AppendEventsInternalAsync")) { if (events.Count == 0) { @@ -239,40 +204,58 @@ namespace Squidex.Infrastructure.EventSourcing try { - var eventsToSave = events.Select(x => Formatter.Write(x, serializer)).ToList(); + var eventData = events.Select(x => Formatter.Write(x, serializer)); - if (eventsToSave.Count < WritePageSize) + streamName = GetStreamName(streamName); + + if (expectedVersion == -1) + { + await client.AppendToStreamAsync(streamName, StreamState.NoStream, eventData, cancellationToken: ct); + } + else if (expectedVersion < -1) { - await connection.AppendToStreamAsync(GetStreamName(streamName), expectedVersion, eventsToSave); + await client.AppendToStreamAsync(streamName, StreamState.Any, eventData, cancellationToken: ct); } else { - using (var transaction = await connection.StartTransactionAsync(GetStreamName(streamName), expectedVersion)) - { - for (var p = 0; p < eventsToSave.Count; p += WritePageSize) - { - await transaction.WriteAsync(eventsToSave.Skip(p).Take(WritePageSize)); - } - - await transaction.CommitAsync(); - } + await client.AppendToStreamAsync(streamName, expectedVersion.ToRevision(), eventData, cancellationToken: ct); } } catch (WrongExpectedVersionException ex) { - throw new WrongEventVersionException(ParseVersion(ex.Message), expectedVersion); + throw new WrongEventVersionException(ex.ActualVersion ?? 0, expectedVersion); } } } - private static int ParseVersion(string message) + public async Task DeleteAsync(string streamFilter, + CancellationToken ct = default) { - return int.Parse(message[(message.LastIndexOf(':') + 1)..]); + var streamName = await projectionClient.CreateProjectionAsync(streamFilter); + + var events = client.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start, resolveLinkTos: true, cancellationToken: ct); + + if (await events.ReadState == ReadState.StreamNotFound) + { + return; + } + + var deleted = new HashSet(); + + await foreach (var storedEvent in TaskAsyncEnumerableExtensions.WithCancellation(events, ct)) + { + var streamToDelete = storedEvent.Event.EventStreamId; + + if (deleted.Add(streamToDelete)) + { + await client.SoftDeleteAsync(streamToDelete, StreamState.Any, cancellationToken: ct); + } + } } - private string GetStreamName(string streamName) + private static string GetStreamName(string streamName) { - return $"{prefix}-{streamName}"; + return $"{StreamPrefix}-{streamName}"; } } } diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs index 760e7281b..a32f92363 100644 --- a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs +++ b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs @@ -5,70 +5,76 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using EventStore.ClientAPI; -using EventStore.ClientAPI.Exceptions; +using System; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Client; using Squidex.Infrastructure.Json; -using Squidex.Infrastructure.Tasks; namespace Squidex.Infrastructure.EventSourcing { internal sealed class GetEventStoreSubscription : IEventSubscription { - private readonly IEventStoreConnection connection; - private readonly IEventSubscriber subscriber; - private readonly IJsonSerializer serializer; - private readonly string? prefix; - private readonly EventStoreCatchUpSubscription subscription; - private readonly long? position; + private readonly CancellationTokenSource cts = new CancellationTokenSource(); + private StreamSubscription subscription; public GetEventStoreSubscription( - IEventStoreConnection connection, IEventSubscriber subscriber, + EventStoreClient client, + EventStoreProjectionClient projectionClient, IJsonSerializer serializer, - ProjectionClient projectionClient, string? position, string? prefix, string? streamFilter) { - this.connection = connection; + Task.Run(async () => + { + var ct = cts.Token; - this.position = ProjectionClient.ParsePositionOrNull(position); - this.prefix = prefix; + var streamName = await projectionClient.CreateProjectionAsync(streamFilter); - var streamName = AsyncHelper.Sync(() => projectionClient.CreateProjectionAsync(streamFilter)); - - this.serializer = serializer; - this.subscriber = subscriber; - - subscription = SubscribeToStream(streamName); - } - - public void Unsubscribe() - { - subscription.Stop(); - } - - private EventStoreCatchUpSubscription SubscribeToStream(string streamName) - { - var settings = CatchUpSubscriptionSettings.Default; - - return connection.SubscribeToStreamFrom(streamName, position, settings, - async (s, e) => + Func onEvent = async (_, @event, _) => { - var storedEvent = Formatter.Read(e, prefix, serializer); + var storedEvent = Formatter.Read(@event, prefix, serializer); await subscriber.OnEventAsync(this, storedEvent); - }, null, - (s, reason, ex) => + }; + + Action? onError = (_, reason, ex) => { - if (reason != SubscriptionDropReason.ConnectionClosed && - reason != SubscriptionDropReason.UserInitiated) + if (reason != SubscriptionDroppedReason.Disposed && + reason != SubscriptionDroppedReason.SubscriberError) { - ex ??= new ConnectionClosedException($"Subscription closed with reason {reason}."); + ex ??= new InvalidOperationException($"Subscription closed with reason {reason}."); subscriber.OnErrorAsync(this, ex); } - }); + }; + + if (!string.IsNullOrWhiteSpace(position)) + { + var streamPosition = position.ToPosition(true); + + subscription = await client.SubscribeToStreamAsync(streamName, streamPosition, + onEvent, true, + onError, + cancellationToken: ct); + } + else + { + subscription = await client.SubscribeToStreamAsync(streamName, + onEvent, true, + onError, + cancellationToken: ct); + } + }, cts.Token); + } + + public void Unsubscribe() + { + subscription?.Dispose(); + + cts.Cancel(); } } } diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs deleted file mode 100644 index 554ccb2bb..000000000 --- a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs +++ /dev/null @@ -1,142 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschraenkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -using System; -using System.Collections.Concurrent; -using System.Linq; -using System.Net; -using System.Net.Http; -using System.Net.Sockets; -using System.Threading.Tasks; -using EventStore.ClientAPI; -using EventStore.ClientAPI.Exceptions; -using EventStore.ClientAPI.Projections; -using Squidex.Hosting.Configuration; -using Squidex.Text; - -namespace Squidex.Infrastructure.EventSourcing -{ - public sealed class ProjectionClient - { - private readonly ConcurrentDictionary projections = new ConcurrentDictionary(); - private readonly IEventStoreConnection connection; - private readonly string projectionPrefix; - private readonly string projectionHost; - private ProjectionsManager projectionsManager; - - public ProjectionClient(IEventStoreConnection connection, string projectionPrefix, string projectionHost) - { - this.connection = connection; - - this.projectionPrefix = projectionPrefix; - this.projectionHost = projectionHost; - } - - private string CreateFilterProjectionName(string filter) - { - return $"by-{projectionPrefix.Slugify()}-{filter.Slugify()}"; - } - - public async Task CreateProjectionAsync(string? streamFilter = null) - { - streamFilter ??= ".*"; - - var name = CreateFilterProjectionName(streamFilter); - - var query = - $@"fromAll() - .when({{ - $any: function (s, e) {{ - if (e.streamId.indexOf('{projectionPrefix}') === 0 && /{streamFilter}/.test(e.streamId.substring({projectionPrefix.Length + 1}))) {{ - linkTo('{name}', e); - }} - }} - }});"; - - await CreateProjectionAsync(name, query); - - return name; - } - - private async Task CreateProjectionAsync(string name, string query) - { - if (projections.TryAdd(name, true)) - { - try - { - var credentials = connection.Settings.DefaultUserCredentials; - - await projectionsManager.CreateContinuousAsync(name, query, credentials); - } - catch (Exception ex) - { - if (!ex.Is()) - { - throw; - } - } - } - } - - public async Task ConnectAsync() - { - var addressParts = projectionHost.Split(':'); - - if (addressParts.Length < 2 || !int.TryParse(addressParts[1], out var port)) - { - port = 2113; - } - - var endpoints = await Dns.GetHostAddressesAsync(addressParts[0]); - var endpoint = new IPEndPoint(endpoints.First(x => x.AddressFamily == AddressFamily.InterNetwork), port); - - async Task ConnectToSchemaAsync(string schema) - { - projectionsManager = - new ProjectionsManager( - connection.Settings.Log, endpoint, - connection.Settings.OperationTimeout, - null, - schema); - - await projectionsManager.ListAllAsync(connection.Settings.DefaultUserCredentials); - } - - try - { - try - { - await ConnectToSchemaAsync("https"); - } - catch (HttpRequestException) - { - await ConnectToSchemaAsync("http"); - } - catch (AggregateException ex) when (ex.Flatten().InnerException is HttpRequestException) - { - await ConnectToSchemaAsync("http"); - } - } - catch (Exception ex) - { - var error = new ConfigurationError($"GetEventStore cannot connect to event store projections: {projectionHost}."); - - throw new ConfigurationException(error, ex); - } - } - - public static long? ParsePositionOrNull(string? position) - { - return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null; - } - - public static long ParsePosition(string? position) - { - return long.TryParse(position, out var parsedPosition) ? parsedPosition + 1 : StreamPosition.Start; - } - } -} diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Utils.cs b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Utils.cs new file mode 100644 index 000000000..72b51380b --- /dev/null +++ b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Utils.cs @@ -0,0 +1,83 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System.Collections.Generic; +using System.Globalization; +using System.Runtime.CompilerServices; +using System.Threading; +using EventStore.Client; + +namespace Squidex.Infrastructure.EventSourcing +{ + public static class Utils + { + public static StreamRevision ToRevision(this long version) + { + return StreamRevision.FromInt64(version); + } + + public static StreamPosition ToPosition(this long version) + { + if (version <= 0) + { + return StreamPosition.Start; + } + + return StreamPosition.FromInt64(version); + } + + public static StreamPosition ToPosition(this string? position, bool inclusive) + { + if (string.IsNullOrWhiteSpace(position)) + { + return StreamPosition.Start; + } + + if (long.TryParse(position, NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsedPosition)) + { + if (!inclusive) + { + parsedPosition++; + } + + return StreamPosition.FromInt64(parsedPosition); + } + + return StreamPosition.Start; + } + + public static async IAsyncEnumerable IgnoreNotFound(this IAsyncEnumerable source, + [EnumeratorCancellation] CancellationToken ct = default) + { + var enumerator = source.GetAsyncEnumerator(ct); + + bool resultFound; + try + { + resultFound = await enumerator.MoveNextAsync(ct); + } + catch (StreamNotFoundException) + { + resultFound = false; + } + + if (!resultFound) + { + yield break; + } + + yield return enumerator.Current; + + while (await enumerator.MoveNextAsync(ct)) + { + ct.ThrowIfCancellationRequested(); + + yield return enumerator.Current; + } + } + } +} diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/Squidex.Infrastructure.GetEventStore.csproj b/backend/src/Squidex.Infrastructure.GetEventStore/Squidex.Infrastructure.GetEventStore.csproj index ea211acb2..9493e9734 100644 --- a/backend/src/Squidex.Infrastructure.GetEventStore/Squidex.Infrastructure.GetEventStore.csproj +++ b/backend/src/Squidex.Infrastructure.GetEventStore/Squidex.Infrastructure.GetEventStore.csproj @@ -10,7 +10,14 @@ True - + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs index cfb9b90b0..afea04489 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs @@ -38,7 +38,7 @@ namespace Squidex.Infrastructure.EventSourcing } } - public async Task> QueryLatestAsync(string streamName, int count = int.MaxValue, + public async Task> QueryReverseAsync(string streamName, int count = int.MaxValue, CancellationToken ct = default) { Guard.NotNullOrEmpty(streamName, nameof(streamName)); diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs b/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs index ae988501b..b1124c43f 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs @@ -15,7 +15,7 @@ namespace Squidex.Infrastructure.EventSourcing { public interface IEventStore { - Task> QueryLatestAsync(string streamName, int take = int.MaxValue, + Task> QueryReverseAsync(string streamName, int take = int.MaxValue, CancellationToken ct = default); Task> QueryAsync(string streamName, long streamPosition = 0, diff --git a/backend/src/Squidex/Config/Domain/EventSourcingServices.cs b/backend/src/Squidex/Config/Domain/EventSourcingServices.cs index c40378a11..7a30e9e4f 100644 --- a/backend/src/Squidex/Config/Domain/EventSourcingServices.cs +++ b/backend/src/Squidex/Config/Domain/EventSourcingServices.cs @@ -6,13 +6,17 @@ // ========================================================================== using System.Linq; +using EventStore.Client; +using EventStore.ClientAPI; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using MongoDB.Driver; using Squidex.Infrastructure; using Squidex.Infrastructure.Commands; +using Squidex.Infrastructure.Diagnostics; using Squidex.Infrastructure.EventSourcing; using Squidex.Infrastructure.EventSourcing.Grains; +using Squidex.Infrastructure.Json; using Squidex.Infrastructure.States; namespace Squidex.Config.Domain @@ -36,6 +40,19 @@ namespace Squidex.Config.Domain return new MongoEventStore(mongDatabase, c.GetRequiredService()); }) .As(); + }, + ["GetEventStore"] = () => + { + var configuration = config.GetRequiredValue("eventStore:getEventStore:configuration"); + + services.AddSingletonAs(_ => EventStoreClientSettings.Create(configuration)) + .AsSelf(); + + services.AddSingletonAs() + .As(); + + services.AddHealthChecks() + .AddCheck("EventStore", tags: new[] { "node" }); } }); diff --git a/backend/src/Squidex/Squidex.csproj b/backend/src/Squidex/Squidex.csproj index e2ca33f1a..3d80284c0 100644 --- a/backend/src/Squidex/Squidex.csproj +++ b/backend/src/Squidex/Squidex.csproj @@ -24,6 +24,7 @@ + @@ -33,6 +34,7 @@ + diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs index 7824ae4d4..1668ba85c 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs @@ -321,7 +321,7 @@ namespace Squidex.Infrastructure.EventSourcing { var expected = allExpected.TakeLast(take).ToArray(); - var readEvents = await Sut.QueryLatestAsync(streamName, take); + var readEvents = await Sut.QueryReverseAsync(streamName, take); ShouldBeEquivalentTo(readEvents, expected); } @@ -375,9 +375,22 @@ namespace Squidex.Infrastructure.EventSourcing await Sut.AppendAsync(Guid.NewGuid(), streamName, events); - await Sut.DeleteAsync($"^{streamName.Substring(0, 10)}"); + IReadOnlyList? readEvents = null; - var readEvents = await QueryAsync(streamName); + for (var i = 0; i < 5; i++) + { + await Sut.DeleteAsync($"^{streamName.Substring(0, 10)}"); + + readEvents = await QueryAsync(streamName); + + if (readEvents.Count == 0) + { + break; + } + + // Get event store needs a little bit of time for the projections. + await Task.Delay(1000); + } Assert.Empty(readEvents); } @@ -395,9 +408,22 @@ namespace Squidex.Infrastructure.EventSourcing await Sut.AppendAsync(Guid.NewGuid(), streamName, events); - await Sut.DeleteStreamAsync(streamName); + IReadOnlyList? readEvents = null; - var readEvents = await QueryAsync(streamName); + for (var i = 0; i < 5; i++) + { + await Sut.DeleteStreamAsync(streamName); + + readEvents = await QueryAsync(streamName); + + if (readEvents.Count == 0) + { + break; + } + + // Get event store needs a little bit of time for the projections. + await Task.Delay(1000); + } Assert.Empty(readEvents); } @@ -424,7 +450,8 @@ namespace Squidex.Infrastructure.EventSourcing return readEvents; } - private async Task?> QueryWithSubscriptionAsync(string streamFilter, Func? subscriptionRunning = null, bool fromBeginning = false) + private async Task?> QueryWithSubscriptionAsync(string streamFilter, + Func? subscriptionRunning = null, bool fromBeginning = false) { var subscriber = new EventSubscriber(); diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/GetEventStoreFixture.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/GetEventStoreFixture.cs new file mode 100644 index 000000000..a092dcd88 --- /dev/null +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/GetEventStoreFixture.cs @@ -0,0 +1,51 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Threading.Tasks; +using EventStore.Client; +using Squidex.Infrastructure.TestHelpers; + +namespace Squidex.Infrastructure.EventSourcing +{ + public sealed class GetEventStoreFixture : IDisposable + { + private readonly EventStoreClientSettings settings; + + public GetEventStore EventStore { get; } + + public GetEventStoreFixture() + { + AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); + + settings = EventStoreClientSettings.Create("esdb://admin:changeit@127.0.0.1:2113?tls=false"); + + EventStore = new GetEventStore(settings, TestUtils.DefaultSerializer); + EventStore.InitializeAsync(default).Wait(); + } + + public void Dispose() + { + CleanupAsync().Wait(); + } + + private async Task CleanupAsync() + { + var projectionsManager = new EventStoreProjectionManagementClient(settings); + + await foreach (var projection in projectionsManager.ListAllAsync()) + { + var name = projection.Name; + + if (name.StartsWith("by-squidex-test", StringComparison.OrdinalIgnoreCase)) + { + await projectionsManager.DisableAsync(name); + } + } + } + } +} diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/GetEventStoreTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/GetEventStoreTests.cs new file mode 100644 index 000000000..00090dfef --- /dev/null +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/GetEventStoreTests.cs @@ -0,0 +1,31 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Xunit; + +#pragma warning disable SA1300 // Element should begin with upper-case letter + +namespace Squidex.Infrastructure.EventSourcing +{ + [Trait("Category", "Dependencies")] + public class GetEventStoreTests : EventStoreTests, IClassFixture + { + public GetEventStoreFixture _ { get; } + + protected override int SubscriptionDelayInMs { get; } = 1000; + + public GetEventStoreTests(GetEventStoreFixture fixture) + { + _ = fixture; + } + + public override GetEventStore CreateStore() + { + return _.EventStore; + } + } +} diff --git a/backend/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj b/backend/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj index 23c822ece..92fae7deb 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj +++ b/backend/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj @@ -8,6 +8,7 @@ enable +