|
|
@ -10,6 +10,7 @@ using System.Threading; |
|
|
using System.Threading.Tasks; |
|
|
using System.Threading.Tasks; |
|
|
using Orleans; |
|
|
using Orleans; |
|
|
using Orleans.Runtime; |
|
|
using Orleans.Runtime; |
|
|
|
|
|
using Squidex.Caching; |
|
|
using Squidex.Domain.Apps.Core.HandleRules; |
|
|
using Squidex.Domain.Apps.Core.HandleRules; |
|
|
using Squidex.Domain.Apps.Entities.Rules.Repositories; |
|
|
using Squidex.Domain.Apps.Entities.Rules.Repositories; |
|
|
using Squidex.Infrastructure; |
|
|
using Squidex.Infrastructure; |
|
|
@ -24,12 +25,14 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner |
|
|
{ |
|
|
{ |
|
|
public sealed class RuleRunnerGrain : GrainOfString, IRuleRunnerGrain, IRemindable |
|
|
public sealed class RuleRunnerGrain : GrainOfString, IRuleRunnerGrain, IRemindable |
|
|
{ |
|
|
{ |
|
|
|
|
|
private const int MaxErrors = 10; |
|
|
private readonly IGrainState<State> state; |
|
|
private readonly IGrainState<State> state; |
|
|
private readonly IAppProvider appProvider; |
|
|
private readonly IAppProvider appProvider; |
|
|
|
|
|
private readonly ILocalCache localCache; |
|
|
private readonly IEventStore eventStore; |
|
|
private readonly IEventStore eventStore; |
|
|
private readonly IEventDataFormatter eventDataFormatter; |
|
|
private readonly IEventDataFormatter eventDataFormatter; |
|
|
private readonly IRuleEventRepository ruleEventRepository; |
|
|
private readonly IRuleEventRepository ruleEventRepository; |
|
|
private readonly RuleService ruleService; |
|
|
private readonly IRuleService ruleService; |
|
|
private readonly ISemanticLog log; |
|
|
private readonly ISemanticLog log; |
|
|
private CancellationTokenSource? currentJobToken; |
|
|
private CancellationTokenSource? currentJobToken; |
|
|
private IGrainReminder? currentReminder; |
|
|
private IGrainReminder? currentReminder; |
|
|
@ -41,19 +44,23 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner |
|
|
public DomainId? RuleId { get; set; } |
|
|
public DomainId? RuleId { get; set; } |
|
|
|
|
|
|
|
|
public string? Position { get; set; } |
|
|
public string? Position { get; set; } |
|
|
|
|
|
|
|
|
|
|
|
public bool FromSnapshots { get; set; } |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public RuleRunnerGrain( |
|
|
public RuleRunnerGrain( |
|
|
IGrainState<State> state, |
|
|
IGrainState<State> state, |
|
|
IAppProvider appProvider, |
|
|
IAppProvider appProvider, |
|
|
|
|
|
ILocalCache localCache, |
|
|
IEventStore eventStore, |
|
|
IEventStore eventStore, |
|
|
IEventDataFormatter eventDataFormatter, |
|
|
IEventDataFormatter eventDataFormatter, |
|
|
IRuleEventRepository ruleEventRepository, |
|
|
IRuleEventRepository ruleEventRepository, |
|
|
RuleService ruleService, |
|
|
IRuleService ruleService, |
|
|
ISemanticLog log) |
|
|
ISemanticLog log) |
|
|
{ |
|
|
{ |
|
|
Guard.NotNull(state, nameof(state)); |
|
|
Guard.NotNull(state, nameof(state)); |
|
|
Guard.NotNull(appProvider, nameof(appProvider)); |
|
|
Guard.NotNull(appProvider, nameof(appProvider)); |
|
|
|
|
|
Guard.NotNull(localCache, nameof(localCache)); |
|
|
Guard.NotNull(eventStore, nameof(eventStore)); |
|
|
Guard.NotNull(eventStore, nameof(eventStore)); |
|
|
Guard.NotNull(eventDataFormatter, nameof(eventDataFormatter)); |
|
|
Guard.NotNull(eventDataFormatter, nameof(eventDataFormatter)); |
|
|
Guard.NotNull(ruleEventRepository, nameof(ruleEventRepository)); |
|
|
Guard.NotNull(ruleEventRepository, nameof(ruleEventRepository)); |
|
|
@ -62,6 +69,7 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner |
|
|
|
|
|
|
|
|
this.state = state; |
|
|
this.state = state; |
|
|
this.appProvider = appProvider; |
|
|
this.appProvider = appProvider; |
|
|
|
|
|
this.localCache = localCache; |
|
|
this.eventStore = eventStore; |
|
|
this.eventStore = eventStore; |
|
|
this.eventDataFormatter = eventDataFormatter; |
|
|
this.eventDataFormatter = eventDataFormatter; |
|
|
this.ruleEventRepository = ruleEventRepository; |
|
|
this.ruleEventRepository = ruleEventRepository; |
|
|
@ -71,9 +79,7 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner |
|
|
|
|
|
|
|
|
protected override Task OnActivateAsync(string key) |
|
|
protected override Task OnActivateAsync(string key) |
|
|
{ |
|
|
{ |
|
|
EnsureIsRunning(); |
|
|
return EnsureIsRunningAsync(true); |
|
|
|
|
|
|
|
|
return base.OnActivateAsync(key); |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public override Task OnDeactivateAsync() |
|
|
public override Task OnDeactivateAsync() |
|
|
@ -104,7 +110,7 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner |
|
|
return Task.FromResult(state.Value.RuleId); |
|
|
return Task.FromResult(state.Value.RuleId); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async Task RunAsync(DomainId ruleId) |
|
|
public async Task RunAsync(DomainId ruleId, bool fromSnapshots) |
|
|
{ |
|
|
{ |
|
|
if (currentJobToken != null) |
|
|
if (currentJobToken != null) |
|
|
{ |
|
|
{ |
|
|
@ -113,21 +119,33 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner |
|
|
|
|
|
|
|
|
state.Value = new State |
|
|
state.Value = new State |
|
|
{ |
|
|
{ |
|
|
RuleId = ruleId |
|
|
RuleId = ruleId, |
|
|
|
|
|
FromSnapshots = fromSnapshots |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
EnsureIsRunning(); |
|
|
await EnsureIsRunningAsync(false); |
|
|
|
|
|
|
|
|
await state.WriteAsync(); |
|
|
await state.WriteAsync(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private void EnsureIsRunning() |
|
|
private async Task EnsureIsRunningAsync(bool continues) |
|
|
{ |
|
|
{ |
|
|
if (state.Value.RuleId.HasValue && currentJobToken == null) |
|
|
var job = state.Value; |
|
|
|
|
|
|
|
|
|
|
|
if (job.RuleId.HasValue && currentJobToken == null) |
|
|
{ |
|
|
{ |
|
|
currentJobToken = new CancellationTokenSource(); |
|
|
if (state.Value.FromSnapshots && continues) |
|
|
|
|
|
{ |
|
|
|
|
|
state.Value = new State(); |
|
|
|
|
|
|
|
|
|
|
|
await state.WriteAsync(); |
|
|
|
|
|
} |
|
|
|
|
|
else |
|
|
|
|
|
{ |
|
|
|
|
|
currentJobToken = new CancellationTokenSource(); |
|
|
|
|
|
|
|
|
Process(state.Value, currentJobToken.Token); |
|
|
Process(state.Value, currentJobToken.Token); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -136,7 +154,7 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner |
|
|
ProcessAsync(job, ct).Forget(); |
|
|
ProcessAsync(job, ct).Forget(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private async Task ProcessAsync(State job, CancellationToken ct) |
|
|
private async Task ProcessAsync(State currentState, CancellationToken ct) |
|
|
{ |
|
|
{ |
|
|
try |
|
|
try |
|
|
{ |
|
|
{ |
|
|
@ -144,42 +162,24 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner |
|
|
|
|
|
|
|
|
var rules = await appProvider.GetRulesAsync(DomainId.Create(Key)); |
|
|
var rules = await appProvider.GetRulesAsync(DomainId.Create(Key)); |
|
|
|
|
|
|
|
|
var rule = rules.Find(x => x.Id == job.RuleId); |
|
|
var rule = rules.Find(x => x.Id == currentState.RuleId); |
|
|
|
|
|
|
|
|
if (rule == null) |
|
|
if (rule == null) |
|
|
{ |
|
|
{ |
|
|
throw new InvalidOperationException("Cannot find rule."); |
|
|
throw new InvalidOperationException("Cannot find rule."); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
await eventStore.QueryAsync(async storedEvent => |
|
|
using (localCache.StartContext()) |
|
|
{ |
|
|
{ |
|
|
try |
|
|
if (currentState.FromSnapshots && ruleService.CanCreateSnapshotEvents(rule.RuleDef)) |
|
|
{ |
|
|
|
|
|
var @event = eventDataFormatter.ParseIfKnown(storedEvent); |
|
|
|
|
|
|
|
|
|
|
|
if (@event != null) |
|
|
|
|
|
{ |
|
|
|
|
|
var jobs = await ruleService.CreateJobsAsync(rule.RuleDef, rule.Id, @event, false); |
|
|
|
|
|
|
|
|
|
|
|
foreach (var (job, _) in jobs) |
|
|
|
|
|
{ |
|
|
|
|
|
await ruleEventRepository.EnqueueAsync(job, job.Created, ct); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
catch (Exception ex) |
|
|
|
|
|
{ |
|
|
{ |
|
|
log.LogWarning(ex, w => w |
|
|
await EnqueueFromSnapshotsAsync(rule); |
|
|
.WriteProperty("action", "runRule") |
|
|
|
|
|
.WriteProperty("status", "failedPartially3")); |
|
|
|
|
|
} |
|
|
} |
|
|
finally |
|
|
else |
|
|
{ |
|
|
{ |
|
|
job.Position = storedEvent.EventPosition; |
|
|
await EnqueueFromEventsAsync(currentState, rule, ct); |
|
|
} |
|
|
} |
|
|
|
|
|
} |
|
|
await state.WriteAsync(); |
|
|
|
|
|
}, $"^([a-z]+)\\-{Key}", job.Position, ct); |
|
|
|
|
|
} |
|
|
} |
|
|
catch (OperationCanceledException) |
|
|
catch (OperationCanceledException) |
|
|
{ |
|
|
{ |
|
|
@ -190,14 +190,14 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner |
|
|
log.LogError(ex, w => w |
|
|
log.LogError(ex, w => w |
|
|
.WriteProperty("action", "runRule") |
|
|
.WriteProperty("action", "runRule") |
|
|
.WriteProperty("status", "failed") |
|
|
.WriteProperty("status", "failed") |
|
|
.WriteProperty("ruleId", job.RuleId?.ToString())); |
|
|
.WriteProperty("ruleId", currentState.RuleId?.ToString())); |
|
|
} |
|
|
} |
|
|
finally |
|
|
finally |
|
|
{ |
|
|
{ |
|
|
if (!isStopping) |
|
|
if (!isStopping) |
|
|
{ |
|
|
{ |
|
|
job.RuleId = null; |
|
|
currentState.RuleId = null; |
|
|
job.Position = null; |
|
|
currentState.Position = null; |
|
|
|
|
|
|
|
|
await state.WriteAsync(); |
|
|
await state.WriteAsync(); |
|
|
|
|
|
|
|
|
@ -214,11 +214,77 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public Task ReceiveReminder(string reminderName, TickStatus status) |
|
|
private async Task EnqueueFromSnapshotsAsync(IRuleEntity rule) |
|
|
{ |
|
|
{ |
|
|
EnsureIsRunning(); |
|
|
var errors = 0; |
|
|
|
|
|
|
|
|
return Task.CompletedTask; |
|
|
await foreach (var (job, ex) in ruleService.CreateSnapshotJobsAsync(rule.RuleDef, rule.Id, rule.AppId.Id)) |
|
|
|
|
|
{ |
|
|
|
|
|
if (job != null) |
|
|
|
|
|
{ |
|
|
|
|
|
await ruleEventRepository.EnqueueAsync(job, ex); |
|
|
|
|
|
} |
|
|
|
|
|
else if (ex != null) |
|
|
|
|
|
{ |
|
|
|
|
|
errors++; |
|
|
|
|
|
|
|
|
|
|
|
if (errors >= MaxErrors) |
|
|
|
|
|
{ |
|
|
|
|
|
throw ex; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
log.LogWarning(ex, w => w |
|
|
|
|
|
.WriteProperty("action", "runRule") |
|
|
|
|
|
.WriteProperty("status", "failedPartially")); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private async Task EnqueueFromEventsAsync(State currentState, IRuleEntity rule, CancellationToken ct) |
|
|
|
|
|
{ |
|
|
|
|
|
var errors = 0; |
|
|
|
|
|
|
|
|
|
|
|
await eventStore.QueryAsync(async storedEvent => |
|
|
|
|
|
{ |
|
|
|
|
|
try |
|
|
|
|
|
{ |
|
|
|
|
|
var @event = eventDataFormatter.ParseIfKnown(storedEvent); |
|
|
|
|
|
|
|
|
|
|
|
if (@event != null) |
|
|
|
|
|
{ |
|
|
|
|
|
var jobs = await ruleService.CreateJobsAsync(rule.RuleDef, rule.Id, @event, false); |
|
|
|
|
|
|
|
|
|
|
|
foreach (var (job, ex) in jobs) |
|
|
|
|
|
{ |
|
|
|
|
|
await ruleEventRepository.EnqueueAsync(job, ex); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
catch (Exception ex) |
|
|
|
|
|
{ |
|
|
|
|
|
errors++; |
|
|
|
|
|
|
|
|
|
|
|
if (errors >= MaxErrors) |
|
|
|
|
|
{ |
|
|
|
|
|
throw; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
log.LogWarning(ex, w => w |
|
|
|
|
|
.WriteProperty("action", "runRule") |
|
|
|
|
|
.WriteProperty("status", "failedPartially")); |
|
|
|
|
|
} |
|
|
|
|
|
finally |
|
|
|
|
|
{ |
|
|
|
|
|
currentState.Position = storedEvent.EventPosition; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
await state.WriteAsync(); |
|
|
|
|
|
}, $"^([a-z]+)\\-{Key}", currentState.Position, ct); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public Task ReceiveReminder(string reminderName, TickStatus status) |
|
|
|
|
|
{ |
|
|
|
|
|
return EnsureIsRunningAsync(true); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|