mirror of https://github.com/Squidex/squidex.git
Browse Source
# Conflicts: # backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cspull/568/head
24 changed files with 791 additions and 166 deletions
@ -0,0 +1,112 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Collections.Generic; |
|||
using MongoDB.Driver; |
|||
|
|||
namespace Squidex.Infrastructure.EventSourcing |
|||
{ |
|||
internal static class Filtering |
|||
{ |
|||
public static string CreateIndexPath(string property) |
|||
{ |
|||
return $"Events.Metadata.{property}"; |
|||
} |
|||
|
|||
public static FilterDefinition<MongoEventCommit> ByPosition(StreamPosition streamPosition) |
|||
{ |
|||
if (streamPosition.IsEndOfCommit) |
|||
{ |
|||
return Builders<MongoEventCommit>.Filter.Gt(x => x.Timestamp, streamPosition.Timestamp); |
|||
} |
|||
else |
|||
{ |
|||
return Builders<MongoEventCommit>.Filter.Gte(x => x.Timestamp, streamPosition.Timestamp); |
|||
} |
|||
} |
|||
|
|||
public static FilterDefinition<MongoEventCommit>? ByStream(string? streamFilter) |
|||
{ |
|||
if (StreamFilter.IsAll(streamFilter)) |
|||
{ |
|||
return null; |
|||
} |
|||
|
|||
if (streamFilter.Contains("^")) |
|||
{ |
|||
return Builders<MongoEventCommit>.Filter.Regex(x => x.EventStream, streamFilter); |
|||
} |
|||
else |
|||
{ |
|||
return Builders<MongoEventCommit>.Filter.Eq(x => x.EventStream, streamFilter); |
|||
} |
|||
} |
|||
|
|||
public static FilterDefinition<ChangeStreamDocument<MongoEventCommit>>? ByChangeInStream(string? streamFilter) |
|||
{ |
|||
if (StreamFilter.IsAll(streamFilter)) |
|||
{ |
|||
return null; |
|||
} |
|||
|
|||
if (streamFilter.Contains("^")) |
|||
{ |
|||
return Builders<ChangeStreamDocument<MongoEventCommit>>.Filter.Regex(x => x.FullDocument.EventStream, streamFilter); |
|||
} |
|||
else |
|||
{ |
|||
return Builders<ChangeStreamDocument<MongoEventCommit>>.Filter.Eq(x => x.FullDocument.EventStream, streamFilter); |
|||
} |
|||
} |
|||
|
|||
public static IEnumerable<StoredEvent> Filtered(this MongoEventCommit commit, StreamPosition lastPosition) |
|||
{ |
|||
var eventStreamOffset = commit.EventStreamOffset; |
|||
|
|||
var commitTimestamp = commit.Timestamp; |
|||
var commitOffset = 0; |
|||
|
|||
foreach (var @event in commit.Events) |
|||
{ |
|||
eventStreamOffset++; |
|||
|
|||
if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp) |
|||
{ |
|||
var eventData = @event.ToEventData(); |
|||
var eventPosition = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); |
|||
|
|||
yield return new StoredEvent(commit.EventStream, eventPosition, eventStreamOffset, eventData); |
|||
} |
|||
|
|||
commitOffset++; |
|||
} |
|||
} |
|||
|
|||
public static IEnumerable<StoredEvent> Filtered(this MongoEventCommit commit, long streamPosition) |
|||
{ |
|||
var eventStreamOffset = commit.EventStreamOffset; |
|||
|
|||
var commitTimestamp = commit.Timestamp; |
|||
var commitOffset = 0; |
|||
|
|||
foreach (var @event in commit.Events) |
|||
{ |
|||
eventStreamOffset++; |
|||
|
|||
if (eventStreamOffset >= streamPosition) |
|||
{ |
|||
var eventData = @event.ToEventData(); |
|||
var eventPosition = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); |
|||
|
|||
yield return new StoredEvent(commit.EventStream, eventPosition, eventStreamOffset, eventData); |
|||
} |
|||
|
|||
commitOffset++; |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,169 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using MongoDB.Bson; |
|||
using MongoDB.Driver; |
|||
using NodaTime; |
|||
|
|||
namespace Squidex.Infrastructure.EventSourcing |
|||
{ |
|||
public sealed class MongoEventStoreSubscription : IEventSubscription |
|||
{ |
|||
private readonly MongoEventStore eventStore; |
|||
private readonly IEventSubscriber eventSubscriber; |
|||
private readonly CancellationTokenSource stopToken = new CancellationTokenSource(); |
|||
private readonly Task task; |
|||
|
|||
public MongoEventStoreSubscription(MongoEventStore eventStore, IEventSubscriber eventSubscriber, string? streamFilter, string? position) |
|||
{ |
|||
this.eventStore = eventStore; |
|||
this.eventSubscriber = eventSubscriber; |
|||
|
|||
task = QueryAsync(streamFilter, position); |
|||
} |
|||
|
|||
private async Task QueryAsync(string? streamFilter, string? position) |
|||
{ |
|||
try |
|||
{ |
|||
string? lastRawPosition = null; |
|||
|
|||
try |
|||
{ |
|||
lastRawPosition = await QueryOldAsync(streamFilter, position); |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
} |
|||
|
|||
if (!stopToken.IsCancellationRequested) |
|||
{ |
|||
await QueryCurrentAsync(streamFilter, lastRawPosition); |
|||
} |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
await eventSubscriber.OnErrorAsync(this, ex); |
|||
} |
|||
} |
|||
|
|||
private async Task QueryCurrentAsync(string? streamFilter, StreamPosition lastPosition) |
|||
{ |
|||
BsonDocument? resumeToken = null; |
|||
|
|||
var start = |
|||
lastPosition.Timestamp.Timestamp > 0 ? |
|||
lastPosition.Timestamp.Timestamp - 30 : |
|||
SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromSeconds(30)).ToUnixTimeSeconds(); |
|||
|
|||
var changePipeline = Match(streamFilter); |
|||
var changeStart = new BsonTimestamp((int)start, 0); |
|||
|
|||
while (!stopToken.IsCancellationRequested) |
|||
{ |
|||
var changeOptions = new ChangeStreamOptions(); |
|||
|
|||
if (resumeToken != null) |
|||
{ |
|||
changeOptions.StartAfter = resumeToken; |
|||
} |
|||
else |
|||
{ |
|||
changeOptions.StartAtOperationTime = changeStart; |
|||
} |
|||
|
|||
using (var cursor = eventStore.TypedCollection.Watch(changePipeline, changeOptions, stopToken.Token)) |
|||
{ |
|||
var isRead = false; |
|||
|
|||
await cursor.ForEachAsync(async change => |
|||
{ |
|||
if (change.OperationType == ChangeStreamOperationType.Insert) |
|||
{ |
|||
foreach (var storedEvent in change.FullDocument.Filtered(lastPosition)) |
|||
{ |
|||
await eventSubscriber.OnEventAsync(this, storedEvent); |
|||
} |
|||
} |
|||
|
|||
isRead = true; |
|||
}, stopToken.Token); |
|||
|
|||
resumeToken = cursor.GetResumeToken(); |
|||
|
|||
if (!isRead) |
|||
{ |
|||
await Task.Delay(1000); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
private async Task<string?> QueryOldAsync(string? streamFilter, string? position) |
|||
{ |
|||
string? lastRawPosition = null; |
|||
|
|||
using (var cts = new CancellationTokenSource()) |
|||
{ |
|||
using (var combined = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, stopToken.Token)) |
|||
{ |
|||
await eventStore.QueryAsync(async storedEvent => |
|||
{ |
|||
var now = SystemClock.Instance.GetCurrentInstant(); |
|||
|
|||
var timeToNow = now - storedEvent.Data.Headers.Timestamp(); |
|||
|
|||
if (timeToNow <= Duration.FromMinutes(5)) |
|||
{ |
|||
cts.Cancel(); |
|||
} |
|||
else |
|||
{ |
|||
await eventSubscriber.OnEventAsync(this, storedEvent); |
|||
|
|||
lastRawPosition = storedEvent.EventPosition; |
|||
} |
|||
}, streamFilter, position, combined.Token); |
|||
} |
|||
} |
|||
|
|||
return lastRawPosition; |
|||
} |
|||
|
|||
private static PipelineDefinition<ChangeStreamDocument<MongoEventCommit>, ChangeStreamDocument<MongoEventCommit>>? Match(string? streamFilter) |
|||
{ |
|||
var result = new EmptyPipelineDefinition<ChangeStreamDocument<MongoEventCommit>>(); |
|||
|
|||
var byStream = Filtering.ByChangeInStream(streamFilter); |
|||
|
|||
if (byStream != null) |
|||
{ |
|||
var filterBuilder = Builders<ChangeStreamDocument<MongoEventCommit>>.Filter; |
|||
|
|||
var filter = filterBuilder.Or(filterBuilder.Ne(x => x.OperationType, ChangeStreamOperationType.Insert), byStream); |
|||
|
|||
return result.Match(filter); |
|||
} |
|||
|
|||
return result; |
|||
} |
|||
|
|||
public Task StopAsync() |
|||
{ |
|||
stopToken.Cancel(); |
|||
|
|||
return task; |
|||
} |
|||
|
|||
public void WakeUp() |
|||
{ |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,31 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Xunit; |
|||
|
|||
#pragma warning disable SA1300 // Element should begin with upper-case letter
|
|||
|
|||
namespace Squidex.Infrastructure.EventSourcing |
|||
{ |
|||
[Trait("Category", "Dependencies")] |
|||
public class MongoEventStoreTests_ReplicaSet : EventStoreTests<MongoEventStore>, IClassFixture<MongoEventStoreReplicaSetFixture> |
|||
{ |
|||
public MongoEventStoreFixture _ { get; } |
|||
|
|||
protected override int SubscriptionDelayInMs { get; } = 1000; |
|||
|
|||
public MongoEventStoreTests_ReplicaSet(MongoEventStoreReplicaSetFixture fixture) |
|||
{ |
|||
_ = fixture; |
|||
} |
|||
|
|||
public override MongoEventStore CreateStore() |
|||
{ |
|||
return _.EventStore; |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue