Browse Source

Fix for older mongo databases.

pull/620/head
Sebastian 5 years ago
parent
commit
55b86aaec7
  1. 8
      backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs
  2. 2
      backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs
  3. 2
      backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs
  4. 13
      backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs
  5. 10
      backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs
  6. 20
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs

8
backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs

@ -5,6 +5,7 @@
// All rights reserved. Licensed under the MIT license. // All rights reserved. Licensed under the MIT license.
// ========================================================================== // ==========================================================================
using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MongoDB.Bson; using MongoDB.Bson;
@ -32,7 +33,7 @@ namespace Squidex.Infrastructure.EventSourcing
get { return Collection; } get { return Collection; }
} }
public bool IsReplicaSet { get; private set; } public bool CanUseChangeStreams { get; private set; }
public MongoEventStore(IMongoDatabase database, IEventNotifier notifier) public MongoEventStore(IMongoDatabase database, IEventNotifier notifier)
: base(database) : base(database)
@ -77,7 +78,10 @@ namespace Squidex.Infrastructure.EventSourcing
}) })
}, ct); }, ct);
IsReplicaSet = Database.Client.Cluster.Description.Type == ClusterType.ReplicaSet; var clusterVersion = await Database.GetVersionAsync();
var clustered = Database.Client.Cluster.Description.Type == ClusterType.ReplicaSet;
CanUseChangeStreams = clustered && clusterVersion >= new Version("4.0");
} }
} }
} }

2
backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs

@ -38,7 +38,7 @@ namespace Squidex.Infrastructure.EventSourcing
{ {
Guard.NotNull(subscriber, nameof(subscriber)); Guard.NotNull(subscriber, nameof(subscriber));
if (IsReplicaSet) if (CanUseChangeStreams)
{ {
return new MongoEventStoreSubscription(this, subscriber, streamFilter, position); return new MongoEventStoreSubscription(this, subscriber, streamFilter, position);
} }

2
backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs

@ -64,7 +64,7 @@ namespace Squidex.Infrastructure.EventSourcing
{ {
await Collection.InsertOneAsync(commit); await Collection.InsertOneAsync(commit);
if (!IsReplicaSet) if (!CanUseChangeStreams)
{ {
notifier.NotifyEventsStored(streamName); notifier.NotifyEventsStored(streamName);
} }

13
backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs

@ -224,5 +224,18 @@ namespace Squidex.Infrastructure.MongoDb
} }
} }
} }
public static async Task<Version> GetVersionAsync(this IMongoDatabase database)
{
var command =
new BsonDocumentCommand<BsonDocument>(new BsonDocument
{
{ "buildInfo", 1 }
});
var result = await database.RunCommandAsync(command);
return Version.Parse(result["version"].AsString);
}
} }
} }

10
backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs

@ -35,6 +35,8 @@ namespace Squidex.Infrastructure.EventSourcing
} }
private void Subscribe() private void Subscribe()
{
if (currentSubscription == null)
{ {
lock (this) lock (this)
{ {
@ -44,8 +46,11 @@ namespace Squidex.Infrastructure.EventSourcing
} }
} }
} }
}
public void Unsubscribe() public void Unsubscribe()
{
if (currentSubscription != null)
{ {
lock (this) lock (this)
{ {
@ -61,6 +66,7 @@ namespace Squidex.Infrastructure.EventSourcing
} }
} }
} }
}
public void WakeUp() public void WakeUp()
{ {
@ -79,10 +85,10 @@ namespace Squidex.Infrastructure.EventSourcing
return; return;
} }
Unsubscribe();
if (retryWindow.CanRetryAfterFailure()) if (retryWindow.CanRetryAfterFailure())
{ {
Unsubscribe();
await Task.Delay(ReconnectWaitMs, timerCancellation.Token); await Task.Delay(ReconnectWaitMs, timerCancellation.Token);
Subscribe(); Subscribe();

20
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs

@ -87,6 +87,26 @@ namespace Squidex.Infrastructure.EventSourcing
.MustHaveHappened(); .MustHaveHappened();
} }
[Fact]
public async Task Should_not_unsubscribe_after_last_error_to_keep_sender()
{
var ex = new InvalidOperationException();
await OnErrorAsync(eventSubscription, ex);
await OnErrorAsync(eventSubscription, ex);
await OnErrorAsync(eventSubscription, ex);
await OnErrorAsync(eventSubscription, ex);
await OnErrorAsync(eventSubscription, ex);
await OnErrorAsync(eventSubscription, ex);
A.CallTo(() => eventSubscriber.OnErrorAsync(eventSubscription, ex))
.MustHaveHappened();
Assert.NotNull(sut.Sender);
sut.Unsubscribe();
}
[Fact] [Fact]
public async Task Should_not_forward_error_when_exception_is_raised_after_unsubscribe() public async Task Should_not_forward_error_when_exception_is_raised_after_unsubscribe()
{ {

Loading…
Cancel
Save