Browse Source

[WIP] Bring event store back. (#780)

* Bring event store back.

* Progress with event store3.

* Fix offset and position handling.
pull/785/head
Sebastian Stehle 4 years ago
committed by GitHub
parent
commit
fbe51f1026
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      backend/Squidex.sln
  2. 2
      backend/src/Squidex.Domain.Apps.Entities/Comments/DomainObject/CommentsGrain.cs
  3. 12
      backend/src/Squidex.Infrastructure.GetEventStore/Diagnostics/GetEventStoreHealthCheck.cs
  4. 79
      backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/EventStoreProjectionClient.cs
  5. 27
      backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs
  6. 213
      backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs
  7. 86
      backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs
  8. 142
      backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs
  9. 83
      backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Utils.cs
  10. 9
      backend/src/Squidex.Infrastructure.GetEventStore/Squidex.Infrastructure.GetEventStore.csproj
  11. 2
      backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs
  12. 2
      backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs
  13. 17
      backend/src/Squidex/Config/Domain/EventSourcingServices.cs
  14. 2
      backend/src/Squidex/Squidex.csproj
  15. 39
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs
  16. 51
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/GetEventStoreFixture.cs
  17. 31
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/GetEventStoreTests.cs
  18. 1
      backend/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj

15
backend/Squidex.sln

@ -58,6 +58,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Squidex.Web", "src\Squidex.
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Migrations", "src\Migrations\Migrations.csproj", "{23615A39-F3FB-4575-A91C-535899DFB636}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Squidex.Infrastructure.GetEventStore", "src\Squidex.Infrastructure.GetEventStore\Squidex.Infrastructure.GetEventStore.csproj", "{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -284,6 +286,18 @@ Global
{23615A39-F3FB-4575-A91C-535899DFB636}.Release|x64.Build.0 = Release|Any CPU
{23615A39-F3FB-4575-A91C-535899DFB636}.Release|x86.ActiveCfg = Release|Any CPU
{23615A39-F3FB-4575-A91C-535899DFB636}.Release|x86.Build.0 = Release|Any CPU
{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Debug|x64.ActiveCfg = Debug|Any CPU
{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Debug|x64.Build.0 = Debug|Any CPU
{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Debug|x86.ActiveCfg = Debug|Any CPU
{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Debug|x86.Build.0 = Debug|Any CPU
{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Release|Any CPU.Build.0 = Release|Any CPU
{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Release|x64.ActiveCfg = Release|Any CPU
{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Release|x64.Build.0 = Release|Any CPU
{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Release|x86.ActiveCfg = Release|Any CPU
{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -308,6 +322,7 @@ Global
{F3C41B82-6A67-409A-B7FE-54543EE4F38B} = {FB8BC3A2-2010-4C3C-A87D-D4A98C05EE52}
{5B2D251F-46E3-486A-AE16-E3FE06B559ED} = {7EDE8CF1-B1E4-4005-B154-834B944E0D7A}
{23615A39-F3FB-4575-A91C-535899DFB636} = {94207AA6-4923-4183-A558-E0F8196B8CA3}
{4CFBD9FF-6565-457E-B81C-9FCEFEE854BC} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {02F2E872-3141-44F5-BD6A-33CD84E9FE08}

2
backend/src/Squidex.Domain.Apps.Entities/Comments/DomainObject/CommentsGrain.cs

@ -44,7 +44,7 @@ namespace Squidex.Domain.Apps.Entities.Comments.DomainObject
{
streamName = $"comments-{key}";
var storedEvents = await eventStore.QueryLatestAsync(streamName, 100);
var storedEvents = await eventStore.QueryReverseAsync(streamName, 100);
foreach (var @event in storedEvents)
{

12
backend/src/Squidex.Infrastructure.GetEventStore/Diagnostics/GetEventStoreHealthCheck.cs

@ -5,26 +5,28 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.Client;
using Microsoft.Extensions.Diagnostics.HealthChecks;
namespace Squidex.Infrastructure.Diagnostics
{
public sealed class GetEventStoreHealthCheck : IHealthCheck
{
private readonly IEventStoreConnection connection;
private readonly EventStoreClient client;
public GetEventStoreHealthCheck(IEventStoreConnection connection)
public GetEventStoreHealthCheck(EventStoreClientSettings settings)
{
this.connection = connection;
client = new EventStoreClient(settings);
}
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
CancellationToken cancellationToken = default)
{
await connection.ReadEventAsync("test", 1, false);
await client.ReadStreamAsync(Direction.Forwards, "test", default, cancellationToken: cancellationToken)
.FirstOrDefaultAsync(cancellationToken);
return HealthCheckResult.Healthy("Application must query data from EventStore.");
}

79
backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/EventStoreProjectionClient.cs

@ -0,0 +1,79 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using EventStore.Client;
using Squidex.Text;
namespace Squidex.Infrastructure.EventSourcing
{
public sealed class EventStoreProjectionClient
{
private readonly ConcurrentDictionary<string, bool> projections = new ConcurrentDictionary<string, bool>();
private readonly string projectionPrefix;
private readonly EventStoreProjectionManagementClient client;
public EventStoreProjectionClient(EventStoreClientSettings settings, string projectionPrefix)
{
client = new EventStoreProjectionManagementClient(settings);
this.projectionPrefix = projectionPrefix;
}
private string CreateFilterProjectionName(string filter)
{
return $"by-{projectionPrefix.Slugify()}-{filter.Slugify()}";
}
public async Task<string> CreateProjectionAsync(string? streamFilter = null)
{
if (!string.IsNullOrWhiteSpace(streamFilter) && streamFilter[0] != '^')
{
return $"{projectionPrefix}-{streamFilter}";
}
streamFilter ??= ".*";
var name = CreateFilterProjectionName(streamFilter);
var query =
$@"fromAll()
.when({{
$any: function (s, e) {{
if (e.streamId.indexOf('{projectionPrefix}') === 0 && /{streamFilter}/.test(e.streamId.substring({projectionPrefix.Length + 1}))) {{
linkTo('{name}', e);
}}
}}
}});";
await CreateProjectionAsync(name, query);
return name;
}
private async Task CreateProjectionAsync(string name, string query)
{
if (projections.TryAdd(name, true))
{
try
{
await client.CreateContinuousAsync(name, "fromAll().when()");
await client.UpdateAsync(name, query, true);
}
catch (Exception ex)
{
if (!ex.Is<InvalidOperationException>())
{
throw;
}
}
}
}
}
}

27
backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs

@ -7,11 +7,12 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using EventStore.ClientAPI;
using EventStore.Client;
using Squidex.Infrastructure.Json;
using EventStoreData = EventStore.ClientAPI.EventData;
using EventStoreData = EventStore.Client.EventData;
namespace Squidex.Infrastructure.EventSourcing
{
@ -23,7 +24,7 @@ namespace Squidex.Infrastructure.EventSourcing
{
var @event = resolvedEvent.Event;
var eventPayload = Encoding.UTF8.GetString(@event.Data);
var eventPayload = Encoding.UTF8.GetString(@event.Data.Span);
var eventHeaders = GetHeaders(serializer, @event);
var eventData = new EventData(@event.EventType, eventHeaders, eventPayload);
@ -32,12 +33,12 @@ namespace Squidex.Infrastructure.EventSourcing
return new StoredEvent(
streamName,
resolvedEvent.OriginalEventNumber.ToString(),
resolvedEvent.Event.EventNumber,
resolvedEvent.OriginalEventNumber.ToInt64().ToString(CultureInfo.InvariantCulture),
resolvedEvent.Event.EventNumber.ToInt64(),
eventData);
}
private static string GetStreamName(string? prefix, RecordedEvent @event)
private static string GetStreamName(string? prefix, EventRecord @event)
{
var streamName = @event.EventStreamId;
@ -49,10 +50,9 @@ namespace Squidex.Infrastructure.EventSourcing
return streamName;
}
private static EnvelopeHeaders GetHeaders(IJsonSerializer serializer, RecordedEvent @event)
private static EnvelopeHeaders GetHeaders(IJsonSerializer serializer, EventRecord @event)
{
var headersJson = Encoding.UTF8.GetString(@event.Metadata);
var headers = serializer.Deserialize<EnvelopeHeaders>(headersJson);
var headers = Deserialize<EnvelopeHeaders>(serializer, @event.Metadata);
foreach (var key in headers.Keys.ToList())
{
@ -65,6 +65,13 @@ namespace Squidex.Infrastructure.EventSourcing
return headers;
}
private static T Deserialize<T>(IJsonSerializer serializer, ReadOnlyMemory<byte> source)
{
var json = Encoding.UTF8.GetString(source.Span);
return serializer.Deserialize<T>(json);
}
public static EventStoreData Write(EventData eventData, IJsonSerializer serializer)
{
var payload = Encoding.UTF8.GetBytes(eventData.Payload);
@ -72,7 +79,7 @@ namespace Squidex.Infrastructure.EventSourcing
var headersJson = serializer.Serialize(eventData.Headers);
var headersBytes = Encoding.UTF8.GetBytes(headersJson);
return new EventStoreData(Guid.NewGuid(), eventData.Type, true, payload, headersBytes);
return new EventStoreData(Uuid.FromGuid(Guid.NewGuid()), eventData.Type, payload, headersBytes);
}
}
}

213
backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs

@ -11,44 +11,37 @@ using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions;
using EventStore.Client;
using NodaTime;
using Squidex.Hosting;
using Squidex.Hosting.Configuration;
using Squidex.Infrastructure.Json;
using Squidex.Log;
namespace Squidex.Infrastructure.EventSourcing
{
public sealed class GetEventStore : IEventStore, IInitializable
{
private const int WritePageSize = 500;
private const int ReadPageSize = 500;
private const string StreamPrefix = "squidex";
private static readonly IReadOnlyList<StoredEvent> EmptyEvents = new List<StoredEvent>();
private readonly IEventStoreConnection connection;
private readonly EventStoreClient client;
private readonly EventStoreProjectionClient projectionClient;
private readonly IJsonSerializer serializer;
private readonly string prefix = "squidex";
private readonly ProjectionClient projectionClient;
public GetEventStore(IEventStoreConnection connection, IJsonSerializer serializer, string prefix, string projectionHost)
public GetEventStore(EventStoreClientSettings settings, IJsonSerializer serializer)
{
this.connection = connection;
this.serializer = serializer;
if (!string.IsNullOrWhiteSpace(prefix))
{
this.prefix = prefix.Trim(' ', '-');
}
client = new EventStoreClient(settings);
projectionClient = new ProjectionClient(connection, this.prefix, projectionHost);
projectionClient = new EventStoreProjectionClient(settings, StreamPrefix);
}
public async Task InitializeAsync(
CancellationToken ct = default)
CancellationToken ct)
{
try
{
await connection.ConnectAsync();
await client.SoftDeleteAsync(Guid.NewGuid().ToString(), StreamState.Any, cancellationToken: ct);
}
catch (Exception ex)
{
@ -56,18 +49,16 @@ namespace Squidex.Infrastructure.EventSourcing
throw new ConfigurationException(error, ex);
}
await projectionClient.ConnectAsync();
}
public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string? streamFilter = null, string? position = null)
{
Guard.NotNull(streamFilter, nameof(streamFilter));
return new GetEventStoreSubscription(connection, subscriber, serializer, projectionClient, position, prefix, streamFilter);
return new GetEventStoreSubscription(subscriber, client, projectionClient, serializer, position, StreamPrefix, streamFilter);
}
public async IAsyncEnumerable<StoredEvent> QueryAllAsync(string? streamFilter = null, string? position = null, long take = long.MaxValue,
public async IAsyncEnumerable<StoredEvent> QueryAllAsync(string? streamFilter = null, string? position = null, int take = int.MaxValue,
[EnumeratorCancellation] CancellationToken ct = default)
{
if (take <= 0)
@ -77,15 +68,15 @@ namespace Squidex.Infrastructure.EventSourcing
var streamName = await projectionClient.CreateProjectionAsync(streamFilter);
var sliceStart = ProjectionClient.ParsePosition(position);
var stream = QueryAsync(streamName, position.ToPosition(false), take, ct);
await foreach (var storedEvent in QueryReverseAsync(streamName, sliceStart, take, ct))
await foreach (var storedEvent in stream.IgnoreNotFound(ct))
{
yield return storedEvent;
}
}
public async IAsyncEnumerable<StoredEvent> QueryAllReverseAsync(string? streamFilter = null, string? position = null, long take = long.MaxValue,
public async IAsyncEnumerable<StoredEvent> QueryAllReverseAsync(string? streamFilter = null, Instant timestamp = default, int take = int.MaxValue,
[EnumeratorCancellation] CancellationToken ct = default)
{
if (take <= 0)
@ -95,15 +86,16 @@ namespace Squidex.Infrastructure.EventSourcing
var streamName = await projectionClient.CreateProjectionAsync(streamFilter);
var sliceStart = ProjectionClient.ParsePosition(position);
var stream = QueryReverseAsync(streamName, StreamPosition.End, take, ct);
await foreach (var storedEvent in QueryAsync(streamName, sliceStart, take, ct))
await foreach (var storedEvent in stream.IgnoreNotFound(ct).TakeWhile(x => x.Data.Headers.Timestamp() >= timestamp))
{
yield return storedEvent;
}
}
public async Task<IReadOnlyList<StoredEvent>> QueryLatestAsync(string streamName, int count)
public async Task<IReadOnlyList<StoredEvent>> QueryReverseAsync(string streamName, int count = int.MaxValue,
CancellationToken ct = default)
{
Guard.NotNullOrEmpty(streamName, nameof(streamName));
@ -112,11 +104,13 @@ namespace Squidex.Infrastructure.EventSourcing
return EmptyEvents;
}
using (Profiler.TraceMethod<GetEventStore>())
using (Telemetry.Activities.StartActivity("GetEventStore/GetEventStore"))
{
var result = new List<StoredEvent>();
await foreach (var storedEvent in QueryReverseAsync(streamName, StreamPosition.End, default))
var stream = QueryReverseAsync(GetStreamName(streamName), StreamPosition.End, count, ct);
await foreach (var storedEvent in stream.IgnoreNotFound(ct))
{
result.Add(storedEvent);
}
@ -125,15 +119,18 @@ namespace Squidex.Infrastructure.EventSourcing
}
}
public async Task<IReadOnlyList<StoredEvent>> QueryAsync(string streamName, long streamPosition = 0)
public async Task<IReadOnlyList<StoredEvent>> QueryAsync(string streamName, long streamPosition = 0,
CancellationToken ct = default)
{
Guard.NotNullOrEmpty(streamName, nameof(streamName));
using (Profiler.TraceMethod<GetEventStore>())
using (Telemetry.Activities.StartActivity("GetEventStore/QueryAsync"))
{
var result = new List<StoredEvent>();
await foreach (var storedEvent in QueryAsync(streamName, StreamPosition.End, default))
var stream = QueryAsync(GetStreamName(streamName), streamPosition.ToPosition(), int.MaxValue, ct);
await foreach (var storedEvent in stream.IgnoreNotFound(ct))
{
result.Add(storedEvent);
}
@ -142,95 +139,63 @@ namespace Squidex.Infrastructure.EventSourcing
}
}
private async IAsyncEnumerable<StoredEvent> QueryAsync(string streamName, long sliceStart, long take = int.MaxValue,
[EnumeratorCancellation] CancellationToken ct = default)
private IAsyncEnumerable<StoredEvent> QueryAsync(string streamName, StreamPosition start, long count,
CancellationToken ct = default)
{
var taken = take;
StreamEventsSlice currentSlice;
do
{
currentSlice = await connection.ReadStreamEventsForwardAsync(streamName, sliceStart, ReadPageSize, true);
if (currentSlice.Status == SliceReadStatus.Success)
{
sliceStart = currentSlice.NextEventNumber;
foreach (var resolved in currentSlice.Events)
{
var storedEvent = Formatter.Read(resolved, prefix, serializer);
yield return storedEvent;
if (taken == take)
{
break;
}
taken++;
}
}
}
while (!currentSlice.IsEndOfStream && !ct.IsCancellationRequested && taken < take);
var result = client.ReadStreamAsync(
Direction.Forwards,
streamName,
start,
count,
resolveLinkTos: true,
cancellationToken: ct);
return result.Select(x => Formatter.Read(x, StreamPrefix, serializer));
}
private async IAsyncEnumerable<StoredEvent> QueryReverseAsync(string streamName, long sliceStart, long take = int.MaxValue,
[EnumeratorCancellation] CancellationToken ct = default)
private IAsyncEnumerable<StoredEvent> QueryReverseAsync(string streamName, StreamPosition start, long count,
CancellationToken ct = default)
{
var taken = take;
StreamEventsSlice currentSlice;
do
{
currentSlice = await connection.ReadStreamEventsBackwardAsync(streamName, sliceStart, ReadPageSize, true);
if (currentSlice.Status == SliceReadStatus.Success)
{
sliceStart = currentSlice.NextEventNumber;
foreach (var resolved in currentSlice.Events.OrderByDescending(x => x.Event.EventNumber))
{
var storedEvent = Formatter.Read(resolved, prefix, serializer);
yield return storedEvent;
if (taken == take)
{
break;
}
taken++;
}
}
}
while (!currentSlice.IsEndOfStream && !ct.IsCancellationRequested && taken < take);
var result = client.ReadStreamAsync(
Direction.Backwards,
streamName,
start,
count,
resolveLinkTos: true,
cancellationToken: ct);
return result.Select(x => Formatter.Read(x, StreamPrefix, serializer));
}
public Task DeleteStreamAsync(string streamName)
public async Task DeleteStreamAsync(string streamName,
CancellationToken ct = default)
{
Guard.NotNullOrEmpty(streamName, nameof(streamName));
return connection.DeleteStreamAsync(GetStreamName(streamName), ExpectedVersion.Any);
await client.SoftDeleteAsync(GetStreamName(streamName), StreamState.Any, cancellationToken: ct);
}
public Task AppendAsync(Guid commitId, string streamName, ICollection<EventData> events)
public Task AppendAsync(Guid commitId, string streamName, ICollection<EventData> events,
CancellationToken ct = default)
{
return AppendEventsInternalAsync(streamName, EtagVersion.Any, events);
return AppendEventsInternalAsync(streamName, EtagVersion.Any, events, ct);
}
public Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events)
public Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events,
CancellationToken ct = default)
{
Guard.GreaterEquals(expectedVersion, -1, nameof(expectedVersion));
return AppendEventsInternalAsync(streamName, expectedVersion, events);
return AppendEventsInternalAsync(streamName, expectedVersion, events, ct);
}
private async Task AppendEventsInternalAsync(string streamName, long expectedVersion, ICollection<EventData> events)
private async Task AppendEventsInternalAsync(string streamName, long expectedVersion, ICollection<EventData> events,
CancellationToken ct)
{
Guard.NotNullOrEmpty(streamName, nameof(streamName));
Guard.NotNull(events, nameof(events));
using (Profiler.TraceMethod<GetEventStore>(nameof(AppendAsync)))
using (Telemetry.Activities.StartActivity("GetEventStore/AppendEventsInternalAsync"))
{
if (events.Count == 0)
{
@ -239,40 +204,58 @@ namespace Squidex.Infrastructure.EventSourcing
try
{
var eventsToSave = events.Select(x => Formatter.Write(x, serializer)).ToList();
var eventData = events.Select(x => Formatter.Write(x, serializer));
if (eventsToSave.Count < WritePageSize)
streamName = GetStreamName(streamName);
if (expectedVersion == -1)
{
await client.AppendToStreamAsync(streamName, StreamState.NoStream, eventData, cancellationToken: ct);
}
else if (expectedVersion < -1)
{
await connection.AppendToStreamAsync(GetStreamName(streamName), expectedVersion, eventsToSave);
await client.AppendToStreamAsync(streamName, StreamState.Any, eventData, cancellationToken: ct);
}
else
{
using (var transaction = await connection.StartTransactionAsync(GetStreamName(streamName), expectedVersion))
{
for (var p = 0; p < eventsToSave.Count; p += WritePageSize)
{
await transaction.WriteAsync(eventsToSave.Skip(p).Take(WritePageSize));
}
await transaction.CommitAsync();
}
await client.AppendToStreamAsync(streamName, expectedVersion.ToRevision(), eventData, cancellationToken: ct);
}
}
catch (WrongExpectedVersionException ex)
{
throw new WrongEventVersionException(ParseVersion(ex.Message), expectedVersion);
throw new WrongEventVersionException(ex.ActualVersion ?? 0, expectedVersion);
}
}
}
private static int ParseVersion(string message)
public async Task DeleteAsync(string streamFilter,
CancellationToken ct = default)
{
return int.Parse(message[(message.LastIndexOf(':') + 1)..]);
var streamName = await projectionClient.CreateProjectionAsync(streamFilter);
var events = client.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start, resolveLinkTos: true, cancellationToken: ct);
if (await events.ReadState == ReadState.StreamNotFound)
{
return;
}
var deleted = new HashSet<string>();
await foreach (var storedEvent in TaskAsyncEnumerableExtensions.WithCancellation(events, ct))
{
var streamToDelete = storedEvent.Event.EventStreamId;
if (deleted.Add(streamToDelete))
{
await client.SoftDeleteAsync(streamToDelete, StreamState.Any, cancellationToken: ct);
}
}
}
private string GetStreamName(string streamName)
private static string GetStreamName(string streamName)
{
return $"{prefix}-{streamName}";
return $"{StreamPrefix}-{streamName}";
}
}
}

86
backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs

@ -5,70 +5,76 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions;
using System;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client;
using Squidex.Infrastructure.Json;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.EventSourcing
{
internal sealed class GetEventStoreSubscription : IEventSubscription
{
private readonly IEventStoreConnection connection;
private readonly IEventSubscriber subscriber;
private readonly IJsonSerializer serializer;
private readonly string? prefix;
private readonly EventStoreCatchUpSubscription subscription;
private readonly long? position;
private readonly CancellationTokenSource cts = new CancellationTokenSource();
private StreamSubscription subscription;
public GetEventStoreSubscription(
IEventStoreConnection connection,
IEventSubscriber subscriber,
EventStoreClient client,
EventStoreProjectionClient projectionClient,
IJsonSerializer serializer,
ProjectionClient projectionClient,
string? position,
string? prefix,
string? streamFilter)
{
this.connection = connection;
Task.Run(async () =>
{
var ct = cts.Token;
this.position = ProjectionClient.ParsePositionOrNull(position);
this.prefix = prefix;
var streamName = await projectionClient.CreateProjectionAsync(streamFilter);
var streamName = AsyncHelper.Sync(() => projectionClient.CreateProjectionAsync(streamFilter));
this.serializer = serializer;
this.subscriber = subscriber;
subscription = SubscribeToStream(streamName);
}
public void Unsubscribe()
{
subscription.Stop();
}
private EventStoreCatchUpSubscription SubscribeToStream(string streamName)
{
var settings = CatchUpSubscriptionSettings.Default;
return connection.SubscribeToStreamFrom(streamName, position, settings,
async (s, e) =>
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> onEvent = async (_, @event, _) =>
{
var storedEvent = Formatter.Read(e, prefix, serializer);
var storedEvent = Formatter.Read(@event, prefix, serializer);
await subscriber.OnEventAsync(this, storedEvent);
}, null,
(s, reason, ex) =>
};
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? onError = (_, reason, ex) =>
{
if (reason != SubscriptionDropReason.ConnectionClosed &&
reason != SubscriptionDropReason.UserInitiated)
if (reason != SubscriptionDroppedReason.Disposed &&
reason != SubscriptionDroppedReason.SubscriberError)
{
ex ??= new ConnectionClosedException($"Subscription closed with reason {reason}.");
ex ??= new InvalidOperationException($"Subscription closed with reason {reason}.");
subscriber.OnErrorAsync(this, ex);
}
});
};
if (!string.IsNullOrWhiteSpace(position))
{
var streamPosition = position.ToPosition(true);
subscription = await client.SubscribeToStreamAsync(streamName, streamPosition,
onEvent, true,
onError,
cancellationToken: ct);
}
else
{
subscription = await client.SubscribeToStreamAsync(streamName,
onEvent, true,
onError,
cancellationToken: ct);
}
}, cts.Token);
}
public void Unsubscribe()
{
subscription?.Dispose();
cts.Cancel();
}
}
}

142
backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs

@ -1,142 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Sockets;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions;
using EventStore.ClientAPI.Projections;
using Squidex.Hosting.Configuration;
using Squidex.Text;
namespace Squidex.Infrastructure.EventSourcing
{
public sealed class ProjectionClient
{
private readonly ConcurrentDictionary<string, bool> projections = new ConcurrentDictionary<string, bool>();
private readonly IEventStoreConnection connection;
private readonly string projectionPrefix;
private readonly string projectionHost;
private ProjectionsManager projectionsManager;
public ProjectionClient(IEventStoreConnection connection, string projectionPrefix, string projectionHost)
{
this.connection = connection;
this.projectionPrefix = projectionPrefix;
this.projectionHost = projectionHost;
}
private string CreateFilterProjectionName(string filter)
{
return $"by-{projectionPrefix.Slugify()}-{filter.Slugify()}";
}
public async Task<string> CreateProjectionAsync(string? streamFilter = null)
{
streamFilter ??= ".*";
var name = CreateFilterProjectionName(streamFilter);
var query =
$@"fromAll()
.when({{
$any: function (s, e) {{
if (e.streamId.indexOf('{projectionPrefix}') === 0 && /{streamFilter}/.test(e.streamId.substring({projectionPrefix.Length + 1}))) {{
linkTo('{name}', e);
}}
}}
}});";
await CreateProjectionAsync(name, query);
return name;
}
private async Task CreateProjectionAsync(string name, string query)
{
if (projections.TryAdd(name, true))
{
try
{
var credentials = connection.Settings.DefaultUserCredentials;
await projectionsManager.CreateContinuousAsync(name, query, credentials);
}
catch (Exception ex)
{
if (!ex.Is<ProjectionCommandConflictException>())
{
throw;
}
}
}
}
public async Task ConnectAsync()
{
var addressParts = projectionHost.Split(':');
if (addressParts.Length < 2 || !int.TryParse(addressParts[1], out var port))
{
port = 2113;
}
var endpoints = await Dns.GetHostAddressesAsync(addressParts[0]);
var endpoint = new IPEndPoint(endpoints.First(x => x.AddressFamily == AddressFamily.InterNetwork), port);
async Task ConnectToSchemaAsync(string schema)
{
projectionsManager =
new ProjectionsManager(
connection.Settings.Log, endpoint,
connection.Settings.OperationTimeout,
null,
schema);
await projectionsManager.ListAllAsync(connection.Settings.DefaultUserCredentials);
}
try
{
try
{
await ConnectToSchemaAsync("https");
}
catch (HttpRequestException)
{
await ConnectToSchemaAsync("http");
}
catch (AggregateException ex) when (ex.Flatten().InnerException is HttpRequestException)
{
await ConnectToSchemaAsync("http");
}
}
catch (Exception ex)
{
var error = new ConfigurationError($"GetEventStore cannot connect to event store projections: {projectionHost}.");
throw new ConfigurationException(error, ex);
}
}
public static long? ParsePositionOrNull(string? position)
{
return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null;
}
public static long ParsePosition(string? position)
{
return long.TryParse(position, out var parsedPosition) ? parsedPosition + 1 : StreamPosition.Start;
}
}
}

83
backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Utils.cs

@ -0,0 +1,83 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
using System.Globalization;
using System.Runtime.CompilerServices;
using System.Threading;
using EventStore.Client;
namespace Squidex.Infrastructure.EventSourcing
{
public static class Utils
{
public static StreamRevision ToRevision(this long version)
{
return StreamRevision.FromInt64(version);
}
public static StreamPosition ToPosition(this long version)
{
if (version <= 0)
{
return StreamPosition.Start;
}
return StreamPosition.FromInt64(version);
}
public static StreamPosition ToPosition(this string? position, bool inclusive)
{
if (string.IsNullOrWhiteSpace(position))
{
return StreamPosition.Start;
}
if (long.TryParse(position, NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsedPosition))
{
if (!inclusive)
{
parsedPosition++;
}
return StreamPosition.FromInt64(parsedPosition);
}
return StreamPosition.Start;
}
public static async IAsyncEnumerable<StoredEvent> IgnoreNotFound(this IAsyncEnumerable<StoredEvent> source,
[EnumeratorCancellation] CancellationToken ct = default)
{
var enumerator = source.GetAsyncEnumerator(ct);
bool resultFound;
try
{
resultFound = await enumerator.MoveNextAsync(ct);
}
catch (StreamNotFoundException)
{
resultFound = false;
}
if (!resultFound)
{
yield break;
}
yield return enumerator.Current;
while (await enumerator.MoveNextAsync(ct))
{
ct.ThrowIfCancellationRequested();
yield return enumerator.Current;
}
}
}
}

9
backend/src/Squidex.Infrastructure.GetEventStore/Squidex.Infrastructure.GetEventStore.csproj

@ -10,7 +10,14 @@
<DebugSymbols>True</DebugSymbols>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="EventStore.Client" Version="21.2.0" />
<PackageReference Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="21.2.0" />
<PackageReference Include="EventStore.Client.Grpc.ProjectionManagement" Version="21.2.0" />
<PackageReference Include="EventStore.Client.Grpc.Streams" Version="21.2.0" />
<PackageReference Include="Grpc.Net.Client" Version="2.40.0" />
<PackageReference Include="Meziantou.Analyzer" Version="1.0.670">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="RefactoringEssentials" Version="5.6.0" PrivateAssets="all" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.118" PrivateAssets="all" />
</ItemGroup>

2
backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs

@ -38,7 +38,7 @@ namespace Squidex.Infrastructure.EventSourcing
}
}
public async Task<IReadOnlyList<StoredEvent>> QueryLatestAsync(string streamName, int count = int.MaxValue,
public async Task<IReadOnlyList<StoredEvent>> QueryReverseAsync(string streamName, int count = int.MaxValue,
CancellationToken ct = default)
{
Guard.NotNullOrEmpty(streamName, nameof(streamName));

2
backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs

@ -15,7 +15,7 @@ namespace Squidex.Infrastructure.EventSourcing
{
public interface IEventStore
{
Task<IReadOnlyList<StoredEvent>> QueryLatestAsync(string streamName, int take = int.MaxValue,
Task<IReadOnlyList<StoredEvent>> QueryReverseAsync(string streamName, int take = int.MaxValue,
CancellationToken ct = default);
Task<IReadOnlyList<StoredEvent>> QueryAsync(string streamName, long streamPosition = 0,

17
backend/src/Squidex/Config/Domain/EventSourcingServices.cs

@ -6,13 +6,17 @@
// ==========================================================================
using System.Linq;
using EventStore.Client;
using EventStore.ClientAPI;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using MongoDB.Driver;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.Diagnostics;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.EventSourcing.Grains;
using Squidex.Infrastructure.Json;
using Squidex.Infrastructure.States;
namespace Squidex.Config.Domain
@ -36,6 +40,19 @@ namespace Squidex.Config.Domain
return new MongoEventStore(mongDatabase, c.GetRequiredService<IEventNotifier>());
})
.As<IEventStore>();
},
["GetEventStore"] = () =>
{
var configuration = config.GetRequiredValue("eventStore:getEventStore:configuration");
services.AddSingletonAs(_ => EventStoreClientSettings.Create(configuration))
.AsSelf();
services.AddSingletonAs<GetEventStore>()
.As<IEventStore>();
services.AddHealthChecks()
.AddCheck<GetEventStoreHealthCheck>("EventStore", tags: new[] { "node" });
}
});

2
backend/src/Squidex/Squidex.csproj

@ -24,6 +24,7 @@
<ProjectReference Include="..\Squidex.Domain.Apps.Events\Squidex.Domain.Apps.Events.csproj" />
<ProjectReference Include="..\Squidex.Domain.Users\Squidex.Domain.Users.csproj" />
<ProjectReference Include="..\Squidex.Domain.Users.MongoDb\Squidex.Domain.Users.MongoDb.csproj" />
<ProjectReference Include="..\Squidex.Infrastructure.GetEventStore\Squidex.Infrastructure.GetEventStore.csproj" />
<ProjectReference Include="..\Squidex.Infrastructure.RabbitMq\Squidex.Infrastructure.RabbitMq.csproj" />
<ProjectReference Include="..\Squidex.Infrastructure\Squidex.Infrastructure.csproj" />
<ProjectReference Include="..\Squidex.Infrastructure.MongoDb\Squidex.Infrastructure.MongoDb.csproj" />
@ -33,6 +34,7 @@
<ItemGroup>
<PackageReference Include="AspNet.Security.OAuth.GitHub" Version="5.0.9" />
<PackageReference Include="EventStore.Client" Version="21.2.0" />
<PackageReference Include="GraphQL.DataLoader" Version="4.5.0" />
<PackageReference Include="GraphQL.Server.Core" Version="5.0.2" />
<PackageReference Include="GraphQL.Server.Transports.AspNetCore.NewtonsoftJson" Version="5.0.2" />

39
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs

@ -321,7 +321,7 @@ namespace Squidex.Infrastructure.EventSourcing
{
var expected = allExpected.TakeLast(take).ToArray();
var readEvents = await Sut.QueryLatestAsync(streamName, take);
var readEvents = await Sut.QueryReverseAsync(streamName, take);
ShouldBeEquivalentTo(readEvents, expected);
}
@ -375,9 +375,22 @@ namespace Squidex.Infrastructure.EventSourcing
await Sut.AppendAsync(Guid.NewGuid(), streamName, events);
await Sut.DeleteAsync($"^{streamName.Substring(0, 10)}");
IReadOnlyList<StoredEvent>? readEvents = null;
var readEvents = await QueryAsync(streamName);
for (var i = 0; i < 5; i++)
{
await Sut.DeleteAsync($"^{streamName.Substring(0, 10)}");
readEvents = await QueryAsync(streamName);
if (readEvents.Count == 0)
{
break;
}
// Get event store needs a little bit of time for the projections.
await Task.Delay(1000);
}
Assert.Empty(readEvents);
}
@ -395,9 +408,22 @@ namespace Squidex.Infrastructure.EventSourcing
await Sut.AppendAsync(Guid.NewGuid(), streamName, events);
await Sut.DeleteStreamAsync(streamName);
IReadOnlyList<StoredEvent>? readEvents = null;
var readEvents = await QueryAsync(streamName);
for (var i = 0; i < 5; i++)
{
await Sut.DeleteStreamAsync(streamName);
readEvents = await QueryAsync(streamName);
if (readEvents.Count == 0)
{
break;
}
// Get event store needs a little bit of time for the projections.
await Task.Delay(1000);
}
Assert.Empty(readEvents);
}
@ -424,7 +450,8 @@ namespace Squidex.Infrastructure.EventSourcing
return readEvents;
}
private async Task<IReadOnlyList<StoredEvent>?> QueryWithSubscriptionAsync(string streamFilter, Func<Task>? subscriptionRunning = null, bool fromBeginning = false)
private async Task<IReadOnlyList<StoredEvent>?> QueryWithSubscriptionAsync(string streamFilter,
Func<Task>? subscriptionRunning = null, bool fromBeginning = false)
{
var subscriber = new EventSubscriber();

51
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/GetEventStoreFixture.cs

@ -0,0 +1,51 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
using EventStore.Client;
using Squidex.Infrastructure.TestHelpers;
namespace Squidex.Infrastructure.EventSourcing
{
public sealed class GetEventStoreFixture : IDisposable
{
private readonly EventStoreClientSettings settings;
public GetEventStore EventStore { get; }
public GetEventStoreFixture()
{
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
settings = EventStoreClientSettings.Create("esdb://admin:changeit@127.0.0.1:2113?tls=false");
EventStore = new GetEventStore(settings, TestUtils.DefaultSerializer);
EventStore.InitializeAsync(default).Wait();
}
public void Dispose()
{
CleanupAsync().Wait();
}
private async Task CleanupAsync()
{
var projectionsManager = new EventStoreProjectionManagementClient(settings);
await foreach (var projection in projectionsManager.ListAllAsync())
{
var name = projection.Name;
if (name.StartsWith("by-squidex-test", StringComparison.OrdinalIgnoreCase))
{
await projectionsManager.DisableAsync(name);
}
}
}
}
}

31
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/GetEventStoreTests.cs

@ -0,0 +1,31 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using Xunit;
#pragma warning disable SA1300 // Element should begin with upper-case letter
namespace Squidex.Infrastructure.EventSourcing
{
[Trait("Category", "Dependencies")]
public class GetEventStoreTests : EventStoreTests<GetEventStore>, IClassFixture<GetEventStoreFixture>
{
public GetEventStoreFixture _ { get; }
protected override int SubscriptionDelayInMs { get; } = 1000;
public GetEventStoreTests(GetEventStoreFixture fixture)
{
_ = fixture;
}
public override GetEventStore CreateStore()
{
return _.EventStore;
}
}
}

1
backend/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj

@ -8,6 +8,7 @@
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Squidex.Infrastructure.GetEventStore\Squidex.Infrastructure.GetEventStore.csproj" />
<ProjectReference Include="..\..\src\Squidex.Infrastructure.MongoDb\Squidex.Infrastructure.MongoDb.csproj" />
<ProjectReference Include="..\..\src\Squidex.Infrastructure\Squidex.Infrastructure.csproj" />
</ItemGroup>

Loading…
Cancel
Save