Browse Source

Revert messages per task.

pull/681/head
Sebastian Stehle 5 years ago
parent
commit
139fc58439
  1. 2
      backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs
  2. 2
      backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs
  3. 6
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs
  4. 10
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/History/MongoHistoryEventRepository.cs
  5. 2
      backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs
  6. 2
      backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs
  7. 2
      backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs
  8. 6
      backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs
  9. 4
      backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs
  10. 2
      backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs

2
backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs

@ -105,7 +105,7 @@ namespace Migrations.Migrations.MongoDb
}, new ExecutionDataflowBlockOptions }, new ExecutionDataflowBlockOptions
{ {
MaxDegreeOfParallelism = Environment.ProcessorCount * 2, MaxDegreeOfParallelism = Environment.ProcessorCount * 2,
MaxMessagesPerTask = DataflowBlockOptions.Unbounded, MaxMessagesPerTask = 1,
BoundedCapacity = SizeOfQueue BoundedCapacity = SizeOfQueue
}); });

2
backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs

@ -139,7 +139,7 @@ namespace Migrations.Migrations.MongoDb
}, new ExecutionDataflowBlockOptions }, new ExecutionDataflowBlockOptions
{ {
MaxDegreeOfParallelism = Environment.ProcessorCount * 2, MaxDegreeOfParallelism = Environment.ProcessorCount * 2,
MaxMessagesPerTask = DataflowBlockOptions.Unbounded, MaxMessagesPerTask = 1,
BoundedCapacity = SizeOfQueue BoundedCapacity = SizeOfQueue
}); });

6
backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs

@ -60,7 +60,11 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
.Ascending(x => x.IsDeleted) .Ascending(x => x.IsDeleted)
.Ascending(x => x.FileHash) .Ascending(x => x.FileHash)
.Ascending(x => x.FileName) .Ascending(x => x.FileName)
.Ascending(x => x.FileSize)) .Ascending(x => x.FileSize)),
new CreateIndexModel<MongoAssetEntity>(
Index
.Ascending(x => x.Id)
.Ascending(x => x.IsDeleted))
}, ct); }, ct);
} }

10
backend/src/Squidex.Domain.Apps.Entities.MongoDb/History/MongoHistoryEventRepository.cs

@ -43,14 +43,20 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.History
protected override Task SetupCollectionAsync(IMongoCollection<HistoryEvent> collection, CancellationToken ct = default) protected override Task SetupCollectionAsync(IMongoCollection<HistoryEvent> collection, CancellationToken ct = default)
{ {
return collection.Indexes.CreateOneAsync( return collection.Indexes.CreateManyAsync(new[]
{
new CreateIndexModel<HistoryEvent>( new CreateIndexModel<HistoryEvent>(
Index Index
.Ascending(x => x.AppId) .Ascending(x => x.AppId)
.Ascending(x => x.Channel) .Ascending(x => x.Channel)
.Descending(x => x.Created) .Descending(x => x.Created)
.Descending(x => x.Version)), .Descending(x => x.Version)),
cancellationToken: ct); new CreateIndexModel<HistoryEvent>(
Index
.Ascending(x => x.AppId)
.Descending(x => x.Created)
.Descending(x => x.Version)),
}, ct);
} }
public async Task<IReadOnlyList<HistoryEvent>> QueryByChannelAsync(DomainId appId, string channelPrefix, int count) public async Task<IReadOnlyList<HistoryEvent>> QueryByChannelAsync(DomainId appId, string channelPrefix, int count)

2
backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs

@ -343,7 +343,7 @@ namespace Squidex.Domain.Apps.Entities.Backup
}, new ExecutionDataflowBlockOptions }, new ExecutionDataflowBlockOptions
{ {
MaxDegreeOfParallelism = 1, MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = DataflowBlockOptions.Unbounded, MaxMessagesPerTask = 1,
BoundedCapacity = 2 BoundedCapacity = 2
}); });

2
backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs

@ -198,7 +198,7 @@ namespace Squidex.Infrastructure.MongoDb
new ExecutionDataflowBlockOptions new ExecutionDataflowBlockOptions
{ {
MaxDegreeOfParallelism = 1, MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = DataflowBlockOptions.Unbounded, MaxMessagesPerTask = 1,
BoundedCapacity = Batching.BufferSize BoundedCapacity = Batching.BufferSize
}); });
try try

2
backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs

@ -115,7 +115,7 @@ namespace Squidex.Infrastructure.Commands
new ExecutionDataflowBlockOptions new ExecutionDataflowBlockOptions
{ {
MaxDegreeOfParallelism = parallelism, MaxDegreeOfParallelism = parallelism,
MaxMessagesPerTask = DataflowBlockOptions.Unbounded, MaxMessagesPerTask = 1,
BoundedCapacity = parallelism * 2 BoundedCapacity = parallelism * 2
}); });

6
backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs

@ -70,9 +70,9 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
return job; return job;
}, new ExecutionDataflowBlockOptions }, new ExecutionDataflowBlockOptions
{ {
BoundedCapacity = batchSize,
MaxDegreeOfParallelism = 1, MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = DataflowBlockOptions.Unbounded MaxMessagesPerTask = 1,
BoundedCapacity = batchSize
}); });
var buffer = AsyncHelper.CreateBatchBlock<Job>(batchSize, batchDelay, new GroupingDataflowBlockOptions var buffer = AsyncHelper.CreateBatchBlock<Job>(batchSize, batchDelay, new GroupingDataflowBlockOptions
@ -105,7 +105,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
{ {
BoundedCapacity = 2, BoundedCapacity = 2,
MaxDegreeOfParallelism = 1, MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = DataflowBlockOptions.Unbounded, MaxMessagesPerTask = 1,
TaskScheduler = scheduler TaskScheduler = scheduler
}); });

4
backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs

@ -42,7 +42,7 @@ namespace Squidex.Infrastructure.Tasks
var workerOption = SimpleMapper.Map(dataflowBlockOptions, new ExecutionDataflowBlockOptions()); var workerOption = SimpleMapper.Map(dataflowBlockOptions, new ExecutionDataflowBlockOptions());
workerOption.MaxDegreeOfParallelism = 1; workerOption.MaxDegreeOfParallelism = 1;
workerOption.MaxMessagesPerTask = DataflowBlockOptions.Unbounded; workerOption.MaxMessagesPerTask = 1;
workers[i] = new ActionBlock<TInput>(action, workerOption); workers[i] = new ActionBlock<TInput>(action, workerOption);
} }
@ -50,7 +50,7 @@ namespace Squidex.Infrastructure.Tasks
var distributorOption = new ExecutionDataflowBlockOptions var distributorOption = new ExecutionDataflowBlockOptions
{ {
MaxDegreeOfParallelism = 1, MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = DataflowBlockOptions.Unbounded, MaxMessagesPerTask = 1,
BoundedCapacity = 1 BoundedCapacity = 1
}; };

2
backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs

@ -39,7 +39,7 @@ namespace Squidex.Infrastructure.Tasks
}, x => x.P, new ExecutionDataflowBlockOptions }, x => x.P, new ExecutionDataflowBlockOptions
{ {
MaxDegreeOfParallelism = 100, MaxDegreeOfParallelism = 100,
MaxMessagesPerTask = DataflowBlockOptions.Unbounded, MaxMessagesPerTask = 1,
BoundedCapacity = 100 BoundedCapacity = 100
}); });

Loading…
Cancel
Save