diff --git a/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/DefaultRuleRunnerService.cs b/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/DefaultRuleRunnerService.cs index 52a72df5e..84ae5d952 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/DefaultRuleRunnerService.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/DefaultRuleRunnerService.cs @@ -8,6 +8,7 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using NodaTime; using Orleans; using Squidex.Domain.Apps.Core.HandleRules; using Squidex.Domain.Apps.Core.Rules.Triggers; @@ -49,7 +50,9 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner var result = new List(MaxSimulatedEvents); - await foreach (var storedEvent in eventStore.QueryAllReverseAsync($"^([a-z]+)\\-{rule.AppId.Id}", null, MaxSimulatedEvents, ct)) + var fromNow = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(7)); + + await foreach (var storedEvent in eventStore.QueryAllReverseAsync($"^([a-z]+)\\-{rule.AppId.Id}", fromNow, MaxSimulatedEvents, ct)) { var @event = eventDataFormatter.ParseIfKnown(storedEvent); diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs index 8f7c33029..405786898 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs @@ -12,6 +12,7 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver; +using NodaTime; using Squidex.Infrastructure.MongoDb; using Squidex.Log; using EventFilter = MongoDB.Driver.FilterDefinition; @@ -103,7 +104,7 @@ namespace Squidex.Infrastructure.EventSourcing } } - public async IAsyncEnumerable QueryAllReverseAsync(string? streamFilter = null, string? position = null, long take = long.MaxValue, + public async IAsyncEnumerable QueryAllReverseAsync(string? streamFilter = null, Instant timestamp = default, long take = long.MaxValue, [EnumeratorCancellation] CancellationToken ct = default) { if (take <= 0) @@ -111,7 +112,7 @@ namespace Squidex.Infrastructure.EventSourcing yield break; } - StreamPosition lastPosition = position; + StreamPosition lastPosition = timestamp; var filterDefinition = CreateFilter(streamFilter, lastPosition); @@ -127,7 +128,7 @@ namespace Squidex.Infrastructure.EventSourcing { foreach (var current in cursor.Current) { - foreach (var @event in current.Filtered(position).Reverse()) + foreach (var @event in current.Filtered(lastPosition).Reverse()) { yield return @event; @@ -167,7 +168,7 @@ namespace Squidex.Infrastructure.EventSourcing { foreach (var current in cursor.Current) { - foreach (var @event in current.Filtered(position)) + foreach (var @event in current.Filtered(lastPosition)) { yield return @event; diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs index 77bd5de6b..fc683ad3c 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs @@ -6,6 +6,7 @@ // ========================================================================== using MongoDB.Bson; +using NodaTime; using Squidex.Infrastructure.ObjectPool; namespace Squidex.Infrastructure.EventSourcing @@ -70,5 +71,18 @@ namespace Squidex.Infrastructure.EventSourcing return Empty; } + + public static implicit operator StreamPosition(Instant timestamp) + { + if (timestamp != default) + { + return new StreamPosition( + new BsonTimestamp((int)timestamp.ToUnixTimeSeconds(), 0), + 0, + 0); + } + + return Empty; + } } } diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs b/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs index 62eb88fd5..97b147613 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs @@ -9,6 +9,7 @@ using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using NodaTime; namespace Squidex.Infrastructure.EventSourcing { @@ -18,7 +19,7 @@ namespace Squidex.Infrastructure.EventSourcing Task> QueryAsync(string streamName, long streamPosition = 0); - IAsyncEnumerable QueryAllReverseAsync(string? streamFilter = null, string? position = null, long take = long.MaxValue, CancellationToken ct = default); + IAsyncEnumerable QueryAllReverseAsync(string? streamFilter = null, Instant timestamp = default, long take = long.MaxValue, CancellationToken ct = default); IAsyncEnumerable QueryAllAsync(string? streamFilter = null, string? position = null, long take = long.MaxValue, CancellationToken ct = default); diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs index 4da7d99d7..d7648992a 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs @@ -344,7 +344,7 @@ namespace Squidex.Infrastructure.EventSourcing { var expected = allExpected.Reverse().Take(take).ToArray(); - var readEvents = await Sut.QueryAllReverseAsync(streamName, null, take).ToArrayAsync(); + var readEvents = await Sut.QueryAllReverseAsync(streamName, default, take).ToArrayAsync(); ShouldBeEquivalentTo(readEvents, expected); } diff --git a/frontend/app/features/rules/pages/rule/rule-page.component.html b/frontend/app/features/rules/pages/rule/rule-page.component.html index cf325d7f5..07ac4628b 100644 --- a/frontend/app/features/rules/pages/rule/rule-page.component.html +++ b/frontend/app/features/rules/pages/rule/rule-page.component.html @@ -7,7 +7,7 @@ -
+
{{ 'common.enabled' | sqxTranslate }}