From 139fc5843936119f91020bf1aaa842da35baaa45 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Tue, 6 Apr 2021 16:53:05 +0200 Subject: [PATCH] Revert messages per task. --- .../Migrations/MongoDb/AddAppIdToEventStream.cs | 2 +- .../Migrations/MongoDb/ConvertDocumentIds.cs | 2 +- .../Assets/MongoAssetRepository.cs | 6 +++++- .../History/MongoHistoryEventRepository.cs | 10 ++++++++-- .../Backup/RestoreGrain.cs | 2 +- .../MongoDb/MongoExtensions.cs | 2 +- .../src/Squidex.Infrastructure/Commands/Rebuilder.cs | 2 +- .../EventSourcing/Grains/BatchSubscriber.cs | 6 +++--- .../Tasks/PartitionedActionBlock.cs | 4 ++-- .../Tasks/PartitionedActionBlockTests.cs | 2 +- 10 files changed, 24 insertions(+), 14 deletions(-) diff --git a/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs b/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs index 9f4632503..1d87af1e0 100644 --- a/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs +++ b/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs @@ -105,7 +105,7 @@ namespace Migrations.Migrations.MongoDb }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 2, - MaxMessagesPerTask = DataflowBlockOptions.Unbounded, + MaxMessagesPerTask = 1, BoundedCapacity = SizeOfQueue }); diff --git a/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs b/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs index 95e99081a..d40b43ad3 100644 --- a/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs +++ b/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs @@ -139,7 +139,7 @@ namespace Migrations.Migrations.MongoDb }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 2, - MaxMessagesPerTask = DataflowBlockOptions.Unbounded, + MaxMessagesPerTask = 1, BoundedCapacity = SizeOfQueue }); diff --git a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs index 187aa1d24..2fb9512a0 100644 --- a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs +++ b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs @@ -60,7 +60,11 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets .Ascending(x => x.IsDeleted) .Ascending(x => x.FileHash) .Ascending(x => x.FileName) - .Ascending(x => x.FileSize)) + .Ascending(x => x.FileSize)), + new CreateIndexModel( + Index + .Ascending(x => x.Id) + .Ascending(x => x.IsDeleted)) }, ct); } diff --git a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/History/MongoHistoryEventRepository.cs b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/History/MongoHistoryEventRepository.cs index df83a8df3..b9b43ee52 100644 --- a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/History/MongoHistoryEventRepository.cs +++ b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/History/MongoHistoryEventRepository.cs @@ -43,14 +43,20 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.History protected override Task SetupCollectionAsync(IMongoCollection collection, CancellationToken ct = default) { - return collection.Indexes.CreateOneAsync( + return collection.Indexes.CreateManyAsync(new[] + { new CreateIndexModel( Index .Ascending(x => x.AppId) .Ascending(x => x.Channel) .Descending(x => x.Created) .Descending(x => x.Version)), - cancellationToken: ct); + new CreateIndexModel( + Index + .Ascending(x => x.AppId) + .Descending(x => x.Created) + .Descending(x => x.Version)), + }, ct); } public async Task> QueryByChannelAsync(DomainId appId, string channelPrefix, int count) diff --git a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs index 69ad45eee..747d209f4 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs @@ -343,7 +343,7 @@ namespace Squidex.Domain.Apps.Entities.Backup }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = DataflowBlockOptions.Unbounded, + MaxMessagesPerTask = 1, BoundedCapacity = 2 }); diff --git a/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs b/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs index 430452097..802fff4ed 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs @@ -198,7 +198,7 @@ namespace Squidex.Infrastructure.MongoDb new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = DataflowBlockOptions.Unbounded, + MaxMessagesPerTask = 1, BoundedCapacity = Batching.BufferSize }); try diff --git a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs index 2889eb73b..f7bfc825f 100644 --- a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs +++ b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs @@ -115,7 +115,7 @@ namespace Squidex.Infrastructure.Commands new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = parallelism, - MaxMessagesPerTask = DataflowBlockOptions.Unbounded, + MaxMessagesPerTask = 1, BoundedCapacity = parallelism * 2 }); diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs index 69cd9d1b1..81e5f1f7e 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs @@ -70,9 +70,9 @@ namespace Squidex.Infrastructure.EventSourcing.Grains return job; }, new ExecutionDataflowBlockOptions { - BoundedCapacity = batchSize, MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = DataflowBlockOptions.Unbounded + MaxMessagesPerTask = 1, + BoundedCapacity = batchSize }); var buffer = AsyncHelper.CreateBatchBlock(batchSize, batchDelay, new GroupingDataflowBlockOptions @@ -105,7 +105,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains { BoundedCapacity = 2, MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = DataflowBlockOptions.Unbounded, + MaxMessagesPerTask = 1, TaskScheduler = scheduler }); diff --git a/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs b/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs index ee2c921b3..da818d2b6 100644 --- a/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs +++ b/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs @@ -42,7 +42,7 @@ namespace Squidex.Infrastructure.Tasks var workerOption = SimpleMapper.Map(dataflowBlockOptions, new ExecutionDataflowBlockOptions()); workerOption.MaxDegreeOfParallelism = 1; - workerOption.MaxMessagesPerTask = DataflowBlockOptions.Unbounded; + workerOption.MaxMessagesPerTask = 1; workers[i] = new ActionBlock(action, workerOption); } @@ -50,7 +50,7 @@ namespace Squidex.Infrastructure.Tasks var distributorOption = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = DataflowBlockOptions.Unbounded, + MaxMessagesPerTask = 1, BoundedCapacity = 1 }; diff --git a/backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs b/backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs index 067b9a186..55012fa4e 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs @@ -39,7 +39,7 @@ namespace Squidex.Infrastructure.Tasks }, x => x.P, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100, - MaxMessagesPerTask = DataflowBlockOptions.Unbounded, + MaxMessagesPerTask = 1, BoundedCapacity = 100 });