// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using Squidex.Infrastructure.Json.Objects;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.MongoDb;

// NOTE(review): the generic type arguments in this file (MongoEventCommit, StoredEvent,
// IReadOnlyList<StoredEvent>, TraceMethod<MongoEventStore>) were reconstructed from usage.
// Confirm them against IEventStore, MongoRepositoryBase and the commit document type.
using EventFilter = MongoDB.Driver.FilterDefinition<Squidex.Infrastructure.EventSourcing.MongoEventCommit>;

namespace Squidex.Infrastructure.EventSourcing
{
    /// <summary>
    /// In-memory predicate that is applied to a single event after its commit has been loaded.
    /// </summary>
    /// <param name="data">The event to test.</param>
    /// <returns>True when the event should be passed on to the caller.</returns>
    public delegate bool EventPredicate(EventData data);

    /// <summary>
    /// Read side of the MongoDB event store: index creation, subscriptions and event queries.
    /// </summary>
    public partial class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore
    {
        private static readonly List<StoredEvent> EmptyEvents = new List<StoredEvent>();

        /// <summary>
        /// Creates an ascending compound index over the given event metadata property and the timestamp field.
        /// </summary>
        /// <param name="property">The metadata property name; must not be null or empty.</param>
        /// <returns>The task of the index creation.</returns>
        public Task CreateIndexAsync(string property)
        {
            Guard.NotNullOrEmpty(property);

            return Collection.Indexes.CreateOneAsync(
                new CreateIndexModel<MongoEventCommit>(
                    Index
                        .Ascending(CreateIndexPath(property))
                        .Ascending(TimestampField)));
        }

        /// <summary>
        /// Creates a polling subscription that reads from this store.
        /// </summary>
        /// <param name="subscriber">The subscriber that receives the events; must not be null.</param>
        /// <param name="streamFilter">Optional stream filter that is passed on to the subscription.</param>
        /// <param name="position">Optional position that is passed on to the subscription.</param>
        /// <returns>The new <see cref="PollingSubscription"/>.</returns>
        public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string? streamFilter = null, string? position = null)
        {
            Guard.NotNull(subscriber);

            return new PollingSubscription(this, subscriber, streamFilter, position);
        }

