diff --git a/backend/src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs b/backend/src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs index 11b1d7c71..52262799f 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs @@ -6,6 +6,7 @@ // ========================================================================== using Microsoft.Extensions.Caching.Memory; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Squidex.Caching; using Squidex.Domain.Apps.Core.HandleRules; @@ -22,6 +23,7 @@ namespace Squidex.Domain.Apps.Entities.Rules private readonly IMemoryCache cache; private readonly IRuleEventRepository ruleEventRepository; private readonly IRuleService ruleService; + private readonly ILogger log; private readonly IAppProvider appProvider; private readonly ILocalCache localCache; private readonly TimeSpan cacheDuration; @@ -35,13 +37,15 @@ namespace Squidex.Domain.Apps.Entities.Rules IAppProvider appProvider, IRuleEventRepository ruleEventRepository, IRuleService ruleService, - IOptions options) + IOptions options, + ILogger log) { this.appProvider = appProvider; this.cache = cache; this.cacheDuration = options.Value.RuleCacheDuration; this.ruleEventRepository = ruleEventRepository; this.ruleService = ruleService; + this.log = log; this.localCache = localCache; } @@ -63,6 +67,11 @@ namespace Squidex.Domain.Apps.Entities.Rules // We do not want to handle disabled rules in the normal flow. if (job.Job != null && job.SkipReason is SkipReason.None or SkipReason.Failed) { + log.LogInformation("Adding rule job {jobId} for Rule(action={ruleAction}, trigger={ruleTrigger})", + job.Job.Id, + rule.Action.GetType().Name, + rule.Trigger.GetType().Name); + await ruleEventRepository.EnqueueAsync(job.Job, job.EnrichmentError); } } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs index 1b30f1321..5cc681fa6 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs @@ -7,6 +7,7 @@ using FakeItEasy; using Microsoft.Extensions.Caching.Memory; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using NodaTime; using Squidex.Caching; @@ -45,7 +46,8 @@ namespace Squidex.Domain.Apps.Entities.Rules appProvider, ruleEventRepository, ruleService, - options); + options, + A.Fake>()); } [Fact] diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs index dafdedfd3..0dfb9b899 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs @@ -227,6 +227,33 @@ namespace Squidex.Infrastructure.EventSourcing ShouldBeEquivalentTo(readEventsFromBeginning?.TakeLast(4), expectedFromBeginning); } + [Fact] + public async Task Should_subscribe_with_parallel_writes() + { + var streamName = $"test-{Guid.NewGuid()}"; + + // Append and read in parallel. + var readEvents = await QueryWithSubscriptionAsync($"^{streamName}", async () => + { + await Parallel.ForEachAsync(Enumerable.Range(0, 50), async (i, ct) => + { + var fullStreamName = $"{streamName}-{Guid.NewGuid()}"; + + for (var j = 0; j < 100; j++) + { + var commit1 = new[] + { + CreateEventData(i * j) + }; + + await Sut.AppendAsync(Guid.NewGuid(), fullStreamName, commit1); + } + }); + }); + + Assert.Equal(5000, readEvents?.Count); + } + [Fact] public async Task Should_read_events_from_offset() {