Browse Source

Grain for rule dequeuer.

pull/169/head
Sebastian Stehle 8 years ago
parent
commit
8ba6ef8246
  1. 18
      src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/IRuleDequeuerGrain.cs
  2. 75
      src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/Implementation/RuleDequeuerGrain.cs
  3. 32
      src/Squidex.Domain.Apps.Read/Rules/Orleans/RuleDequeuerBootstrap.cs
  4. 3
      src/Squidex/Config/Domain/ReadServices.cs
  5. 2
      src/Squidex/Config/Orleans/SiloServices.cs
  6. 32
      tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerGrainTests.cs

18
src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/IRuleDequeuerGrain.cs

@ -0,0 +1,18 @@
// ==========================================================================
// IRuleDequeuerGrain.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Threading.Tasks;
using Orleans;
namespace Squidex.Domain.Apps.Read.Rules.Orleans.Grains
{
public interface IRuleDequeuerGrain : IGrainWithStringKey
{
Task ActivateAsync();
}
}

75
src/Squidex.Domain.Apps.Read/Rules/RuleDequeuer.cs → src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/Implementation/RuleDequeuerGrain.cs

@ -1,5 +1,5 @@
// ==========================================================================
// RuleDequeuer.cs
// RuleDequeuerGrain.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
@ -11,25 +11,35 @@ using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using NodaTime;
using Orleans;
using Orleans.Core;
using Orleans.Runtime;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Read.Rules.Repositories;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Timers;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Domain.Apps.Read.Rules
namespace Squidex.Domain.Apps.Read.Rules.Orleans.Grains.Implementation
{
public sealed class RuleDequeuer : DisposableObjectBase, IExternalSystem
public class RuleDequeuerGrain : Grain, IRuleDequeuerGrain, IRemindable
{
private readonly ActionBlock<IRuleEventEntity> requestBlock;
private readonly TransformBlock<IRuleEventEntity, IRuleEventEntity> blockBlock;
private readonly IRuleEventRepository ruleEventRepository;
private readonly RuleService ruleService;
private readonly CompletionTimer timer;
private readonly IClock clock;
private readonly ISemanticLog log;
private ActionBlock<IRuleEventEntity> requestBlock;
private TransformBlock<IRuleEventEntity, IRuleEventEntity> blockBlock;
public RuleDequeuer(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log, IClock clock)
public RuleDequeuerGrain(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log, IClock clock)
: this(ruleService, ruleEventRepository, log, clock, null, null)
{
}
protected RuleDequeuerGrain(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log, IClock clock,
IGrainIdentity identity,
IGrainRuntime runtime)
: base(identity, runtime)
{
Guard.NotNull(ruleEventRepository, nameof(ruleEventRepository));
Guard.NotNull(ruleService, nameof(ruleService));
@ -42,47 +52,64 @@ namespace Squidex.Domain.Apps.Read.Rules
this.clock = clock;
this.log = log;
}
public override Task OnActivateAsync()
{
DelayDeactivation(TimeSpan.FromDays(1));
RegisterOrUpdateReminder("Default", TimeSpan.Zero, TimeSpan.FromMinutes(10));
RegisterTimer(x => QueryAsync(), null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
requestBlock =
new ActionBlock<IRuleEventEntity>(MakeRequestAsync,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 32, BoundedCapacity = 32 });
new ExecutionDataflowBlockOptions
{
TaskScheduler = TaskScheduler.Current,
MaxMessagesPerTask = 1,
MaxDegreeOfParallelism = 32,
BoundedCapacity = 32
});
blockBlock =
new TransformBlock<IRuleEventEntity, IRuleEventEntity>(x => BlockAsync(x),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
new ExecutionDataflowBlockOptions
{
TaskScheduler = TaskScheduler.Current,
MaxMessagesPerTask = 1,
MaxDegreeOfParallelism = 1,
BoundedCapacity = 1
});
blockBlock.LinkTo(requestBlock, new DataflowLinkOptions { PropagateCompletion = true });
timer = new CompletionTimer(5000, QueryAsync);
return base.OnActivateAsync();
}
protected override void DisposeObject(bool disposing)
{
if (disposing)
public Task ReceiveReminder(string reminderName, TickStatus status)
{
timer.StopAsync().Wait();
blockBlock.Complete();
requestBlock.Completion.Wait();
}
return QueryAsync();
}
public void Connect()
public override Task OnDeactivateAsync()
{
blockBlock.Complete();
return requestBlock.Completion;
}
public void Next()
public Task ActivateAsync()
{
timer.SkipCurrentDelay();
return TaskHelper.Done;
}
private async Task QueryAsync(CancellationToken cancellationToken)
public async Task QueryAsync()
{
try
{
var now = clock.GetCurrentInstant();
await ruleEventRepository.QueryPendingAsync(now, blockBlock.SendAsync, cancellationToken);
await ruleEventRepository.QueryPendingAsync(now, blockBlock.SendAsync, CancellationToken.None);
}
catch (Exception ex)
{

32
src/Squidex.Domain.Apps.Read/Rules/Orleans/RuleDequeuerBootstrap.cs

@ -0,0 +1,32 @@
// ==========================================================================
// RuleDequeuerBootstrap.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Threading.Tasks;
using Orleans.Providers;
using Squidex.Domain.Apps.Read.Rules.Orleans.Grains;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Domain.Apps.Read.Rules.Orleans
{
public sealed class RuleDequeuerBootstrap : IBootstrapProvider
{
public string Name { get; private set; }
public Task Init(string name, IProviderRuntime providerRuntime, IProviderConfiguration config)
{
var grain = providerRuntime.GrainFactory.GetGrain<IRuleDequeuerGrain>("Default");
return grain.ActivateAsync();
}
public Task Close()
{
return TaskHelper.Done;
}
}
}

3
src/Squidex/Config/Domain/ReadServices.cs

@ -75,9 +75,6 @@ namespace Squidex.Config.Domain
services.AddSingletonAs<OrleansEventNotifier>()
.As<IEventNotifier>();
services.AddSingletonAs<RuleDequeuer>()
.As<IExternalSystem>();
services.AddSingletonAs<OrleansAppProvider>()
.As<IAppProvider>();

2
src/Squidex/Config/Orleans/SiloServices.cs

@ -11,6 +11,7 @@ using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Orleans;
using Orleans.Runtime.Configuration;
using Squidex.Domain.Apps.Read.Rules.Orleans;
using Squidex.Infrastructure.CQRS.Events.Orleans.Grains;
namespace Squidex.Config.Orleans
@ -31,6 +32,7 @@ namespace Squidex.Config.Orleans
if (clusterConfiguration != null)
{
clusterConfiguration.Globals.RegisterBootstrapProvider<EventConsumerBootstrap>("EventConsumers");
clusterConfiguration.Globals.RegisterBootstrapProvider<RuleDequeuerBootstrap>("RuleDequeuer");
}
config.ConfigureByOption("store:type", new Options

32
tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerTests.cs → tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerGrainTests.cs

@ -1,5 +1,5 @@
// ==========================================================================
// RuleDequeuerTests.cs
// RuleDequeuerGrainTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
@ -11,8 +11,11 @@ using System.Threading;
using System.Threading.Tasks;
using FakeItEasy;
using NodaTime;
using Orleans.Core;
using Orleans.Runtime;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.Rules;
using Squidex.Domain.Apps.Read.Rules.Orleans.Grains.Implementation;
using Squidex.Domain.Apps.Read.Rules.Repositories;
using Squidex.Infrastructure.Log;
using Xunit;
@ -21,7 +24,7 @@ using Xunit;
namespace Squidex.Domain.Apps.Read.Rules
{
public class RuleDequeuerTests
public class RuleDequeuerGrainTests
{
private readonly IClock clock = A.Fake<IClock>();
private readonly ISemanticLog log = A.Fake<ISemanticLog>();
@ -30,7 +33,17 @@ namespace Squidex.Domain.Apps.Read.Rules
private readonly RuleService ruleService = A.Fake<RuleService>();
private readonly Instant now = SystemClock.Instance.GetCurrentInstant();
public RuleDequeuerTests()
public sealed class MyRuleDequeuerGrain : RuleDequeuerGrain
{
public MyRuleDequeuerGrain(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log, IClock clock,
IGrainIdentity identity,
IGrainRuntime runtime)
: base(ruleService, ruleEventRepository, log, clock, identity, runtime)
{
}
}
public RuleDequeuerGrainTests()
{
A.CallTo(() => clock.GetCurrentInstant()).Returns(now);
}
@ -42,7 +55,7 @@ namespace Squidex.Domain.Apps.Read.Rules
[InlineData(2, 360, RuleResult.Failed, RuleJobResult.Retry)]
[InlineData(3, 720, RuleResult.Failed, RuleJobResult.Retry)]
[InlineData(4, 0, RuleResult.Failed, RuleJobResult.Failed)]
public void Should_set_next_attempt_based_on_num_calls(int calls, int minutes, RuleResult result, RuleJobResult jobResult)
public async Task Should_set_next_attempt_based_on_num_calls(int calls, int minutes, RuleResult result, RuleJobResult jobResult)
{
var actionData = new RuleJobData();
var actionName = "MyAction";
@ -55,14 +68,17 @@ namespace Squidex.Domain.Apps.Read.Rules
SetupSender(@event, requestDump, result, requestElapsed);
SetupPendingEvents(@event);
var sut = new RuleDequeuer(
var sut = new MyRuleDequeuerGrain(
ruleService,
ruleEventRepository,
log,
clock);
clock,
A.Fake<IGrainIdentity>(),
A.Fake<IGrainRuntime>());
sut.Next();
sut.Dispose();
await sut.OnActivateAsync();
await sut.QueryAsync();
await sut.OnDeactivateAsync();
Instant? nextCall = null;
Loading…
Cancel
Save