From 8ba6ef82461ed86c4f9370fcce7236f50a62d4f4 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Sun, 19 Nov 2017 00:19:52 +0100 Subject: [PATCH] Grain for rule dequeuer. --- .../Orleans/Grains/IRuleDequeuerGrain.cs | 18 +++++ .../Implementation/RuleDequeuerGrain.cs} | 75 +++++++++++++------ .../Rules/Orleans/RuleDequeuerBootstrap.cs | 32 ++++++++ src/Squidex/Config/Domain/ReadServices.cs | 3 - src/Squidex/Config/Orleans/SiloServices.cs | 2 + ...euerTests.cs => RuleDequeuerGrainTests.cs} | 32 ++++++-- 6 files changed, 127 insertions(+), 35 deletions(-) create mode 100644 src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/IRuleDequeuerGrain.cs rename src/Squidex.Domain.Apps.Read/Rules/{RuleDequeuer.cs => Orleans/Grains/Implementation/RuleDequeuerGrain.cs} (66%) create mode 100644 src/Squidex.Domain.Apps.Read/Rules/Orleans/RuleDequeuerBootstrap.cs rename tests/Squidex.Domain.Apps.Read.Tests/Rules/{RuleDequeuerTests.cs => RuleDequeuerGrainTests.cs} (80%) diff --git a/src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/IRuleDequeuerGrain.cs b/src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/IRuleDequeuerGrain.cs new file mode 100644 index 000000000..bf30781c0 --- /dev/null +++ b/src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/IRuleDequeuerGrain.cs @@ -0,0 +1,18 @@ +// ========================================================================== +// IRuleDequeuerGrain.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Threading.Tasks; +using Orleans; + +namespace Squidex.Domain.Apps.Read.Rules.Orleans.Grains +{ + public interface IRuleDequeuerGrain : IGrainWithStringKey + { + Task ActivateAsync(); + } +} diff --git a/src/Squidex.Domain.Apps.Read/Rules/RuleDequeuer.cs b/src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/Implementation/RuleDequeuerGrain.cs similarity index 66% rename from src/Squidex.Domain.Apps.Read/Rules/RuleDequeuer.cs rename to src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/Implementation/RuleDequeuerGrain.cs index 3f6f6e7c6..32c462c82 100644 --- a/src/Squidex.Domain.Apps.Read/Rules/RuleDequeuer.cs +++ b/src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/Implementation/RuleDequeuerGrain.cs @@ -1,5 +1,5 @@ // ========================================================================== -// RuleDequeuer.cs +// RuleDequeuerGrain.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group @@ -11,25 +11,35 @@ using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using NodaTime; +using Orleans; +using Orleans.Core; +using Orleans.Runtime; using Squidex.Domain.Apps.Core.HandleRules; using Squidex.Domain.Apps.Read.Rules.Repositories; using Squidex.Infrastructure; using Squidex.Infrastructure.Log; -using Squidex.Infrastructure.Timers; +using Squidex.Infrastructure.Tasks; -namespace Squidex.Domain.Apps.Read.Rules +namespace Squidex.Domain.Apps.Read.Rules.Orleans.Grains.Implementation { - public sealed class RuleDequeuer : DisposableObjectBase, IExternalSystem + public class RuleDequeuerGrain : Grain, IRuleDequeuerGrain, IRemindable { - private readonly ActionBlock requestBlock; - private readonly TransformBlock blockBlock; private readonly IRuleEventRepository ruleEventRepository; private readonly RuleService ruleService; - private readonly CompletionTimer timer; private readonly IClock clock; private readonly ISemanticLog log; + private ActionBlock requestBlock; + private TransformBlock blockBlock; - public RuleDequeuer(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log, IClock clock) + public RuleDequeuerGrain(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log, IClock clock) + : this(ruleService, ruleEventRepository, log, clock, null, null) + { + } + + protected RuleDequeuerGrain(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log, IClock clock, + IGrainIdentity identity, + IGrainRuntime runtime) + : base(identity, runtime) { Guard.NotNull(ruleEventRepository, nameof(ruleEventRepository)); Guard.NotNull(ruleService, nameof(ruleService)); @@ -42,47 +52,64 @@ namespace Squidex.Domain.Apps.Read.Rules this.clock = clock; this.log = log; + } + + public override Task OnActivateAsync() + { + DelayDeactivation(TimeSpan.FromDays(1)); + + RegisterOrUpdateReminder("Default", TimeSpan.Zero, TimeSpan.FromMinutes(10)); + RegisterTimer(x => QueryAsync(), null, TimeSpan.Zero, TimeSpan.FromSeconds(10)); requestBlock = new ActionBlock(MakeRequestAsync, - new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 32, BoundedCapacity = 32 }); + new ExecutionDataflowBlockOptions + { + TaskScheduler = TaskScheduler.Current, + MaxMessagesPerTask = 1, + MaxDegreeOfParallelism = 32, + BoundedCapacity = 32 + }); blockBlock = new TransformBlock(x => BlockAsync(x), - new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 }); + new ExecutionDataflowBlockOptions + { + TaskScheduler = TaskScheduler.Current, + MaxMessagesPerTask = 1, + MaxDegreeOfParallelism = 1, + BoundedCapacity = 1 + }); blockBlock.LinkTo(requestBlock, new DataflowLinkOptions { PropagateCompletion = true }); - timer = new CompletionTimer(5000, QueryAsync); + return base.OnActivateAsync(); } - protected override void DisposeObject(bool disposing) + public Task ReceiveReminder(string reminderName, TickStatus status) { - if (disposing) - { - timer.StopAsync().Wait(); - - blockBlock.Complete(); - requestBlock.Completion.Wait(); - } + return QueryAsync(); } - public void Connect() + public override Task OnDeactivateAsync() { + blockBlock.Complete(); + + return requestBlock.Completion; } - public void Next() + public Task ActivateAsync() { - timer.SkipCurrentDelay(); + return TaskHelper.Done; } - private async Task QueryAsync(CancellationToken cancellationToken) + public async Task QueryAsync() { try { var now = clock.GetCurrentInstant(); - await ruleEventRepository.QueryPendingAsync(now, blockBlock.SendAsync, cancellationToken); + await ruleEventRepository.QueryPendingAsync(now, blockBlock.SendAsync, CancellationToken.None); } catch (Exception ex) { diff --git a/src/Squidex.Domain.Apps.Read/Rules/Orleans/RuleDequeuerBootstrap.cs b/src/Squidex.Domain.Apps.Read/Rules/Orleans/RuleDequeuerBootstrap.cs new file mode 100644 index 000000000..636e8acf9 --- /dev/null +++ b/src/Squidex.Domain.Apps.Read/Rules/Orleans/RuleDequeuerBootstrap.cs @@ -0,0 +1,32 @@ +// ========================================================================== +// RuleDequeuerBootstrap.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Threading.Tasks; +using Orleans.Providers; +using Squidex.Domain.Apps.Read.Rules.Orleans.Grains; +using Squidex.Infrastructure.Tasks; + +namespace Squidex.Domain.Apps.Read.Rules.Orleans +{ + public sealed class RuleDequeuerBootstrap : IBootstrapProvider + { + public string Name { get; private set; } + + public Task Init(string name, IProviderRuntime providerRuntime, IProviderConfiguration config) + { + var grain = providerRuntime.GrainFactory.GetGrain("Default"); + + return grain.ActivateAsync(); + } + + public Task Close() + { + return TaskHelper.Done; + } + } +} diff --git a/src/Squidex/Config/Domain/ReadServices.cs b/src/Squidex/Config/Domain/ReadServices.cs index 3c60cef01..b5c5649fe 100644 --- a/src/Squidex/Config/Domain/ReadServices.cs +++ b/src/Squidex/Config/Domain/ReadServices.cs @@ -75,9 +75,6 @@ namespace Squidex.Config.Domain services.AddSingletonAs() .As(); - services.AddSingletonAs() - .As(); - services.AddSingletonAs() .As(); diff --git a/src/Squidex/Config/Orleans/SiloServices.cs b/src/Squidex/Config/Orleans/SiloServices.cs index b9e94244c..bfa043226 100644 --- a/src/Squidex/Config/Orleans/SiloServices.cs +++ b/src/Squidex/Config/Orleans/SiloServices.cs @@ -11,6 +11,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Orleans; using Orleans.Runtime.Configuration; +using Squidex.Domain.Apps.Read.Rules.Orleans; using Squidex.Infrastructure.CQRS.Events.Orleans.Grains; namespace Squidex.Config.Orleans @@ -31,6 +32,7 @@ namespace Squidex.Config.Orleans if (clusterConfiguration != null) { clusterConfiguration.Globals.RegisterBootstrapProvider("EventConsumers"); + clusterConfiguration.Globals.RegisterBootstrapProvider("RuleDequeuer"); } config.ConfigureByOption("store:type", new Options diff --git a/tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerTests.cs b/tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerGrainTests.cs similarity index 80% rename from tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerTests.cs rename to tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerGrainTests.cs index bfa7af426..0cd9684b1 100644 --- a/tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerTests.cs +++ b/tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerGrainTests.cs @@ -1,5 +1,5 @@ // ========================================================================== -// RuleDequeuerTests.cs +// RuleDequeuerGrainTests.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group @@ -11,8 +11,11 @@ using System.Threading; using System.Threading.Tasks; using FakeItEasy; using NodaTime; +using Orleans.Core; +using Orleans.Runtime; using Squidex.Domain.Apps.Core.HandleRules; using Squidex.Domain.Apps.Core.Rules; +using Squidex.Domain.Apps.Read.Rules.Orleans.Grains.Implementation; using Squidex.Domain.Apps.Read.Rules.Repositories; using Squidex.Infrastructure.Log; using Xunit; @@ -21,7 +24,7 @@ using Xunit; namespace Squidex.Domain.Apps.Read.Rules { - public class RuleDequeuerTests + public class RuleDequeuerGrainTests { private readonly IClock clock = A.Fake(); private readonly ISemanticLog log = A.Fake(); @@ -30,7 +33,17 @@ namespace Squidex.Domain.Apps.Read.Rules private readonly RuleService ruleService = A.Fake(); private readonly Instant now = SystemClock.Instance.GetCurrentInstant(); - public RuleDequeuerTests() + public sealed class MyRuleDequeuerGrain : RuleDequeuerGrain + { + public MyRuleDequeuerGrain(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log, IClock clock, + IGrainIdentity identity, + IGrainRuntime runtime) + : base(ruleService, ruleEventRepository, log, clock, identity, runtime) + { + } + } + + public RuleDequeuerGrainTests() { A.CallTo(() => clock.GetCurrentInstant()).Returns(now); } @@ -42,7 +55,7 @@ namespace Squidex.Domain.Apps.Read.Rules [InlineData(2, 360, RuleResult.Failed, RuleJobResult.Retry)] [InlineData(3, 720, RuleResult.Failed, RuleJobResult.Retry)] [InlineData(4, 0, RuleResult.Failed, RuleJobResult.Failed)] - public void Should_set_next_attempt_based_on_num_calls(int calls, int minutes, RuleResult result, RuleJobResult jobResult) + public async Task Should_set_next_attempt_based_on_num_calls(int calls, int minutes, RuleResult result, RuleJobResult jobResult) { var actionData = new RuleJobData(); var actionName = "MyAction"; @@ -55,14 +68,17 @@ namespace Squidex.Domain.Apps.Read.Rules SetupSender(@event, requestDump, result, requestElapsed); SetupPendingEvents(@event); - var sut = new RuleDequeuer( + var sut = new MyRuleDequeuerGrain( ruleService, ruleEventRepository, log, - clock); + clock, + A.Fake(), + A.Fake()); - sut.Next(); - sut.Dispose(); + await sut.OnActivateAsync(); + await sut.QueryAsync(); + await sut.OnDeactivateAsync(); Instant? nextCall = null;