diff --git a/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs b/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs new file mode 100644 index 000000000..3f6137cb7 --- /dev/null +++ b/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs @@ -0,0 +1,110 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Linq; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using Squidex.Infrastructure.Reflection; + +namespace Squidex.Infrastructure.Tasks +{ + public class PartitionedActionBlock : ITargetBlock + { + private readonly ITargetBlock distributor; + private readonly ActionBlock[] workers; + + public Task Completion + { + get { return Task.WhenAll(workers.Select(x => x.Completion)); } + } + + public PartitionedActionBlock(Action action, Func partitioner) + : this (ToAsync(action), partitioner, new ExecutionDataflowBlockOptions()) + { + } + + public PartitionedActionBlock(Func action, Func partitioner) + : this(action, partitioner, new ExecutionDataflowBlockOptions()) + { + } + + public PartitionedActionBlock(Action action, Func partitioner, ExecutionDataflowBlockOptions dataflowBlockOptions) + : this(ToAsync(action), partitioner, dataflowBlockOptions) + { + } + + public PartitionedActionBlock(Func action, Func partitioner, ExecutionDataflowBlockOptions dataflowBlockOptions) + { + Guard.NotNull(action, nameof(action)); + Guard.NotNull(partitioner, nameof(partitioner)); + Guard.NotNull(dataflowBlockOptions, nameof(dataflowBlockOptions)); + Guard.GreaterThan(dataflowBlockOptions.MaxDegreeOfParallelism, 1, nameof(dataflowBlockOptions.MaxDegreeOfParallelism)); + + workers = new ActionBlock[dataflowBlockOptions.MaxDegreeOfParallelism]; + + for (var i = 0; i < dataflowBlockOptions.MaxDegreeOfParallelism; i++) + { + var workerOption = SimpleMapper.Map(dataflowBlockOptions, new ExecutionDataflowBlockOptions()); + + workerOption.MaxDegreeOfParallelism = 1; + workerOption.MaxMessagesPerTask = 1; + + workers[i] = new ActionBlock(action, workerOption); + } + + var distributorOption = new ExecutionDataflowBlockOptions + { + MaxDegreeOfParallelism = 1, + MaxMessagesPerTask = 1, + BoundedCapacity = 1 + }; + + distributor = new ActionBlock(x => + { + var partition = partitioner(x) % workers.Length; + + return workers[partition].SendAsync(x); + }, distributorOption); + + distributor.Completion.ContinueWith(x => + { + foreach (var worker in workers) + { + worker.Complete(); + } + }); + } + + public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock source, bool consumeToAccept) + { + return distributor.OfferMessage(messageHeader, messageValue, source, consumeToAccept); + } + + public void Complete() + { + distributor.Complete(); + } + + public void Fault(Exception exception) + { + distributor.Fault(exception); + } + + private static Func ToAsync(Action action) + { + Guard.NotNull(action, nameof(action)); + + return x => + { + action(x); + + return TaskHelper.Done; + }; + } + } +} diff --git a/src/Squidex/Areas/Api/Controllers/Rules/Models/RuleActionDto.cs b/src/Squidex/Areas/Api/Controllers/Rules/Models/RuleActionDto.cs index ebf67c556..5cf41aa29 100644 --- a/src/Squidex/Areas/Api/Controllers/Rules/Models/RuleActionDto.cs +++ b/src/Squidex/Areas/Api/Controllers/Rules/Models/RuleActionDto.cs @@ -7,12 +7,14 @@ using System; using System.Linq; +using System.Runtime.Serialization; using Newtonsoft.Json; using Squidex.Domain.Apps.Core.Rules; namespace Squidex.Areas.Api.Controllers.Rules.Models { [JsonConverter(typeof(JsonInheritanceConverter), "actionType", typeof(RuleActionDto))] + [KnownType(nameof(Subtypes))] public abstract class RuleActionDto { public abstract RuleAction ToAction(); diff --git a/src/Squidex/Areas/Api/Controllers/Rules/Models/RuleTriggerDto.cs b/src/Squidex/Areas/Api/Controllers/Rules/Models/RuleTriggerDto.cs index 4b1b9bbcf..f30b5d5e0 100644 --- a/src/Squidex/Areas/Api/Controllers/Rules/Models/RuleTriggerDto.cs +++ b/src/Squidex/Areas/Api/Controllers/Rules/Models/RuleTriggerDto.cs @@ -14,7 +14,7 @@ using Squidex.Domain.Apps.Core.Rules; namespace Squidex.Areas.Api.Controllers.Rules.Models { [JsonConverter(typeof(JsonInheritanceConverter), "triggerType", typeof(RuleTriggerDto))] - [KnownType("Subtypes")] + [KnownType(nameof(Subtypes))] public abstract class RuleTriggerDto { public abstract RuleTrigger ToTrigger(); diff --git a/src/Squidex/Areas/Api/Controllers/Schemas/Models/FieldPropertiesDto.cs b/src/Squidex/Areas/Api/Controllers/Schemas/Models/FieldPropertiesDto.cs index 58f986535..afd3e8094 100644 --- a/src/Squidex/Areas/Api/Controllers/Schemas/Models/FieldPropertiesDto.cs +++ b/src/Squidex/Areas/Api/Controllers/Schemas/Models/FieldPropertiesDto.cs @@ -8,12 +8,14 @@ using System; using System.ComponentModel.DataAnnotations; using System.Linq; +using System.Runtime.Serialization; using Newtonsoft.Json; using Squidex.Domain.Apps.Core.Schemas; namespace Squidex.Areas.Api.Controllers.Schemas.Models { [JsonConverter(typeof(JsonInheritanceConverter), "fieldType", typeof(FieldPropertiesDto))] + [KnownType(nameof(Subtypes))] public abstract class FieldPropertiesDto { /// diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Model/InvariantPartitionTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Model/InvariantPartitionTests.cs index 0efc950ad..df3ddf895 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Model/InvariantPartitionTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Model/InvariantPartitionTests.cs @@ -14,7 +14,7 @@ using Xunit; namespace Squidex.Domain.Apps.Core.Model { - public sealed class InvariantPartitionTests + public class InvariantPartitionTests { [Fact] public void Should_provide_single_value() diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Model/PartitioningTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Model/PartitioningTests.cs index b322e215f..b82d85bcc 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Model/PartitioningTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Model/PartitioningTests.cs @@ -9,7 +9,7 @@ using Xunit; namespace Squidex.Domain.Apps.Core.Model { - public sealed class PartitioningTests + public class PartitioningTests { [Fact] public void Should_consider_null_as_valid_partitioning() diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/Triggers/AssetChangedTriggerTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/Triggers/AssetChangedTriggerTests.cs index 2490baeac..f83bb3e8c 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/Triggers/AssetChangedTriggerTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/Triggers/AssetChangedTriggerTests.cs @@ -17,7 +17,7 @@ using Xunit; namespace Squidex.Domain.Apps.Core.Operations.HandleRules.Triggers { - public sealed class AssetChangedTriggerTests + public class AssetChangedTriggerTests { private readonly IRuleTriggerHandler sut = new AssetChangedTriggerHandler(); diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/Triggers/ContentChangedTriggerTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/Triggers/ContentChangedTriggerTests.cs index e390e03f8..2f48944e4 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/Triggers/ContentChangedTriggerTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/Triggers/ContentChangedTriggerTests.cs @@ -22,7 +22,7 @@ using Xunit; namespace Squidex.Domain.Apps.Core.Operations.HandleRules.Triggers { - public sealed class ContentChangedTriggerTests + public class ContentChangedTriggerTests { private readonly IRuleTriggerHandler sut = new ContentChangedTriggerHandler(); private static readonly NamedId SchemaMatch = new NamedId(Guid.NewGuid(), "my-schema1"); diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Operations/Scripting/ContentDataObjectTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Operations/Scripting/ContentDataObjectTests.cs index a0a728505..9c5f3c7ee 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Operations/Scripting/ContentDataObjectTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Operations/Scripting/ContentDataObjectTests.cs @@ -14,7 +14,7 @@ using Xunit; namespace Squidex.Domain.Apps.Core.Operations.Scripting { - public sealed class ContentDataObjectTests + public class ContentDataObjectTests { [Fact] public void Should_update_data_when_setting_field() diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Operations/ValidateContent/Validators/RequiredStringValidatorTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Operations/ValidateContent/Validators/RequiredStringValidatorTests.cs index 7ac52e39a..d0b81afec 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Operations/ValidateContent/Validators/RequiredStringValidatorTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Operations/ValidateContent/Validators/RequiredStringValidatorTests.cs @@ -13,7 +13,7 @@ using Xunit; namespace Squidex.Domain.Apps.Core.Operations.ValidateContent.Validators { - public sealed class RequiredStringValidatorTests + public class RequiredStringValidatorTests { private readonly List errors = new List(); diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Operations/ValidateContent/Validators/RequiredValidatorTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Operations/ValidateContent/Validators/RequiredValidatorTests.cs index f3d219596..42c10195c 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Operations/ValidateContent/Validators/RequiredValidatorTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Operations/ValidateContent/Validators/RequiredValidatorTests.cs @@ -13,7 +13,7 @@ using Xunit; namespace Squidex.Domain.Apps.Core.Operations.ValidateContent.Validators { - public sealed class RequiredValidatorTests + public class RequiredValidatorTests { private readonly List errors = new List(); diff --git a/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/AlgoliaActionTests.cs b/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/AlgoliaActionTests.cs index 570b1730c..87105bd9a 100644 --- a/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/AlgoliaActionTests.cs +++ b/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/AlgoliaActionTests.cs @@ -11,7 +11,7 @@ using Xunit; namespace Squidex.Domain.Apps.Entities.Rules.Guards.Actions { - public sealed class AlgoliaActionTests + public class AlgoliaActionTests { [Fact] public async Task Should_add_error_if_app_id_not_defined() diff --git a/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/AzureQueueActionTests.cs b/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/AzureQueueActionTests.cs index 1569a7ada..915104e0f 100644 --- a/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/AzureQueueActionTests.cs +++ b/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/AzureQueueActionTests.cs @@ -11,7 +11,7 @@ using Xunit; namespace Squidex.Domain.Apps.Entities.Rules.Guards.Actions { - public sealed class AzureQueueActionTests + public class AzureQueueActionTests { [Fact] public async Task Should_add_error_if_connection_string_is_null() diff --git a/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/FastlyActionTests.cs b/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/FastlyActionTests.cs index 1e614edac..f744dcd45 100644 --- a/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/FastlyActionTests.cs +++ b/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/FastlyActionTests.cs @@ -11,7 +11,7 @@ using Xunit; namespace Squidex.Domain.Apps.Entities.Rules.Guards.Actions { - public sealed class FastlyActionTests + public class FastlyActionTests { [Fact] public async Task Should_add_error_if_service_id_not_defined() diff --git a/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/SlackActionTests.cs b/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/SlackActionTests.cs index b9c63d1c0..84d3a2c0c 100644 --- a/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/SlackActionTests.cs +++ b/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/SlackActionTests.cs @@ -12,7 +12,7 @@ using Xunit; namespace Squidex.Domain.Apps.Entities.Rules.Guards.Actions { - public sealed class SlackActionTests + public class SlackActionTests { [Fact] public async Task Should_add_error_if_webhook_url_is_null() diff --git a/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/WebhookActionTests.cs b/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/WebhookActionTests.cs index 2ab299c27..869dd2193 100644 --- a/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/WebhookActionTests.cs +++ b/tests/Squidex.Domain.Apps.Entities.Tests/Rules/Guards/Actions/WebhookActionTests.cs @@ -12,7 +12,7 @@ using Xunit; namespace Squidex.Domain.Apps.Entities.Rules.Guards.Actions { - public sealed class WebhookActionTests + public class WebhookActionTests { [Fact] public async Task Should_add_error_if_url_is_null() diff --git a/tests/Squidex.Infrastructure.Tests/Commands/EnrichWithTimestampCommandMiddlewareTests.cs b/tests/Squidex.Infrastructure.Tests/Commands/EnrichWithTimestampCommandMiddlewareTests.cs index c1856ff6f..528b4bfcd 100644 --- a/tests/Squidex.Infrastructure.Tests/Commands/EnrichWithTimestampCommandMiddlewareTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Commands/EnrichWithTimestampCommandMiddlewareTests.cs @@ -13,7 +13,7 @@ using Xunit; namespace Squidex.Infrastructure.Commands { - public sealed class EnrichWithTimestampCommandMiddlewareTests + public class EnrichWithTimestampCommandMiddlewareTests { private readonly IClock clock = A.Fake(); diff --git a/tests/Squidex.Infrastructure.Tests/DispatchingTests.cs b/tests/Squidex.Infrastructure.Tests/DispatchingTests.cs index 7178f6c47..a7ea52049 100644 --- a/tests/Squidex.Infrastructure.Tests/DispatchingTests.cs +++ b/tests/Squidex.Infrastructure.Tests/DispatchingTests.cs @@ -12,7 +12,7 @@ using Xunit; namespace Squidex.Infrastructure { - public sealed class DispatchingTests + public class DispatchingTests { private interface IMyEvent { diff --git a/tests/Squidex.Infrastructure.Tests/EventSourcing/DefaultEventNotifierTests.cs b/tests/Squidex.Infrastructure.Tests/EventSourcing/DefaultEventNotifierTests.cs index ad39596af..5730499c2 100644 --- a/tests/Squidex.Infrastructure.Tests/EventSourcing/DefaultEventNotifierTests.cs +++ b/tests/Squidex.Infrastructure.Tests/EventSourcing/DefaultEventNotifierTests.cs @@ -10,7 +10,7 @@ using Xunit; namespace Squidex.Infrastructure.EventSourcing { - public sealed class DefaultEventNotifierTests + public class DefaultEventNotifierTests { private readonly DefaultEventNotifier sut = new DefaultEventNotifier(new InMemoryPubSub()); diff --git a/tests/Squidex.Infrastructure.Tests/Json/InstantConverterTests.cs b/tests/Squidex.Infrastructure.Tests/Json/InstantConverterTests.cs index a0b6a5bbd..b207d365a 100644 --- a/tests/Squidex.Infrastructure.Tests/Json/InstantConverterTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Json/InstantConverterTests.cs @@ -12,7 +12,7 @@ using Xunit; namespace Squidex.Infrastructure.Json { - public sealed class InstantConverterTests + public class InstantConverterTests { [Fact] public void Should_serialize_and_deserialize() diff --git a/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs b/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs index 930298678..bbce93481 100644 --- a/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs @@ -16,7 +16,7 @@ using Xunit; namespace Squidex.Infrastructure.Migrations { - public sealed class MigratorTests + public class MigratorTests { private readonly IMigrationStatus status = A.Fake(); private readonly ISemanticLog log = A.Fake(); diff --git a/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs b/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs new file mode 100644 index 000000000..20e58a34f --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs @@ -0,0 +1,62 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using Xunit; + +namespace Squidex.Infrastructure.Tasks +{ + public class PartitionedActionBlockTests + { + [Fact] + public async Task Should_propagate_in_order() + { + var random = new Random(); + var partitions = 10; + + var lists = new List[partitions]; + + for (var i = 0; i < partitions; i++) + { + lists[i] = new List(); + } + + var block = new PartitionedActionBlock<(int P, int V)>(x => + { + random.Next(10); + + lists[x.P].Add(x.V); + }, x => x.P, new ExecutionDataflowBlockOptions + { + MaxDegreeOfParallelism = 100, + MaxMessagesPerTask = 1, + BoundedCapacity = 100 + }); + + for (var i = 0; i < partitions; i++) + { + for (var j = 0; j < 10; j++) + { + await block.SendAsync((i, j)); + } + } + + block.Complete(); + + await block.Completion; + + foreach (var list in lists) + { + Assert.Equal(Enumerable.Range(0, 10).ToList(), list); + } + } + } +} diff --git a/tests/Squidex.Infrastructure.Tests/ValidationExtensionsTests.cs b/tests/Squidex.Infrastructure.Tests/ValidationExtensionsTests.cs index 1f3ec016b..8e099cfce 100644 --- a/tests/Squidex.Infrastructure.Tests/ValidationExtensionsTests.cs +++ b/tests/Squidex.Infrastructure.Tests/ValidationExtensionsTests.cs @@ -10,7 +10,7 @@ using Xunit; namespace Squidex.Infrastructure { - public sealed class ValidationExtensionsTests + public class ValidationExtensionsTests { [Fact] public void Should_return_true_if_is_between()