diff --git a/backend/extensions/Squidex.Extensions/Actions/Algolia/AlgoliaActionHandler.cs b/backend/extensions/Squidex.Extensions/Actions/Algolia/AlgoliaActionHandler.cs
index 003bc485f..442738af0 100644
--- a/backend/extensions/Squidex.Extensions/Actions/Algolia/AlgoliaActionHandler.cs
+++ b/backend/extensions/Squidex.Extensions/Actions/Algolia/AlgoliaActionHandler.cs
@@ -102,7 +102,7 @@ namespace Squidex.Extensions.Actions.Algolia
{
if (job.Content != null)
{
- var response = await index.PartialUpdateObjectAsync(job.Content, null, ct, true);
+ var response = await index.SaveObjectAsync(job.Content, null, ct, true);
return Result.Success(JsonConvert.SerializeObject(response, Formatting.Indented));
}
diff --git a/backend/extensions/Squidex.Extensions/Squidex.Extensions.csproj b/backend/extensions/Squidex.Extensions/Squidex.Extensions.csproj
index 45cd0db46..085cb64aa 100644
--- a/backend/extensions/Squidex.Extensions/Squidex.Extensions.csproj
+++ b/backend/extensions/Squidex.Extensions/Squidex.Extensions.csproj
@@ -8,15 +8,14 @@
-
+
-
+
-
-
+
+
-
diff --git a/backend/i18n/frontend_en.json b/backend/i18n/frontend_en.json
index 24b4636a3..c89da2187 100644
--- a/backend/i18n/frontend_en.json
+++ b/backend/i18n/frontend_en.json
@@ -528,7 +528,7 @@
"roles.updateFailed": "Failed to update role. Please reload.",
"rules.actionEdit": "Edit Action",
"rules.cancelFailed": "Failed to cancel rule. Please reload.",
- "rules.create": "Create new Rule",
+ "rules.create": "New Rule",
"rules.createFailed": "Failed to create rule. Please reload.",
"rules.createTooltip": "New Rule (CTRL + SHIFT + G)",
"rules.deleteConfirmText": "Do you really want to delete the rule?",
diff --git a/backend/i18n/frontend_nl.json b/backend/i18n/frontend_nl.json
index 5ffd942c2..b3cc68fb0 100644
--- a/backend/i18n/frontend_nl.json
+++ b/backend/i18n/frontend_nl.json
@@ -134,8 +134,8 @@
"clients.addFailed": "Toevoegen van client is mislukt. Laad opnieuw.",
"clients.allowAnonymous": "Sta anonieme toegang toe.",
"clients.allowAnonymousHint": "Sta toegang tot de API toe zonder toegangstoken voor alle bronnen die zijn geconfigureerd via de rol van deze client. Geef niet meer dan één client anonieme toegang.",
- "clients.apiCallsLimit": "Max API Calls",
- "clients.apiCallsLimitHint": "Limit the number of API calls this client can make per month to protect your API contingent for other clients that are more important.",
+ "clients.apiCallsLimit": "Max. API aanroepen",
+ "clients.apiCallsLimitHint": "Beperk het aantal API aanroepen dat deze client per maand kan doen om uw API quota te beschermen voor andere clients die belangrijker zijn.",
"clients.clientIdValidationMessage": "Naam mag alleen letters, cijfers, streepjes en spaties bevatten.",
"clients.clientNamePlaceholder": "Voer de naam van de klant in",
"clients.connect": "Verbinden",
@@ -211,7 +211,7 @@
"common.create": "Maken",
"common.created": "Gemaakt",
"common.date": "Datum",
- "common.dateTimeEditor.local": "Local",
+ "common.dateTimeEditor.local": "Lokaal",
"common.dateTimeEditor.now": "Nu",
"common.dateTimeEditor.nowTooltip": "Nu gebruiken (UTC)",
"common.dateTimeEditor.today": "Vandaag",
@@ -355,7 +355,7 @@
"contents.loadDataFailed": "Laden van gegevens is mislukt. Laad opnieuw.",
"contents.loadFailed": "Laden van inhoud is mislukt. Laad opnieuw.",
"contents.loadVersionFailed": "Versie van een nieuwe versie is mislukt. Laad opnieuw.",
- "contents.newStatusFieldDescription": "The new status of the content item.",
+ "contents.newStatusFieldDescription": "De nieuwe status van het item.",
"contents.noReference": "- Geen referentie -",
"contents.pendingChangesTextToChange": "Je hebt niet-opgeslagen wijzigingen. \n \n Wanneer je de status wijzigt, raak je ze kwijt. \n \n **Wil je toch doorgaan?**",
"contents.pendingChangesTextToClose": "Je hebt niet-opgeslagen wijzigingen. \n \n Wanneer je de huidige inhoudsweergave sluit, raak je ze kwijt. \n n **Wil je toch doorgaan?**",
diff --git a/backend/i18n/source/frontend_en.json b/backend/i18n/source/frontend_en.json
index 24b4636a3..c89da2187 100644
--- a/backend/i18n/source/frontend_en.json
+++ b/backend/i18n/source/frontend_en.json
@@ -528,7 +528,7 @@
"roles.updateFailed": "Failed to update role. Please reload.",
"rules.actionEdit": "Edit Action",
"rules.cancelFailed": "Failed to cancel rule. Please reload.",
- "rules.create": "Create new Rule",
+ "rules.create": "New Rule",
"rules.createFailed": "Failed to create rule. Please reload.",
"rules.createTooltip": "New Rule (CTRL + SHIFT + G)",
"rules.deleteConfirmText": "Do you really want to delete the rule?",
diff --git a/backend/i18n/source/frontend_nl.json b/backend/i18n/source/frontend_nl.json
index 0b4339069..b3cc68fb0 100644
--- a/backend/i18n/source/frontend_nl.json
+++ b/backend/i18n/source/frontend_nl.json
@@ -134,6 +134,8 @@
"clients.addFailed": "Toevoegen van client is mislukt. Laad opnieuw.",
"clients.allowAnonymous": "Sta anonieme toegang toe.",
"clients.allowAnonymousHint": "Sta toegang tot de API toe zonder toegangstoken voor alle bronnen die zijn geconfigureerd via de rol van deze client. Geef niet meer dan één client anonieme toegang.",
+ "clients.apiCallsLimit": "Max. API aanroepen",
+ "clients.apiCallsLimitHint": "Beperk het aantal API aanroepen dat deze client per maand kan doen om uw API quota te beschermen voor andere clients die belangrijker zijn.",
"clients.clientIdValidationMessage": "Naam mag alleen letters, cijfers, streepjes en spaties bevatten.",
"clients.clientNamePlaceholder": "Voer de naam van de klant in",
"clients.connect": "Verbinden",
@@ -209,10 +211,12 @@
"common.create": "Maken",
"common.created": "Gemaakt",
"common.date": "Datum",
+ "common.dateTimeEditor.local": "Lokaal",
"common.dateTimeEditor.now": "Nu",
"common.dateTimeEditor.nowTooltip": "Nu gebruiken (UTC)",
"common.dateTimeEditor.today": "Vandaag",
"common.dateTimeEditor.todayTooltip": "Gebruik vandaag (UTC)",
+ "common.dateTimeEditor.utc": "UTC",
"common.delete": "Verwijderen",
"common.description": "Beschrijving",
"common.displayName": "Weergavenaam",
@@ -351,6 +355,7 @@
"contents.loadDataFailed": "Laden van gegevens is mislukt. Laad opnieuw.",
"contents.loadFailed": "Laden van inhoud is mislukt. Laad opnieuw.",
"contents.loadVersionFailed": "Versie van een nieuwe versie is mislukt. Laad opnieuw.",
+ "contents.newStatusFieldDescription": "De nieuwe status van het item.",
"contents.noReference": "- Geen referentie -",
"contents.pendingChangesTextToChange": "Je hebt niet-opgeslagen wijzigingen. \n \n Wanneer je de status wijzigt, raak je ze kwijt. \n \n **Wil je toch doorgaan?**",
"contents.pendingChangesTextToClose": "Je hebt niet-opgeslagen wijzigingen. \n \n Wanneer je de huidige inhoudsweergave sluit, raak je ze kwijt. \n n **Wil je toch doorgaan?**",
diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/Filtering.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/Filtering.cs
new file mode 100644
index 000000000..5f6d502c3
--- /dev/null
+++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/Filtering.cs
@@ -0,0 +1,112 @@
+// ==========================================================================
+// Squidex Headless CMS
+// ==========================================================================
+// Copyright (c) Squidex UG (haftungsbeschraenkt)
+// All rights reserved. Licensed under the MIT license.
+// ==========================================================================
+
+using System.Collections.Generic;
+using MongoDB.Driver;
+
+namespace Squidex.Infrastructure.EventSourcing
+{
+ internal static class Filtering
+ {
+ public static string CreateIndexPath(string property)
+ {
+ return $"Events.Metadata.{property}";
+ }
+
+ public static FilterDefinition ByPosition(StreamPosition streamPosition)
+ {
+ if (streamPosition.IsEndOfCommit)
+ {
+ return Builders.Filter.Gt(x => x.Timestamp, streamPosition.Timestamp);
+ }
+ else
+ {
+ return Builders.Filter.Gte(x => x.Timestamp, streamPosition.Timestamp);
+ }
+ }
+
+ public static FilterDefinition? ByStream(string? streamFilter)
+ {
+ if (StreamFilter.IsAll(streamFilter))
+ {
+ return null;
+ }
+
+ if (streamFilter.Contains("^"))
+ {
+ return Builders.Filter.Regex(x => x.EventStream, streamFilter);
+ }
+ else
+ {
+ return Builders.Filter.Eq(x => x.EventStream, streamFilter);
+ }
+ }
+
+ public static FilterDefinition>? ByChangeInStream(string? streamFilter)
+ {
+ if (StreamFilter.IsAll(streamFilter))
+ {
+ return null;
+ }
+
+ if (streamFilter.Contains("^"))
+ {
+ return Builders>.Filter.Regex(x => x.FullDocument.EventStream, streamFilter);
+ }
+ else
+ {
+ return Builders>.Filter.Eq(x => x.FullDocument.EventStream, streamFilter);
+ }
+ }
+
+ public static IEnumerable Filtered(this MongoEventCommit commit, StreamPosition lastPosition)
+ {
+ var eventStreamOffset = commit.EventStreamOffset;
+
+ var commitTimestamp = commit.Timestamp;
+ var commitOffset = 0;
+
+ foreach (var @event in commit.Events)
+ {
+ eventStreamOffset++;
+
+ if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp)
+ {
+ var eventData = @event.ToEventData();
+ var eventPosition = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
+
+ yield return new StoredEvent(commit.EventStream, eventPosition, eventStreamOffset, eventData);
+ }
+
+ commitOffset++;
+ }
+ }
+
+ public static IEnumerable Filtered(this MongoEventCommit commit, long streamPosition)
+ {
+ var eventStreamOffset = commit.EventStreamOffset;
+
+ var commitTimestamp = commit.Timestamp;
+ var commitOffset = 0;
+
+ foreach (var @event in commit.Events)
+ {
+ eventStreamOffset++;
+
+ if (eventStreamOffset >= streamPosition)
+ {
+ var eventData = @event.ToEventData();
+ var eventPosition = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
+
+ yield return new StoredEvent(commit.EventStream, eventPosition, eventStreamOffset, eventData);
+ }
+
+ commitOffset++;
+ }
+ }
+ }
+}
diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs
index 1cd632a10..4014cee09 100644
--- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs
+++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs
@@ -9,6 +9,7 @@ using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;
+using MongoDB.Driver.Core.Clusters;
using Squidex.Infrastructure.MongoDb;
namespace Squidex.Infrastructure.EventSourcing
@@ -26,6 +27,13 @@ namespace Squidex.Infrastructure.EventSourcing
get { return Database.GetCollection(CollectionName()); }
}
+ public IMongoCollection TypedCollection
+ {
+ get { return Collection; }
+ }
+
+ public bool IsReplicaSet { get; }
+
public MongoEventStore(IMongoDatabase database, IEventNotifier notifier)
: base(database)
{
diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStoreSubscription.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStoreSubscription.cs
new file mode 100644
index 000000000..be312f7cf
--- /dev/null
+++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStoreSubscription.cs
@@ -0,0 +1,169 @@
+// ==========================================================================
+// Squidex Headless CMS
+// ==========================================================================
+// Copyright (c) Squidex UG (haftungsbeschraenkt)
+// All rights reserved. Licensed under the MIT license.
+// ==========================================================================
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using MongoDB.Bson;
+using MongoDB.Driver;
+using NodaTime;
+
+namespace Squidex.Infrastructure.EventSourcing
+{
+ public sealed class MongoEventStoreSubscription : IEventSubscription
+ {
+ private readonly MongoEventStore eventStore;
+ private readonly IEventSubscriber eventSubscriber;
+ private readonly CancellationTokenSource stopToken = new CancellationTokenSource();
+ private readonly Task task;
+
+ public MongoEventStoreSubscription(MongoEventStore eventStore, IEventSubscriber eventSubscriber, string? streamFilter, string? position)
+ {
+ this.eventStore = eventStore;
+ this.eventSubscriber = eventSubscriber;
+
+ task = QueryAsync(streamFilter, position);
+ }
+
+ private async Task QueryAsync(string? streamFilter, string? position)
+ {
+ try
+ {
+ string? lastRawPosition = null;
+
+ try
+ {
+ lastRawPosition = await QueryOldAsync(streamFilter, position);
+ }
+ catch (OperationCanceledException)
+ {
+ }
+
+ if (!stopToken.IsCancellationRequested)
+ {
+ await QueryCurrentAsync(streamFilter, lastRawPosition);
+ }
+ }
+ catch (Exception ex)
+ {
+ await eventSubscriber.OnErrorAsync(this, ex);
+ }
+ }
+
+ private async Task QueryCurrentAsync(string? streamFilter, StreamPosition lastPosition)
+ {
+ BsonDocument? resumeToken = null;
+
+ var start =
+ lastPosition.Timestamp.Timestamp > 0 ?
+ lastPosition.Timestamp.Timestamp - 30 :
+ SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromSeconds(30)).ToUnixTimeSeconds();
+
+ var changePipeline = Match(streamFilter);
+ var changeStart = new BsonTimestamp((int)start, 0);
+
+ while (!stopToken.IsCancellationRequested)
+ {
+ var changeOptions = new ChangeStreamOptions();
+
+ if (resumeToken != null)
+ {
+ changeOptions.StartAfter = resumeToken;
+ }
+ else
+ {
+ changeOptions.StartAtOperationTime = changeStart;
+ }
+
+ using (var cursor = eventStore.TypedCollection.Watch(changePipeline, changeOptions, stopToken.Token))
+ {
+ var isRead = false;
+
+ await cursor.ForEachAsync(async change =>
+ {
+ if (change.OperationType == ChangeStreamOperationType.Insert)
+ {
+ foreach (var storedEvent in change.FullDocument.Filtered(lastPosition))
+ {
+ await eventSubscriber.OnEventAsync(this, storedEvent);
+ }
+ }
+
+ isRead = true;
+ }, stopToken.Token);
+
+ resumeToken = cursor.GetResumeToken();
+
+ if (!isRead)
+ {
+ await Task.Delay(1000);
+ }
+ }
+ }
+ }
+
+ private async Task QueryOldAsync(string? streamFilter, string? position)
+ {
+ string? lastRawPosition = null;
+
+ using (var cts = new CancellationTokenSource())
+ {
+ using (var combined = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, stopToken.Token))
+ {
+ await eventStore.QueryAsync(async storedEvent =>
+ {
+ var now = SystemClock.Instance.GetCurrentInstant();
+
+ var timeToNow = now - storedEvent.Data.Headers.Timestamp();
+
+ if (timeToNow <= Duration.FromMinutes(5))
+ {
+ cts.Cancel();
+ }
+ else
+ {
+ await eventSubscriber.OnEventAsync(this, storedEvent);
+
+ lastRawPosition = storedEvent.EventPosition;
+ }
+ }, streamFilter, position, combined.Token);
+ }
+ }
+
+ return lastRawPosition;
+ }
+
+ private static PipelineDefinition, ChangeStreamDocument>? Match(string? streamFilter)
+ {
+ var result = new EmptyPipelineDefinition>();
+
+ var byStream = Filtering.ByChangeInStream(streamFilter);
+
+ if (byStream != null)
+ {
+ var filterBuilder = Builders>.Filter;
+
+ var filter = filterBuilder.Or(filterBuilder.Ne(x => x.OperationType, ChangeStreamOperationType.Insert), byStream);
+
+ return result.Match(filter);
+ }
+
+ return result;
+ }
+
+ public Task StopAsync()
+ {
+ stopToken.Cancel();
+
+ return task;
+ }
+
+ public void WakeUp()
+ {
+ }
+ }
+}
diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs
index c1c1458e6..7343a9000 100644
--- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs
+++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs
@@ -11,24 +11,41 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
-using Squidex.Infrastructure.Json.Objects;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.MongoDb;
using EventFilter = MongoDB.Driver.FilterDefinition;
namespace Squidex.Infrastructure.EventSourcing
{
- public delegate bool EventPredicate(EventData data);
+ public delegate bool EventPredicate(MongoEvent data);
public partial class MongoEventStore : MongoRepositoryBase, IEventStore
{
- private static readonly IReadOnlyList EmptyEvents = new List();
+ private static readonly List EmptyEvents = new List();
+
+ public Task CreateIndexAsync(string property)
+ {
+ Guard.NotNullOrEmpty(property, nameof(property));
+
+ return Collection.Indexes.CreateOneAsync(
+ new CreateIndexModel(
+ Index
+ .Ascending(CreateIndexPath(property))
+ .Ascending(TimestampField)));
+ }
public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string? streamFilter = null, string? position = null)
{
Guard.NotNull(subscriber, nameof(subscriber));
- return new PollingSubscription(this, subscriber, streamFilter, position);
+ if (IsReplicaSet)
+ {
+ return new MongoEventStoreSubscription(this, subscriber, streamFilter, position);
+ }
+ else
+ {
+ return new PollingSubscription(this, subscriber, streamFilter, position);
+ }
}
public async Task> QueryLatestAsync(string streamName, int count)
@@ -51,20 +68,7 @@ namespace Squidex.Infrastructure.EventSourcing
foreach (var commit in commits)
{
- var eventStreamOffset = (int)commit.EventStreamOffset;
-
- var commitTimestamp = commit.Timestamp;
- var commitOffset = 0;
-
- foreach (var @event in commit.Events)
- {
- eventStreamOffset++;
-
- var eventData = @event.ToEventData();
- var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
-
- result.Add(new StoredEvent(streamName, eventToken, eventStreamOffset, eventData));
- }
+ result.AddRange(commit.Filtered(long.MinValue));
}
IEnumerable ordered = result.OrderBy(x => x.EventStreamNumber);
@@ -95,64 +99,33 @@ namespace Squidex.Infrastructure.EventSourcing
foreach (var commit in commits)
{
- var eventStreamOffset = (int)commit.EventStreamOffset;
-
- var commitTimestamp = commit.Timestamp;
- var commitOffset = 0;
-
- foreach (var @event in commit.Events)
- {
- eventStreamOffset++;
-
- if (eventStreamOffset >= streamPosition)
- {
- var eventData = @event.ToEventData();
- var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
-
- result.Add(new StoredEvent(streamName, eventToken, eventStreamOffset, eventData));
- }
- }
+ result.AddRange(commit.Filtered(streamPosition));
}
return result;
}
}
- public async Task QueryAsync(Func callback, string? streamFilter = null, string? position = null, CancellationToken ct = default)
+ public Task QueryAsync(Func callback, string? streamFilter = null, string? position = null, CancellationToken ct = default)
{
Guard.NotNull(callback, nameof(callback));
StreamPosition lastPosition = position;
var filterDefinition = CreateFilter(streamFilter, lastPosition);
- var filterExpression = CreateFilterExpression(null, null);
+ return QueryAsync(callback, lastPosition, filterDefinition, ct);
+ }
+
+ private async Task QueryAsync(Func callback, StreamPosition position, EventFilter filter, CancellationToken ct = default)
+ {
using (Profiler.TraceMethod())
{
- await Collection.Find(filterDefinition, options: Batching.Options).Sort(Sort.Ascending(TimestampField)).ForEachPipelineAsync(async commit =>
+ await Collection.Find(filter, options: Batching.Options).Sort(Sort.Ascending(TimestampField)).ForEachPipelineAsync(async commit =>
{
- var eventStreamOffset = (int)commit.EventStreamOffset;
-
- var commitTimestamp = commit.Timestamp;
- var commitOffset = 0;
-
- foreach (var @event in commit.Events)
+ foreach (var @event in commit.Filtered(position))
{
- eventStreamOffset++;
-
- if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp)
- {
- var eventData = @event.ToEventData();
-
- if (filterExpression(eventData))
- {
- var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
-
- await callback(new StoredEvent(commit.EventStream, eventToken, eventStreamOffset, eventData));
- }
- }
-
- commitOffset++;
+ await callback(@event);
}
}, ct);
}
@@ -160,53 +133,20 @@ namespace Squidex.Infrastructure.EventSourcing
private static EventFilter CreateFilter(string? streamFilter, StreamPosition streamPosition)
{
- var filters = new List();
-
- AppendByPosition(streamPosition, filters);
- AppendByStream(streamFilter, filters);
-
- return Filter.And(filters);
- }
+ var byPosition = Filtering.ByPosition(streamPosition);
+ var byStream = Filtering.ByStream(streamFilter);
- private static void AppendByStream(string? streamFilter, List filters)
- {
- if (!StreamFilter.IsAll(streamFilter))
+ if (byStream != null)
{
- if (streamFilter.Contains("^"))
- {
- filters.Add(Filter.Regex(EventStreamField, streamFilter));
- }
- else
- {
- filters.Add(Filter.Eq(EventStreamField, streamFilter));
- }
+ return Filter.And(byPosition, byStream);
}
- }
- private static void AppendByPosition(StreamPosition streamPosition, List filters)
- {
- if (streamPosition.IsEndOfCommit)
- {
- filters.Add(Filter.Gt(TimestampField, streamPosition.Timestamp));
- }
- else
- {
- filters.Add(Filter.Gte(TimestampField, streamPosition.Timestamp));
- }
+ return byPosition;
}
- private static EventPredicate CreateFilterExpression(string? property, object? value)
+ private static string CreateIndexPath(string property)
{
- if (!string.IsNullOrWhiteSpace(property))
- {
- var jsonValue = JsonValue.Create(value);
-
- return x => x.Headers.TryGetValue(property, out var p) && p.Equals(jsonValue);
- }
- else
- {
- return x => true;
- }
+ return $"Events.Metadata.{property}";
}
}
}
\ No newline at end of file
diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs
index 525414278..2026f83a5 100644
--- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs
+++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs
@@ -64,7 +64,10 @@ namespace Squidex.Infrastructure.EventSourcing
{
await Collection.InsertOneAsync(commit);
- notifier.NotifyEventsStored(streamName);
+ if (!IsReplicaSet)
+ {
+ notifier.NotifyEventsStored(streamName);
+ }
return;
}
diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs
index e6e946c67..7b0353c7d 100644
--- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs
+++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs
@@ -16,9 +16,7 @@ namespace Squidex.Infrastructure.EventSourcing
private static readonly ObjectPool StringBuilderPool =
new DefaultObjectPool(new StringBuilderPooledObjectPolicy());
- private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0, 0);
-
- public static readonly StreamPosition Empty = new StreamPosition(EmptyTimestamp, -1, -1);
+ public static readonly StreamPosition Empty = new StreamPosition(new BsonTimestamp(0, 0), -1, -1);
public BsonTimestamp Timestamp { get; }
@@ -26,10 +24,7 @@ namespace Squidex.Infrastructure.EventSourcing
public long CommitSize { get; }
- public bool IsEndOfCommit
- {
- get { return CommitOffset == CommitSize - 1; }
- }
+ public bool IsEndOfCommit { get; }
public StreamPosition(BsonTimestamp timestamp, long commitOffset, long commitSize)
{
@@ -37,6 +32,8 @@ namespace Squidex.Infrastructure.EventSourcing
CommitOffset = commitOffset;
CommitSize = commitSize;
+
+ IsEndOfCommit = CommitOffset == CommitSize - 1;
}
public static implicit operator string(StreamPosition position)
diff --git a/backend/src/Squidex.Infrastructure/Caching/LRUCache.cs b/backend/src/Squidex.Infrastructure/Caching/LRUCache.cs
index de4de831f..cfb1236d5 100644
--- a/backend/src/Squidex.Infrastructure/Caching/LRUCache.cs
+++ b/backend/src/Squidex.Infrastructure/Caching/LRUCache.cs
@@ -18,6 +18,16 @@ namespace Squidex.Infrastructure.Caching
private readonly int capacity;
private readonly Action itemEvicted;
+ public int Count
+ {
+ get { return cacheMap.Count; }
+ }
+
+ public IEnumerable Keys
+ {
+ get { return cacheMap.Keys; }
+ }
+
public LRUCache(int capacity, Action? itemEvicted = null)
{
Guard.GreaterThan(capacity, 0, nameof(capacity));
@@ -27,6 +37,12 @@ namespace Squidex.Infrastructure.Caching
this.itemEvicted = itemEvicted ?? ((key, value) => { });
}
+ public void Clear()
+ {
+ cacheHistory.Clear();
+ cacheMap.Clear();
+ }
+
public bool Set(TKey key, TValue value)
{
if (cacheMap.TryGetValue(key, out var node))
diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs
index eb5b7d2d6..0321a2d61 100644
--- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs
+++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs
@@ -7,6 +7,7 @@
using System;
using System.Runtime.CompilerServices;
+using System.Threading;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
@@ -261,10 +262,11 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
private void Unsubscribe()
{
- if (currentSubscription != null)
+ var subscription = Interlocked.Exchange(ref currentSubscription, null);
+
+ if (subscription != null)
{
- currentSubscription.StopAsync().Forget();
- currentSubscription = null;
+ subscription.StopAsync().Forget();
}
}
@@ -309,9 +311,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
return new RetrySubscription(store, subscriber, filter, position);
}
+ protected virtual TaskScheduler GetScheduler()
+ {
+ return scheduler!;
+ }
+
private IEventSubscription CreateSubscription(string streamFilter, string? position)
{
- return CreateSubscription(eventStore, new WrapperSubscription(GetSelf(), scheduler!), streamFilter, position);
+ return CreateSubscription(eventStore, new WrapperSubscription(GetSelf(), GetScheduler()), streamFilter, position);
}
}
}
\ No newline at end of file
diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs b/backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs
index 13ef44fa0..8cf9a7fb3 100644
--- a/backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs
+++ b/backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs
@@ -37,10 +37,7 @@ namespace Squidex.Infrastructure.EventSourcing
}
catch (Exception ex)
{
- if (!ex.Is())
- {
- await eventSubscriber.OnErrorAsync(this, ex);
- }
+ await eventSubscriber.OnErrorAsync(this, ex);
}
});
}
diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs b/backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs
index 7cc7bd767..846e21cde 100644
--- a/backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs
+++ b/backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs
@@ -50,8 +50,12 @@ namespace Squidex.Infrastructure.EventSourcing
private void Unsubscribe()
{
- currentSubscription?.StopAsync().Forget();
- currentSubscription = null;
+ var subscription = Interlocked.Exchange(ref currentSubscription, null);
+
+ if (subscription != null)
+ {
+ subscription.StopAsync().Forget();
+ }
}
public void WakeUp()
diff --git a/backend/src/Squidex.Infrastructure/Json/Newtonsoft/JsonValueConverter.cs b/backend/src/Squidex.Infrastructure/Json/Newtonsoft/JsonValueConverter.cs
index 206c8ca5c..994e434cb 100644
--- a/backend/src/Squidex.Infrastructure/Json/Newtonsoft/JsonValueConverter.cs
+++ b/backend/src/Squidex.Infrastructure/Json/Newtonsoft/JsonValueConverter.cs
@@ -81,14 +81,14 @@ namespace Squidex.Infrastructure.Json.Newtonsoft
switch (reader.TokenType)
{
case JsonToken.Comment:
- break;
+ continue;
+ case JsonToken.EndArray:
+ return result;
default:
var value = ReadJson(reader);
result.Add(value);
break;
- case JsonToken.EndArray:
- return result;
}
}
diff --git a/backend/tests/Squidex.Infrastructure.Tests/Caching/LRUCacheTests.cs b/backend/tests/Squidex.Infrastructure.Tests/Caching/LRUCacheTests.cs
index 5c1530c47..8f765f076 100644
--- a/backend/tests/Squidex.Infrastructure.Tests/Caching/LRUCacheTests.cs
+++ b/backend/tests/Squidex.Infrastructure.Tests/Caching/LRUCacheTests.cs
@@ -28,7 +28,20 @@ namespace Squidex.Infrastructure.Caching
}
[Fact]
- public void Should_remove_old_items_when_capacity_reached()
+ public void Should_clear_items()
+ {
+ sut.Set("1", 1);
+ sut.Set("2", 2);
+
+ Assert.Equal(2, sut.Count);
+
+ sut.Clear();
+
+ Assert.Equal(0, sut.Count);
+ }
+
+ [Fact]
+ public void Should_remove_old_items_whentC_capacity_reached()
{
for (var i = 0; i < 15; i++)
{
diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs
index 57c13c0e2..475ec769e 100644
--- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs
+++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs
@@ -179,8 +179,8 @@ namespace Squidex.Infrastructure.EventSourcing
new StoredEvent(streamName, "Position", 3, events2[1])
};
- ShouldBeEquivalentTo(readEventsFromPosition, expectedFromPosition);
- ShouldBeEquivalentTo(readEventsFromBeginning, expectedFromBeginning);
+ ShouldBeEquivalentTo(readEventsFromPosition.TakeLast(2), expectedFromPosition);
+ ShouldBeEquivalentTo(readEventsFromBeginning.TakeLast(4), expectedFromBeginning);
}
[Fact]
diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs
index 55d81c81c..c7e7e00af 100644
--- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs
+++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs
@@ -89,7 +89,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await sut.ActivateAsync(consumerName);
await sut.ActivateAsync();
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null });
+ AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null });
A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._))
.MustNotHaveHappened();
@@ -101,7 +101,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await sut.ActivateAsync(consumerName);
await sut.ActivateAsync();
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
+ AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._))
.MustHaveHappened(1, Times.Exactly);
@@ -115,7 +115,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await sut.ActivateAsync(consumerName);
await sut.ActivateAsync();
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
+ AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._))
.MustHaveHappened(1, Times.Exactly);
@@ -127,7 +127,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await sut.ActivateAsync(consumerName);
await sut.ActivateAsync();
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
+ AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._))
.MustHaveHappened(1, Times.Exactly);
@@ -141,7 +141,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await sut.StopAsync();
await sut.StopAsync();
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null });
+ AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null });
A.CallTo(() => grainState.WriteAsync())
.MustHaveHappened(1, Times.Exactly);
@@ -158,7 +158,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await sut.StopAsync();
await sut.ResetAsync();
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = false, Position = null, Error = null });
+ AssetGrainState(new EventConsumerState { IsStopped = false, Position = null, Error = null });
A.CallTo(() => grainState.WriteAsync())
.MustHaveHappened(2, Times.Exactly);
@@ -186,7 +186,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await OnEventAsync(eventSubscription, @event);
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null });
+ AssetGrainState(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null });
A.CallTo(() => grainState.WriteAsync())
.MustHaveHappened(1, Times.Exactly);
@@ -208,7 +208,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await OnEventAsync(eventSubscription, @event);
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null });
+ AssetGrainState(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null });
A.CallTo(() => grainState.WriteAsync())
.MustHaveHappened(1, Times.Exactly);
@@ -230,7 +230,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await OnEventAsync(eventSubscription, @event);
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null });
+ AssetGrainState(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null });
A.CallTo(() => grainState.WriteAsync())
.MustHaveHappened(1, Times.Exactly);
@@ -249,7 +249,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await OnEventAsync(A.Fake(), @event);
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
+ AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
@@ -265,7 +265,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await OnErrorAsync(eventSubscription, ex);
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
+ AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
A.CallTo(() => grainState.WriteAsync())
.MustHaveHappened(1, Times.Exactly);
@@ -284,7 +284,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await OnErrorAsync(A.Fake(), ex);
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
+ AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
A.CallTo(() => grainState.WriteAsync())
.MustNotHaveHappened();
@@ -313,7 +313,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await sut.ActivateAsync();
await sut.ResetAsync();
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
+ AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
A.CallTo(() => grainState.WriteAsync())
.MustHaveHappened(1, Times.Exactly);
@@ -337,7 +337,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await OnEventAsync(eventSubscription, @event);
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
+ AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened();
@@ -364,7 +364,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await OnEventAsync(eventSubscription, @event);
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
+ AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
@@ -395,7 +395,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await sut.StartAsync();
await sut.StartAsync();
- grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
+ AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened();
@@ -419,5 +419,10 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
{
return sut.OnEventAsync(subscriber.AsImmutable(), ev.AsImmutable());
}
+
+ private void AssetGrainState(EventConsumerState state)
+ {
+ grainState.Value.Should().BeEquivalentTo(state);
+ }
}
}
\ No newline at end of file
diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreFixture.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreFixture.cs
index 6e77f0abd..bef26dd3b 100644
--- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreFixture.cs
+++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreFixture.cs
@@ -14,17 +14,20 @@ using Squidex.Infrastructure.TestHelpers;
namespace Squidex.Infrastructure.EventSourcing
{
- public sealed class MongoEventStoreFixture : IDisposable
+ public abstract class MongoEventStoreFixture : IDisposable
{
- private readonly IMongoClient mongoClient = new MongoClient("mongodb://localhost");
+ private readonly IMongoClient mongoClient;
private readonly IMongoDatabase mongoDatabase;
private readonly IEventNotifier notifier = A.Fake();
public MongoEventStore EventStore { get; }
- public MongoEventStoreFixture()
+ protected MongoEventStoreFixture(string connectionString)
{
- mongoDatabase = mongoClient.GetDatabase("EventStoreTest");
+ mongoClient = new MongoClient(connectionString);
+ mongoDatabase = mongoClient.GetDatabase($"EventStoreTest");
+
+ Dispose();
BsonJsonConvention.Register(JsonSerializer.Create(JsonHelper.DefaultSettings()));
@@ -37,4 +40,20 @@ namespace Squidex.Infrastructure.EventSourcing
mongoClient.DropDatabase("EventStoreTest");
}
}
+
+ public sealed class MongoEventStoreDirectFixture : MongoEventStoreFixture
+ {
+ public MongoEventStoreDirectFixture()
+ : base("mongodb://localhost:27019")
+ {
+ }
+ }
+
+ public sealed class MongoEventStoreReplicaSetFixture : MongoEventStoreFixture
+ {
+ public MongoEventStoreReplicaSetFixture()
+ : base("mongodb://localhost:27017")
+ {
+ }
+ }
}
diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreTests_Direct.cs
similarity index 80%
rename from backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreTests.cs
rename to backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreTests_Direct.cs
index 4463fae7c..8dedc674b 100644
--- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreTests.cs
+++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreTests_Direct.cs
@@ -12,13 +12,13 @@ using Xunit;
namespace Squidex.Infrastructure.EventSourcing
{
[Trait("Category", "Dependencies")]
- public class MongoEventStoreTests : EventStoreTests, IClassFixture
+ public class MongoEventStoreTests_Direct : EventStoreTests, IClassFixture
{
public MongoEventStoreFixture _ { get; }
protected override int SubscriptionDelayInMs { get; } = 1000;
- public MongoEventStoreTests(MongoEventStoreFixture fixture)
+ public MongoEventStoreTests_Direct(MongoEventStoreDirectFixture fixture)
{
_ = fixture;
}
diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreTests_ReplicaSet.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreTests_ReplicaSet.cs
new file mode 100644
index 000000000..7250ad7e0
--- /dev/null
+++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreTests_ReplicaSet.cs
@@ -0,0 +1,31 @@
+// ==========================================================================
+// Squidex Headless CMS
+// ==========================================================================
+// Copyright (c) Squidex UG (haftungsbeschraenkt)
+// All rights reserved. Licensed under the MIT license.
+// ==========================================================================
+
+using Xunit;
+
+#pragma warning disable SA1300 // Element should begin with upper-case letter
+
+namespace Squidex.Infrastructure.EventSourcing
+{
+ [Trait("Category", "Dependencies")]
+ public class MongoEventStoreTests_ReplicaSet : EventStoreTests, IClassFixture
+ {
+ public MongoEventStoreFixture _ { get; }
+
+ protected override int SubscriptionDelayInMs { get; } = 1000;
+
+ public MongoEventStoreTests_ReplicaSet(MongoEventStoreReplicaSetFixture fixture)
+ {
+ _ = fixture;
+ }
+
+ public override MongoEventStore CreateStore()
+ {
+ return _.EventStore;
+ }
+ }
+}
diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoParallelInsertTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoParallelInsertTests.cs
index 42cd67a36..f527bcb4d 100644
--- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoParallelInsertTests.cs
+++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoParallelInsertTests.cs
@@ -7,12 +7,16 @@
using System;
using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using FakeItEasy;
+using Orleans.Internal;
using Squidex.Infrastructure.EventSourcing.Grains;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Orleans;
using Squidex.Infrastructure.Reflection;
+using Squidex.Infrastructure.Tasks;
using Squidex.Infrastructure.TestHelpers;
using Xunit;
@@ -21,7 +25,7 @@ using Xunit;
namespace Squidex.Infrastructure.EventSourcing
{
[Trait("Category", "Dependencies")]
- public sealed class MongoParallelInsertTests : IClassFixture
+ public sealed class MongoParallelInsertTests : IClassFixture
{
private readonly IGrainState grainState = A.Fake>();
private readonly ISemanticLog log = A.Fake();
@@ -29,8 +33,128 @@ namespace Squidex.Infrastructure.EventSourcing
public MongoEventStoreFixture _ { get; }
+ public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
+ {
+ [ThreadStatic]
+ private static bool currentThreadIsProcessingItems;
+
+ private readonly LinkedList tasks = new LinkedList();
+ private readonly int maxDegreeOfParallelism;
+ private int delegatesQueuedOrRunning;
+
+ public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
+ {
+ this.maxDegreeOfParallelism = maxDegreeOfParallelism;
+ }
+
+ protected sealed override void QueueTask(Task task)
+ {
+ lock (tasks)
+ {
+ tasks.AddLast(task);
+
+ if (delegatesQueuedOrRunning < maxDegreeOfParallelism)
+ {
+ ++delegatesQueuedOrRunning;
+
+ NotifyThreadPoolOfPendingWork();
+ }
+ }
+ }
+
+ private void NotifyThreadPoolOfPendingWork()
+ {
+ ThreadPool.UnsafeQueueUserWorkItem(_ =>
+ {
+ currentThreadIsProcessingItems = true;
+ try
+ {
+ while (true)
+ {
+ Task item;
+ lock (tasks)
+ {
+ if (tasks.Count == 0)
+ {
+ --delegatesQueuedOrRunning;
+ break;
+ }
+
+ item = tasks.First!.Value;
+
+ tasks.RemoveFirst();
+ }
+
+ TryExecuteTask(item);
+ }
+ }
+ finally
+ {
+ currentThreadIsProcessingItems = false;
+ }
+ }, null);
+ }
+
+ protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
+ {
+ if (!currentThreadIsProcessingItems)
+ {
+ return false;
+ }
+
+ if (taskWasPreviouslyQueued)
+ {
+ TryDequeue(task);
+ }
+
+ return TryExecuteTask(task);
+ }
+
+ protected sealed override bool TryDequeue(Task task)
+ {
+ lock (tasks)
+ {
+ return tasks.Remove(task);
+ }
+ }
+
+ public sealed override int MaximumConcurrencyLevel
+ {
+ get { return maxDegreeOfParallelism; }
+ }
+
+ protected sealed override IEnumerable GetScheduledTasks()
+ {
+ bool lockTaken = false;
+ try
+ {
+ Monitor.TryEnter(tasks, ref lockTaken);
+
+ if (lockTaken)
+ {
+ return tasks.ToArray();
+ }
+ else
+ {
+ throw new NotSupportedException();
+ }
+ }
+ finally
+ {
+ if (lockTaken)
+ {
+ Monitor.Exit(tasks);
+ }
+ }
+ }
+ }
+
public sealed class MyEventConsumerGrain : EventConsumerGrain
{
+ private readonly TaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(1);
+
+ public TaskScheduler Scheduler => scheduler;
+
public MyEventConsumerGrain(
EventConsumerFactory eventConsumerFactory,
IGrainState state,
@@ -46,6 +170,11 @@ namespace Squidex.Infrastructure.EventSourcing
return this;
}
+ protected override TaskScheduler GetScheduler()
+ {
+ return scheduler;
+ }
+
protected override IEventSubscription CreateSubscription(IEventStore store, IEventSubscriber subscriber, string? filter, string? position)
{
return store.CreateSubscription(subscriber, filter, position);
@@ -62,6 +191,8 @@ namespace Squidex.Infrastructure.EventSourcing
private readonly TaskCompletionSource tcs = new TaskCompletionSource();
private readonly int expectedCount;
+ public Func EventReceived { get; set; }
+
public string Name => "Test";
public string EventsFilter => ".*";
@@ -85,22 +216,25 @@ namespace Squidex.Infrastructure.EventSourcing
return true;
}
- public Task On(Envelope @event)
+ public async Task On(Envelope @event)
{
Received++;
- uniqueReceivedEvents.Add(@event.Headers.CommitId());
+ uniqueReceivedEvents.Add(@event.Headers.EventId());
if (uniqueReceivedEvents.Count == expectedCount)
{
tcs.TrySetResult(true);
}
- return Task.CompletedTask;
+ if (EventReceived != null)
+ {
+ await EventReceived(Received);
+ }
}
}
- public MongoParallelInsertTests(MongoEventStoreFixture fixture)
+ public MongoParallelInsertTests(MongoEventStoreReplicaSetFixture fixture)
{
_ = fixture;
@@ -120,6 +254,170 @@ namespace Squidex.Infrastructure.EventSourcing
await consumerGrain.ActivateAsync(consumer.Name);
await consumerGrain.ActivateAsync();
+ Parallel.For(0, 20, x =>
+ {
+ for (var i = 0; i < 500; i++)
+ {
+ var commitId = Guid.NewGuid();
+
+ var data = eventDataFormatter.ToEventData(Envelope.Create(new MyEvent()), commitId);
+
+ _.EventStore.AppendAsync(commitId, commitId.ToString(), new[] { data }).Wait();
+ }
+ });
+
+ await AssertConsumerAsync(expectedEvents, consumer);
+ }
+
+ [Fact]
+ public async Task Should_insert_and_retrieve_parallel_with_multiple_events_per_commit()
+ {
+ var expectedEvents = 10 * 1000;
+
+ var consumer = new MyEventConsumer(expectedEvents);
+ var consumerGrain = new MyEventConsumerGrain(_ => consumer, grainState, _.EventStore, eventDataFormatter, log);
+
+ await consumerGrain.ActivateAsync(consumer.Name);
+ await consumerGrain.ActivateAsync();
+
+ Parallel.For(0, 10, x =>
+ {
+ for (var i = 0; i < 500; i++)
+ {
+ var commitId = Guid.NewGuid();
+
+ var data1 = eventDataFormatter.ToEventData(Envelope.Create(new MyEvent()), commitId);
+ var data2 = eventDataFormatter.ToEventData(Envelope.Create(new MyEvent()), commitId);
+
+ _.EventStore.AppendAsync(commitId, commitId.ToString(), new[] { data1, data2 }).Wait();
+ }
+ });
+
+ await AssertConsumerAsync(expectedEvents, consumer);
+ }
+
+ [Fact]
+ public async Task Should_insert_and_retrieve_afterwards()
+ {
+ var expectedEvents = 10 * 1000;
+
+ var consumer = new MyEventConsumer(expectedEvents);
+ var consumerGrain = new MyEventConsumerGrain(_ => consumer, grainState, _.EventStore, eventDataFormatter, log);
+
+ Parallel.For(0, 10, x =>
+ {
+ for (var i = 0; i < 1000; i++)
+ {
+ var commitId = Guid.NewGuid();
+
+ var data = eventDataFormatter.ToEventData(Envelope.Create(new MyEvent()), commitId);
+
+ _.EventStore.AppendAsync(commitId, commitId.ToString(), new[] { data }).Wait();
+ }
+ });
+
+ await consumerGrain.ActivateAsync(consumer.Name);
+ await consumerGrain.ActivateAsync();
+
+ await AssertConsumerAsync(expectedEvents, consumer);
+ }
+
+ [Fact]
+ public async Task Should_insert_and_retrieve_partially_afterwards()
+ {
+ var expectedEvents = 10 * 1000;
+
+ var consumer = new MyEventConsumer(expectedEvents);
+ var consumerGrain = new MyEventConsumerGrain(_ => consumer, grainState, _.EventStore, eventDataFormatter, log);
+
+ Parallel.For(0, 10, x =>
+ {
+ for (var i = 0; i < 500; i++)
+ {
+ var commitId = Guid.NewGuid();
+
+ var data = eventDataFormatter.ToEventData(Envelope.Create(new MyEvent()), commitId);
+
+ _.EventStore.AppendAsync(commitId, commitId.ToString(), new[] { data }).Wait();
+ }
+ });
+
+ await consumerGrain.ActivateAsync(consumer.Name);
+ await consumerGrain.ActivateAsync();
+
+ Parallel.For(0, 10, x =>
+ {
+ for (var i = 0; i < 500; i++)
+ {
+ var commitId = Guid.NewGuid();
+
+ var data = eventDataFormatter.ToEventData(Envelope.Create(new MyEvent()), commitId);
+
+ _.EventStore.AppendAsync(commitId, commitId.ToString(), new[] { data }).Wait();
+ }
+ });
+
+ await AssertConsumerAsync(expectedEvents, consumer);
+ }
+
+ [Fact]
+ public async Task Should_insert_and_retrieve_parallel_with_waits()
+ {
+ var expectedEvents = 10 * 1000;
+
+ var consumer = new MyEventConsumer(expectedEvents);
+ var consumerGrain = new MyEventConsumerGrain(_ => consumer, grainState, _.EventStore, eventDataFormatter, log);
+
+ await consumerGrain.ActivateAsync(consumer.Name);
+ await consumerGrain.ActivateAsync();
+
+ Parallel.For(0, 10, x =>
+ {
+ for (var j = 0; j < 10; j++)
+ {
+ for (var i = 0; i < 100; i++)
+ {
+ var commitId = Guid.NewGuid();
+
+ var data = eventDataFormatter.ToEventData(Envelope.Create(new MyEvent()), commitId);
+
+ _.EventStore.AppendAsync(commitId, commitId.ToString(), new[] { data }).Wait();
+ }
+
+ Thread.Sleep(1000);
+ }
+ });
+
+ await AssertConsumerAsync(expectedEvents, consumer);
+ }
+
+ [Fact]
+ public async Task Should_insert_and_retrieve_parallel_with_stops_and_starts()
+ {
+ var expectedEvents = 10 * 1000;
+
+ var consumer = new MyEventConsumer(expectedEvents);
+ var consumerGrain = new MyEventConsumerGrain(_ => consumer, grainState, _.EventStore, eventDataFormatter, log);
+
+ var scheduler = consumerGrain.Scheduler;
+
+ consumer.EventReceived = count =>
+ {
+ if (count % 1000 == 0)
+ {
+ Task.Factory.StartNew(async () =>
+ {
+ await consumerGrain.StopAsync();
+ await consumerGrain.StartAsync();
+ }, default, default, scheduler).Forget();
+ }
+
+ return Task.CompletedTask;
+ };
+
+ await consumerGrain.ActivateAsync(consumer.Name);
+ await consumerGrain.ActivateAsync();
+
Parallel.For(0, 10, x =>
{
for (var i = 0; i < 1000; i++)
@@ -132,13 +430,15 @@ namespace Squidex.Infrastructure.EventSourcing
}
});
- var timeout = Task.Delay(5 * 1000 * 60);
+ await AssertConsumerAsync(expectedEvents, consumer);
+ }
- var result = Task.WhenAny(timeout, consumer.Completed);
+ private static async Task AssertConsumerAsync(int expectedEvents, MyEventConsumer consumer)
+ {
+ await consumer.Completed.WithTimeout(TimeSpan.FromSeconds(100));
- await result;
+ await Task.Delay(2000);
- Assert.NotSame(result, timeout);
Assert.Equal(expectedEvents, consumer.Received);
}
}