From d69a11ab5c3acebfc64435c63cc59c362f9b3077 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Thu, 25 Jun 2020 16:32:28 +0200 Subject: [PATCH] Another fix for rule runner grain. (#542) * Another fix. * Rule runner fixes. --- .../HandleRules/RuleService.cs | 4 +-- .../Rules/Runner/RuleRunnerGrain.cs | 27 +++++++++++++------ .../HandleRules/RuleServiceTests.cs | 13 +++++++++ .../Rules/RuleEnqueuerTests.cs | 6 ++--- 4 files changed, 37 insertions(+), 13 deletions(-) diff --git a/backend/src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs b/backend/src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs index cdcd228b3..4ef439ca5 100644 --- a/backend/src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs +++ b/backend/src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs @@ -68,7 +68,7 @@ namespace Squidex.Domain.Apps.Core.HandleRules this.log = log; } - public virtual async Task CreateJobsAsync(Rule rule, Guid ruleId, Envelope @event, bool ignoreStale = false) + public virtual async Task CreateJobsAsync(Rule rule, Guid ruleId, Envelope @event, bool ignoreStale = true) { Guard.NotNull(rule, nameof(rule)); Guard.NotNull(@event, nameof(@event)); @@ -108,7 +108,7 @@ namespace Squidex.Domain.Apps.Core.HandleRules @event.Headers.Timestamp() : now; - if (!ignoreStale && eventTime.Plus(Constants.StaleTime) < now) + if (ignoreStale && eventTime.Plus(Constants.StaleTime) < now) { return result; } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/RuleRunnerGrain.cs b/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/RuleRunnerGrain.cs index deb6ed6a9..5c4d1e79f 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/RuleRunnerGrain.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/RuleRunnerGrain.cs @@ -153,18 +153,29 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner await eventStore.QueryAsync(async storedEvent => { - var @event = eventDataFormatter.Parse(storedEvent.Data); + try + { + var @event = eventDataFormatter.Parse(storedEvent.Data); - var jobs = await ruleService.CreateJobsAsync(rule.RuleDef, rule.Id, @event); + var jobs = await ruleService.CreateJobsAsync(rule.RuleDef, rule.Id, @event, false); - foreach (var (job, _) in jobs) + foreach (var (job, _) in jobs) + { + await ruleEventRepository.EnqueueAsync(job, job.Created, ct); + } + } + catch (Exception ex) { - await ruleEventRepository.EnqueueAsync(job, job.Created, ct); + log.LogWarning(ex, w => w + .WriteProperty("action", "runRule") + .WriteProperty("status", "failedPartially3")); } + finally + { + job.Position = storedEvent.EventPosition; - job.Position = storedEvent.EventPosition; - - await state.WriteAsync(); + await state.WriteAsync(); + } }, SquidexHeaders.AppId, Key.ToString(), job.Position, ct); } catch (OperationCanceledException) @@ -174,7 +185,7 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner catch (Exception ex) { log.LogError(ex, w => w - .WriteProperty("action", "runeRule") + .WriteProperty("action", "runRule") .WriteProperty("status", "failed") .WriteProperty("ruleId", job.RuleId?.ToString())); } diff --git a/backend/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/RuleServiceTests.cs b/backend/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/RuleServiceTests.cs index 4e1785b80..603dcc3a3 100644 --- a/backend/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/RuleServiceTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/RuleServiceTests.cs @@ -159,6 +159,19 @@ namespace Squidex.Domain.Apps.Core.Operations.HandleRules .MustNotHaveHappened(); } + [Fact] + public async Task Should_create_job_if_too_old_but_stale_events_are_not_ignored() + { + var @event = Envelope.Create(new ContentCreated()).SetTimestamp(clock.GetCurrentInstant().Minus(Duration.FromDays(3))); + + var jobs = await sut.CreateJobsAsync(ValidRule(), ruleId, @event, false); + + Assert.Empty(jobs); + + A.CallTo(() => ruleTriggerHandler.Trigger(A._, A._, ruleId)) + .MustHaveHappened(); + } + [Fact] public async Task Should_not_create_job_if_not_triggered_with_precheck() { diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs index ff5c11224..a90ffc9ea 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs @@ -77,7 +77,7 @@ namespace Squidex.Domain.Apps.Entities.Rules var job = new RuleJob { Created = now }; - A.CallTo(() => ruleService.CreateJobsAsync(rule.RuleDef, rule.Id, @event, false)) + A.CallTo(() => ruleService.CreateJobsAsync(rule.RuleDef, rule.Id, @event, true)) .Returns(new List<(RuleJob, Exception?)> { (job, null) }); await sut.Enqueue(rule.RuleDef, rule.Id, @event); @@ -102,10 +102,10 @@ namespace Squidex.Domain.Apps.Entities.Rules A.CallTo(() => appProvider.GetRulesAsync(appId.Id)) .Returns(new List { rule1, rule2 }); - A.CallTo(() => ruleService.CreateJobsAsync(rule1.RuleDef, rule1.Id, @event, false)) + A.CallTo(() => ruleService.CreateJobsAsync(rule1.RuleDef, rule1.Id, @event, true)) .Returns(new List<(RuleJob, Exception?)> { (job1, null) }); - A.CallTo(() => ruleService.CreateJobsAsync(rule2.RuleDef, rule2.Id, @event, false)) + A.CallTo(() => ruleService.CreateJobsAsync(rule2.RuleDef, rule2.Id, @event, true)) .Returns(new List<(RuleJob, Exception?)>()); await sut.On(@event);