Browse Source

PartitionedActionBlock used.

pull/242/head
Sebastian Stehle 8 years ago
parent
commit
a28ff46002
  1. 2
      src/Squidex.Domain.Apps.Core.Model/Rules/RuleJob.cs
  2. 7
      src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/AlgoliaActionHandler.cs
  3. 6
      src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs
  4. 5
      src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuer.cs
  5. 2
      src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs

2
src/Squidex.Domain.Apps.Core.Model/Rules/RuleJob.cs

@ -16,6 +16,8 @@ namespace Squidex.Domain.Apps.Core.Rules
public Guid AppId { get; set; }
public Guid AggregateId { get; set; }
public string EventName { get; set; }
public string ActionName { get; set; }

7
src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/AlgoliaActionHandler.cs

@ -10,6 +10,7 @@ using System.Threading.Tasks;
using Algolia.Search;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Squidex.Domain.Apps.Core.Contents;
using Squidex.Domain.Apps.Core.Rules;
using Squidex.Domain.Apps.Core.Rules.Actions;
using Squidex.Domain.Apps.Events;
@ -60,11 +61,6 @@ namespace Squidex.Domain.Apps.Core.HandleRules.Actions
{
case ContentCreated created:
{
/*
* Do not add the status property here. Sometimes the published event is faster and therefore
* a content item would never become published. We have to find a way to improve the scheduling
* of rules first before we can fix it.
*/
ruleDescription = $"Add entry to Algolia index: {action.IndexName}";
ruleData["Content"] = new JObject(
new JProperty("id", contentEvent.ContentId),
@ -72,6 +68,7 @@ namespace Squidex.Domain.Apps.Core.HandleRules.Actions
new JProperty("createdBy", created.Actor.ToString()),
new JProperty("lastModified", timestamp),
new JProperty("lastModifiedBy", created.Actor.ToString()),
new JProperty("status", Status.Draft.ToString()),
new JProperty("data", formatter.ToRouteData(created.Data)));
break;
}

6
src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs

@ -87,11 +87,17 @@ namespace Squidex.Domain.Apps.Core.HandleRules
@event.Headers.Timestamp() :
now;
var aggregateId =
@event.Headers.Contains(CommonHeaders.AggregateId) ?
@event.Headers.AggregateId() :
Guid.NewGuid();
var job = new RuleJob
{
JobId = Guid.NewGuid(),
ActionName = actionName,
ActionData = actionData.Data,
AggregateId = aggregateId,
AppId = appEvent.AppId.Id,
Created = now,
EventName = eventName,

5
src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuer.cs

@ -16,13 +16,14 @@ using Squidex.Domain.Apps.Core.Rules;
using Squidex.Domain.Apps.Entities.Rules.Repositories;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Tasks;
using Squidex.Infrastructure.Timers;
namespace Squidex.Domain.Apps.Entities.Rules
{
public class RuleDequeuer : DisposableObjectBase, IRunnable
{
private readonly ActionBlock<IRuleEventEntity> requestBlock;
private readonly ITargetBlock<IRuleEventEntity> requestBlock;
private readonly IRuleEventRepository ruleEventRepository;
private readonly RuleService ruleService;
private readonly CompletionTimer timer;
@ -45,7 +46,7 @@ namespace Squidex.Domain.Apps.Entities.Rules
this.log = log;
requestBlock =
new ActionBlock<IRuleEventEntity>(HandleAsync,
new PartitionedActionBlock<IRuleEventEntity>(HandleAsync, x => x.Job.AggregateId.GetHashCode(),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 32, BoundedCapacity = 32 });
timer = new CompletionTimer(5000, QueryAsync);

2
src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs

@ -66,7 +66,7 @@ namespace Squidex.Infrastructure.Tasks
distributor = new ActionBlock<TInput>(x =>
{
var partition = partitioner(x) % workers.Length;
var partition = Math.Abs(partitioner(x)) % workers.Length;
return workers[partition].SendAsync(x);
}, distributorOption);

Loading…
Cancel
Save