diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs index c8fd6da02..6b4f34b2b 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs @@ -5,6 +5,7 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System; using System.Threading; using System.Threading.Tasks; using MongoDB.Bson; @@ -32,7 +33,7 @@ namespace Squidex.Infrastructure.EventSourcing get { return Collection; } } - public bool IsReplicaSet { get; private set; } + public bool CanUseChangeStreams { get; private set; } public MongoEventStore(IMongoDatabase database, IEventNotifier notifier) : base(database) @@ -77,7 +78,10 @@ namespace Squidex.Infrastructure.EventSourcing }) }, ct); - IsReplicaSet = Database.Client.Cluster.Description.Type == ClusterType.ReplicaSet; + var clusterVersion = await Database.GetVersionAsync(); + var clustered = Database.Client.Cluster.Description.Type == ClusterType.ReplicaSet; + + CanUseChangeStreams = clustered && clusterVersion >= new Version("4.0"); } } } \ No newline at end of file diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs index 075e4d43c..7d43935f2 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs @@ -38,7 +38,7 @@ namespace Squidex.Infrastructure.EventSourcing { Guard.NotNull(subscriber, nameof(subscriber)); - if (IsReplicaSet) + if (CanUseChangeStreams) { return new MongoEventStoreSubscription(this, subscriber, streamFilter, position); } diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs index 90fd872c8..33faa6c56 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs @@ -64,7 +64,7 @@ namespace Squidex.Infrastructure.EventSourcing { await Collection.InsertOneAsync(commit); - if (!IsReplicaSet) + if (!CanUseChangeStreams) { notifier.NotifyEventsStored(streamName); } diff --git a/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs b/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs index 0da0dd8e8..802fff4ed 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs @@ -224,5 +224,18 @@ namespace Squidex.Infrastructure.MongoDb } } } + + public static async Task GetVersionAsync(this IMongoDatabase database) + { + var command = + new BsonDocumentCommand(new BsonDocument + { + { "buildInfo", 1 } + }); + + var result = await database.RunCommandAsync(command); + + return Version.Parse(result["version"].AsString); + } } } diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs b/backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs index 561b6506e..8aa2e9fa3 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs @@ -36,28 +36,34 @@ namespace Squidex.Infrastructure.EventSourcing private void Subscribe() { - lock (this) + if (currentSubscription == null) { - if (currentSubscription == null) + lock (this) { - currentSubscription = eventSubscriptionFactory(this); + if (currentSubscription == null) + { + currentSubscription = eventSubscriptionFactory(this); + } } } } public void Unsubscribe() { - lock (this) + if (currentSubscription != null) { - if (currentSubscription != null) + lock (this) { - timerCancellation.Cancel(); - timerCancellation.Dispose(); + if (currentSubscription != null) + { + timerCancellation.Cancel(); + timerCancellation.Dispose(); - currentSubscription.Unsubscribe(); - currentSubscription = null; + currentSubscription.Unsubscribe(); + currentSubscription = null; - timerCancellation = new CancellationTokenSource(); + timerCancellation = new CancellationTokenSource(); + } } } } @@ -79,10 +85,10 @@ namespace Squidex.Infrastructure.EventSourcing return; } - Unsubscribe(); - if (retryWindow.CanRetryAfterFailure()) { + Unsubscribe(); + await Task.Delay(ReconnectWaitMs, timerCancellation.Token); Subscribe(); diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs index b92419eb2..fe97c3f56 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs @@ -87,6 +87,26 @@ namespace Squidex.Infrastructure.EventSourcing .MustHaveHappened(); } + [Fact] + public async Task Should_not_unsubscribe_after_last_error_to_keep_sender() + { + var ex = new InvalidOperationException(); + + await OnErrorAsync(eventSubscription, ex); + await OnErrorAsync(eventSubscription, ex); + await OnErrorAsync(eventSubscription, ex); + await OnErrorAsync(eventSubscription, ex); + await OnErrorAsync(eventSubscription, ex); + await OnErrorAsync(eventSubscription, ex); + + A.CallTo(() => eventSubscriber.OnErrorAsync(eventSubscription, ex)) + .MustHaveHappened(); + + Assert.NotNull(sut.Sender); + + sut.Unsubscribe(); + } + [Fact] public async Task Should_not_forward_error_when_exception_is_raised_after_unsubscribe() {