Browse Source

Another fix for rule runner grain. (#542)

* Another fix.

* Rule runner fixes.
pull/544/head
Sebastian Stehle 6 years ago
committed by GitHub
parent
commit
d69a11ab5c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      backend/src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs
  2. 27
      backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/RuleRunnerGrain.cs
  3. 13
      backend/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/RuleServiceTests.cs
  4. 6
      backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs

4
backend/src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs

@ -68,7 +68,7 @@ namespace Squidex.Domain.Apps.Core.HandleRules
this.log = log;
}
public virtual async Task<JobList> CreateJobsAsync(Rule rule, Guid ruleId, Envelope<IEvent> @event, bool ignoreStale = false)
public virtual async Task<JobList> CreateJobsAsync(Rule rule, Guid ruleId, Envelope<IEvent> @event, bool ignoreStale = true)
{
Guard.NotNull(rule, nameof(rule));
Guard.NotNull(@event, nameof(@event));
@ -108,7 +108,7 @@ namespace Squidex.Domain.Apps.Core.HandleRules
@event.Headers.Timestamp() :
now;
if (!ignoreStale && eventTime.Plus(Constants.StaleTime) < now)
if (ignoreStale && eventTime.Plus(Constants.StaleTime) < now)
{
return result;
}

27
backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/RuleRunnerGrain.cs

@ -153,18 +153,29 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner
await eventStore.QueryAsync(async storedEvent =>
{
var @event = eventDataFormatter.Parse(storedEvent.Data);
try
{
var @event = eventDataFormatter.Parse(storedEvent.Data);
var jobs = await ruleService.CreateJobsAsync(rule.RuleDef, rule.Id, @event);
var jobs = await ruleService.CreateJobsAsync(rule.RuleDef, rule.Id, @event, false);
foreach (var (job, _) in jobs)
foreach (var (job, _) in jobs)
{
await ruleEventRepository.EnqueueAsync(job, job.Created, ct);
}
}
catch (Exception ex)
{
await ruleEventRepository.EnqueueAsync(job, job.Created, ct);
log.LogWarning(ex, w => w
.WriteProperty("action", "runRule")
.WriteProperty("status", "failedPartially3"));
}
finally
{
job.Position = storedEvent.EventPosition;
job.Position = storedEvent.EventPosition;
await state.WriteAsync();
await state.WriteAsync();
}
}, SquidexHeaders.AppId, Key.ToString(), job.Position, ct);
}
catch (OperationCanceledException)
@ -174,7 +185,7 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner
catch (Exception ex)
{
log.LogError(ex, w => w
.WriteProperty("action", "runeRule")
.WriteProperty("action", "runRule")
.WriteProperty("status", "failed")
.WriteProperty("ruleId", job.RuleId?.ToString()));
}

13
backend/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/RuleServiceTests.cs

@ -159,6 +159,19 @@ namespace Squidex.Domain.Apps.Core.Operations.HandleRules
.MustNotHaveHappened();
}
[Fact]
public async Task Should_create_job_if_too_old_but_stale_events_are_not_ignored()
{
var @event = Envelope.Create(new ContentCreated()).SetTimestamp(clock.GetCurrentInstant().Minus(Duration.FromDays(3)));
var jobs = await sut.CreateJobsAsync(ValidRule(), ruleId, @event, false);
Assert.Empty(jobs);
A.CallTo(() => ruleTriggerHandler.Trigger(A<AppEvent>._, A<RuleTrigger>._, ruleId))
.MustHaveHappened();
}
[Fact]
public async Task Should_not_create_job_if_not_triggered_with_precheck()
{

6
backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs

@ -77,7 +77,7 @@ namespace Squidex.Domain.Apps.Entities.Rules
var job = new RuleJob { Created = now };
A.CallTo(() => ruleService.CreateJobsAsync(rule.RuleDef, rule.Id, @event, false))
A.CallTo(() => ruleService.CreateJobsAsync(rule.RuleDef, rule.Id, @event, true))
.Returns(new List<(RuleJob, Exception?)> { (job, null) });
await sut.Enqueue(rule.RuleDef, rule.Id, @event);
@ -102,10 +102,10 @@ namespace Squidex.Domain.Apps.Entities.Rules
A.CallTo(() => appProvider.GetRulesAsync(appId.Id))
.Returns(new List<IRuleEntity> { rule1, rule2 });
A.CallTo(() => ruleService.CreateJobsAsync(rule1.RuleDef, rule1.Id, @event, false))
A.CallTo(() => ruleService.CreateJobsAsync(rule1.RuleDef, rule1.Id, @event, true))
.Returns(new List<(RuleJob, Exception?)> { (job1, null) });
A.CallTo(() => ruleService.CreateJobsAsync(rule2.RuleDef, rule2.Id, @event, false))
A.CallTo(() => ruleService.CreateJobsAsync(rule2.RuleDef, rule2.Id, @event, true))
.Returns(new List<(RuleJob, Exception?)>());
await sut.On(@event);

Loading…
Cancel
Save