From a28ff4600283a2f000d9643f09e0aa4c3f4b17ad Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Sun, 4 Feb 2018 17:23:07 +0100 Subject: [PATCH] PartitionedActionBlock used. --- src/Squidex.Domain.Apps.Core.Model/Rules/RuleJob.cs | 2 ++ .../HandleRules/Actions/AlgoliaActionHandler.cs | 7 ++----- .../HandleRules/RuleService.cs | 6 ++++++ src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuer.cs | 5 +++-- src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs | 2 +- 5 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/Squidex.Domain.Apps.Core.Model/Rules/RuleJob.cs b/src/Squidex.Domain.Apps.Core.Model/Rules/RuleJob.cs index 030ff718f..963832075 100644 --- a/src/Squidex.Domain.Apps.Core.Model/Rules/RuleJob.cs +++ b/src/Squidex.Domain.Apps.Core.Model/Rules/RuleJob.cs @@ -16,6 +16,8 @@ namespace Squidex.Domain.Apps.Core.Rules public Guid AppId { get; set; } + public Guid AggregateId { get; set; } + public string EventName { get; set; } public string ActionName { get; set; } diff --git a/src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/AlgoliaActionHandler.cs b/src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/AlgoliaActionHandler.cs index fe9c8f7bd..3a2194a9c 100644 --- a/src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/AlgoliaActionHandler.cs +++ b/src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/AlgoliaActionHandler.cs @@ -10,6 +10,7 @@ using System.Threading.Tasks; using Algolia.Search; using Newtonsoft.Json; using Newtonsoft.Json.Linq; +using Squidex.Domain.Apps.Core.Contents; using Squidex.Domain.Apps.Core.Rules; using Squidex.Domain.Apps.Core.Rules.Actions; using Squidex.Domain.Apps.Events; @@ -60,11 +61,6 @@ namespace Squidex.Domain.Apps.Core.HandleRules.Actions { case ContentCreated created: { - /* - * Do not add the status property here. Sometimes the published event is faster and therefore - * a content item would never become published. We have to find a way to improve the scheduling - * of rules first before we can fix it. - */ ruleDescription = $"Add entry to Algolia index: {action.IndexName}"; ruleData["Content"] = new JObject( new JProperty("id", contentEvent.ContentId), @@ -72,6 +68,7 @@ namespace Squidex.Domain.Apps.Core.HandleRules.Actions new JProperty("createdBy", created.Actor.ToString()), new JProperty("lastModified", timestamp), new JProperty("lastModifiedBy", created.Actor.ToString()), + new JProperty("status", Status.Draft.ToString()), new JProperty("data", formatter.ToRouteData(created.Data))); break; } diff --git a/src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs b/src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs index dbae6c8e9..ecac1cf7a 100644 --- a/src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs +++ b/src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs @@ -87,11 +87,17 @@ namespace Squidex.Domain.Apps.Core.HandleRules @event.Headers.Timestamp() : now; + var aggregateId = + @event.Headers.Contains(CommonHeaders.AggregateId) ? + @event.Headers.AggregateId() : + Guid.NewGuid(); + var job = new RuleJob { JobId = Guid.NewGuid(), ActionName = actionName, ActionData = actionData.Data, + AggregateId = aggregateId, AppId = appEvent.AppId.Id, Created = now, EventName = eventName, diff --git a/src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuer.cs b/src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuer.cs index 8150f7013..32d0096bb 100644 --- a/src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuer.cs +++ b/src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuer.cs @@ -16,13 +16,14 @@ using Squidex.Domain.Apps.Core.Rules; using Squidex.Domain.Apps.Entities.Rules.Repositories; using Squidex.Infrastructure; using Squidex.Infrastructure.Log; +using Squidex.Infrastructure.Tasks; using Squidex.Infrastructure.Timers; namespace Squidex.Domain.Apps.Entities.Rules { public class RuleDequeuer : DisposableObjectBase, IRunnable { - private readonly ActionBlock requestBlock; + private readonly ITargetBlock requestBlock; private readonly IRuleEventRepository ruleEventRepository; private readonly RuleService ruleService; private readonly CompletionTimer timer; @@ -45,7 +46,7 @@ namespace Squidex.Domain.Apps.Entities.Rules this.log = log; requestBlock = - new ActionBlock(HandleAsync, + new PartitionedActionBlock(HandleAsync, x => x.Job.AggregateId.GetHashCode(), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 32, BoundedCapacity = 32 }); timer = new CompletionTimer(5000, QueryAsync); diff --git a/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs b/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs index 3f6137cb7..16fd0c779 100644 --- a/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs +++ b/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs @@ -66,7 +66,7 @@ namespace Squidex.Infrastructure.Tasks distributor = new ActionBlock(x => { - var partition = partitioner(x) % workers.Length; + var partition = Math.Abs(partitioner(x)) % workers.Length; return workers[partition].SendAsync(x); }, distributorOption);