        /// <summary>
        /// Returns at most the last <paramref name="count"/> events of a stream, ordered by event stream number.
        /// </summary>
        /// <param name="streamName">The exact name of the stream; must not be null or empty.</param>
        /// <param name="count">The maximum number of events; an empty list is returned when it is zero or negative.</param>
        /// <returns>The latest events in ascending stream order.</returns>
        public async Task<IReadOnlyList<StoredEvent>> QueryLatestAsync(string streamName, int count)
        {
            Guard.NotNullOrEmpty(streamName);

            if (count <= 0)
            {
                return EmptyEvents;
            }

            using (Profiler.TraceMethod<MongoEventStore>())
            {
                // The limit applies to commits, not to events, so the result is trimmed again below.
                var commits =
                    await Collection.Find(
                        Filter.Eq(EventStreamField, streamName))
                        .Sort(Sort.Descending(TimestampField)).Limit(count).ToListAsync();

                var result = new List<StoredEvent>();

                foreach (var commit in commits)
                {
                    var eventStreamOffset = (int)commit.EventStreamOffset;

                    var commitTimestamp = commit.Timestamp;
                    var commitOffset = 0;

                    foreach (var @event in commit.Events)
                    {
                        eventStreamOffset++;

                        var eventData = @event.ToEventData();
                        var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);

                        result.Add(new StoredEvent(streamName, eventToken, eventStreamOffset, eventData));

                        // Advance the offset so that each event gets its own position within the commit,
                        // exactly as the callback based query does.
                        commitOffset++;
                    }
                }

                IEnumerable<StoredEvent> ordered = result.OrderBy(x => x.EventStreamNumber);

                if (result.Count > count)
                {
                    // Keep only the newest events when the loaded commits contain more than requested.
                    ordered = ordered.Skip(result.Count - count);
                }

                return ordered.ToList();
            }
        }

        /// <summary>
        /// Returns the events of a stream whose event stream number is at least <paramref name="streamPosition"/>.
        /// </summary>
        /// <param name="streamName">The exact name of the stream; must not be null or empty.</param>
        /// <param name="streamPosition">The lowest event stream number to return.</param>
        /// <returns>The matching events, read from commits sorted by ascending timestamp.</returns>
        public async Task<IReadOnlyList<StoredEvent>> QueryAsync(string streamName, long streamPosition = 0)
        {
            Guard.NotNullOrEmpty(streamName);

            using (Profiler.TraceMethod<MongoEventStore>())
            {
                // NOTE(review): the lower bound is widened by MaxCommitSize, presumably so that a commit which
                // starts before the requested position but still contains it is loaded as well - confirm.
                var commits =
                    await Collection.Find(
                        Filter.And(
                            Filter.Eq(EventStreamField, streamName),
                            Filter.Gte(EventStreamOffsetField, streamPosition - MaxCommitSize)))
                        .Sort(Sort.Ascending(TimestampField)).ToListAsync();

                var result = new List<StoredEvent>();

                foreach (var commit in commits)
                {
                    var eventStreamOffset = (int)commit.EventStreamOffset;

                    var commitTimestamp = commit.Timestamp;
                    var commitOffset = 0;

                    foreach (var @event in commit.Events)
                    {
                        eventStreamOffset++;

                        if (eventStreamOffset >= streamPosition)
                        {
                            var eventData = @event.ToEventData();
                            var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);

                            result.Add(new StoredEvent(streamName, eventToken, eventStreamOffset, eventData));
                        }

                        // Must also advance for skipped events, otherwise the positions of the
                        // returned events would not reflect their real index within the commit.
                        commitOffset++;
                    }
                }

                return result;
            }
        }

        /// <summary>
        /// Invokes <paramref name="callback"/> for every event after <paramref name="position"/>
        /// whose header <paramref name="property"/> equals <paramref name="value"/>.
        /// </summary>
        /// <param name="callback">The handler that is awaited for each matching event; must not be null.</param>
        /// <param name="property">The header / metadata property name; must not be null or empty.</param>
        /// <param name="value">The expected value; must not be null.</param>
        /// <param name="position">Optional position token; it is converted to a <see cref="StreamPosition"/>.</param>
        /// <param name="ct">The token that is passed on to the query pipeline.</param>
        /// <returns>The task of the query.</returns>
        public Task QueryAsync(Func<StoredEvent, Task> callback, string property, object value, string? position = null, CancellationToken ct = default)
        {
            Guard.NotNull(callback);
            Guard.NotNullOrEmpty(property);
            Guard.NotNull(value);

            StreamPosition lastPosition = position;

            // The database filter selects whole commits; the expression then filters the single events.
            var filterDefinition = CreateFilter(property, value, lastPosition);
            var filterExpression = CreateFilterExpression(property, value);

            return QueryAsync(callback, lastPosition, filterDefinition, filterExpression, ct);
        }

        /// <summary>
        /// Invokes <paramref name="callback"/> for every event after <paramref name="position"/>
        /// in the streams selected by <paramref name="streamFilter"/>.
        /// </summary>
        /// <param name="callback">The handler that is awaited for each event; must not be null.</param>
        /// <param name="streamFilter">Optional stream filter, see <see cref="AppendByStream"/> for its interpretation.</param>
        /// <param name="position">Optional position token; it is converted to a <see cref="StreamPosition"/>.</param>
        /// <param name="ct">The token that is passed on to the query pipeline.</param>
        /// <returns>The task of the query.</returns>
        public Task QueryAsync(Func<StoredEvent, Task> callback, string? streamFilter = null, string? position = null, CancellationToken ct = default)
        {
            Guard.NotNull(callback);

            StreamPosition lastPosition = position;

            var filterDefinition = CreateFilter(streamFilter, lastPosition);
            var filterExpression = CreateFilterExpression(null, null);

            return QueryAsync(callback, lastPosition, filterDefinition, filterExpression, ct);
        }

        /// <summary>
        /// Shared implementation of the callback based queries: walks the matching commits by ascending timestamp
        /// and hands every event that lies after <paramref name="lastPosition"/> and passes
        /// <paramref name="filterExpression"/> to <paramref name="callback"/>.
        /// </summary>
        private async Task QueryAsync(Func<StoredEvent, Task> callback, StreamPosition lastPosition, EventFilter filterDefinition, EventPredicate filterExpression, CancellationToken ct = default)
        {
            using (Profiler.TraceMethod<MongoEventStore>())
            {
                await Collection.Find(filterDefinition, options: Batching.Options).Sort(Sort.Ascending(TimestampField)).ForEachPipelineAsync(async commit =>
                {
                    var eventStreamOffset = (int)commit.EventStreamOffset;

                    var commitTimestamp = commit.Timestamp;
                    var commitOffset = 0;

                    foreach (var @event in commit.Events)
                    {
                        eventStreamOffset++;

                        // Skip the events that were already delivered up to the last position.
                        if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp)
                        {
                            var eventData = @event.ToEventData();

                            if (filterExpression(eventData))
                            {
                                var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);

                                await callback(new StoredEvent(commit.EventStream, eventToken, eventStreamOffset, eventData));
                            }
                        }

                        commitOffset++;
                    }
                }, ct);
            }
        }

        /// <summary>
        /// Builds the commit filter for a query by position and metadata property.
        /// </summary>
        private static EventFilter CreateFilter(string property, object value, StreamPosition streamPosition)
        {
            var filters = new List<EventFilter>();

            AppendByPosition(streamPosition, filters);
            AppendByProperty(property, value, filters);

            return Filter.And(filters);
        }

        /// <summary>
        /// Builds the commit filter for a query by position and stream filter.
        /// </summary>
        private static EventFilter CreateFilter(string? streamFilter, StreamPosition streamPosition)
        {
            var filters = new List<EventFilter>();

            AppendByPosition(streamPosition, filters);
            AppendByStream(streamFilter, filters);

            return Filter.And(filters);
        }

        /// <summary>
        /// Adds an equality filter on the metadata path of <paramref name="property"/>.
        /// </summary>
        private static void AppendByProperty(string property, object value, List<EventFilter> filters)
        {
            filters.Add(Filter.Eq(CreateIndexPath(property), value));
        }

        /// <summary>
        /// Adds a filter on the stream name unless the stream filter selects all streams.
        /// A filter that contains '^' is used as a regular expression, any other filter as an exact name.
        /// </summary>
        private static void AppendByStream(string? streamFilter, List<EventFilter> filters)
        {
            if (!StreamFilter.IsAll(streamFilter))
            {
                if (streamFilter.Contains("^"))
                {
                    filters.Add(Filter.Regex(EventStreamField, streamFilter));
                }
                else
                {
                    filters.Add(Filter.Eq(EventStreamField, streamFilter));
                }
            }
        }

        /// <summary>
        /// Adds the timestamp filter for a position. When the position is the end of its commit only
        /// newer commits are matched; otherwise the commit at the same timestamp is matched again,
        /// and the per-event check in the query skips the part that lies before the position.
        /// </summary>
        private static void AppendByPosition(StreamPosition streamPosition, List<EventFilter> filters)
        {
            if (streamPosition.IsEndOfCommit)
            {
                filters.Add(Filter.Gt(TimestampField, streamPosition.Timestamp));
            }
            else
            {
                filters.Add(Filter.Gte(TimestampField, streamPosition.Timestamp));
            }
        }

        /// <summary>
        /// Creates the in-memory predicate for single events. With a non-blank property the event headers must
        /// contain that property with a value equal to the JSON form of <paramref name="value"/>;
        /// without a property every event matches.
        /// </summary>
        private static EventPredicate CreateFilterExpression(string? property, object? value)
        {
            if (!string.IsNullOrWhiteSpace(property))
            {
                var jsonValue = JsonValue.Create(value);

                return x => x.Headers.TryGetValue(property, out var p) && p.Equals(jsonValue);
            }
            else
            {
                return x => true;
            }
        }

        /// <summary>
        /// Returns the document path of an event metadata property.
        /// </summary>
        private static string CreateIndexPath(string property)
        {
            return $"Events.Metadata.{property}";
        }
    }
}