Browse Source

Improved bulk endpoint.

pull/613/head
Sebastian 5 years ago
parent
commit
c3ccc597a3
  1. 189
      backend/src/Squidex.Domain.Apps.Entities/Contents/BulkUpdateCommandMiddleware.cs
  2. 2
      backend/src/Squidex.Domain.Apps.Entities/Contents/BulkUpdateResultItem.cs
  3. 2
      backend/src/Squidex.Domain.Apps.Entities/Contents/Commands/BulkUpdateJob.cs
  4. 3
      backend/src/Squidex.Domain.Apps.Entities/Contents/Commands/BulkUpdateType.cs
  5. 4
      backend/src/Squidex/Areas/Api/Controllers/Contents/ContentsController.cs
  6. 8
      backend/src/Squidex/Areas/Api/Controllers/Contents/Models/BulkResultDto.cs
  7. 5
      backend/src/Squidex/Areas/Api/Controllers/Contents/Models/BulkUpdateJobDto.cs
  8. 170
      backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/BulkUpdateCommandMiddlewareTests.cs

189
backend/src/Squidex.Domain.Apps.Entities/Contents/BulkUpdateCommandMiddleware.cs

@ -6,6 +6,7 @@
// ==========================================================================
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
@ -18,7 +19,6 @@ using Squidex.Infrastructure.Reflection;
using Squidex.Infrastructure.Translations;
using Squidex.Shared;
#pragma warning disable CA1826 // Do not use Enumerable methods on indexable collections
#pragma warning disable SA1313 // Parameter names should begin with lower-case letter
#pragma warning disable RECS0082 // Parameter has the same name as a member and hides it
@ -29,15 +29,20 @@ namespace Squidex.Domain.Apps.Entities.Contents
private readonly IContentQueryService contentQuery;
private readonly IContextProvider contextProvider;
private sealed record BulkTaskCommand(BulkTask Task, DomainId Id, ICommand Command)
{
}
private sealed record BulkTask(
ICommandBus Bus,
Context Context,
string Schema,
int JobIndex,
BulkUpdateJob Job,
BulkUpdateContents Command
BulkUpdateContents Command,
ConcurrentBag<BulkUpdateResultItem> Results
)
{
public BulkUpdateResultItem Result { get; } = new BulkUpdateResultItem();
}
public BulkUpdateCommandMiddleware(IContentQueryService contentQuery, IContextProvider contextProvider)
@ -55,25 +60,30 @@ namespace Squidex.Domain.Apps.Entities.Contents
{
if (bulkUpdates.Jobs?.Length > 0)
{
var actionBlock = new ActionBlock<BulkTask>(async task =>
{
try
{
await ExecuteTaskAsync(task);
}
catch (Exception ex)
{
task.Result.Exception = ex;
}
}, new ExecutionDataflowBlockOptions
var executionOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Math.Max(1, Environment.ProcessorCount / 2)
};
var createCommandsBlock = new TransformManyBlock<BulkTask, BulkTaskCommand>(async task =>
{
return await CreateCommandsAsync(task);
}, executionOptions);
var executeCommandBlock = new ActionBlock<BulkTaskCommand>(async command =>
{
await ExecuteCommandAsync(command);
}, executionOptions);
createCommandsBlock.LinkTo(executeCommandBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
var requestContext = contextProvider.Context.WithoutContentEnrichment().WithUnpublished(true);
var requestedSchema = bulkUpdates.SchemaId.Name;
var results = new List<BulkUpdateResultItem>(bulkUpdates.Jobs.Length);
var results = new ConcurrentBag<BulkUpdateResultItem>();
for (var i = 0; i < bulkUpdates.Jobs.Length; i++)
{
@ -81,17 +91,17 @@ namespace Squidex.Domain.Apps.Entities.Contents
context.CommandBus,
requestContext,
requestedSchema,
i,
bulkUpdates.Jobs[i],
bulkUpdates);
bulkUpdates,
results);
await actionBlock.SendAsync(task);
results.Add(task.Result);
await createCommandsBlock.SendAsync(task);
}
actionBlock.Complete();
createCommandsBlock.Complete();
await actionBlock.Completion;
await executeCommandBlock.Completion;
context.Complete(new BulkUpdateResult(results));
}
@ -106,69 +116,140 @@ namespace Squidex.Domain.Apps.Entities.Contents
}
}
private async Task ExecuteTaskAsync(BulkTask task)
private static async Task ExecuteCommandAsync(BulkTaskCommand bulkCommand)
{
var job = task.Job;
var (task, id, command) = bulkCommand;
Exception? exception = null;
try
{
await task.Bus.PublishAsync(command);
}
catch (Exception ex)
{
exception = ex;
}
var resolvedId = await FindIdAsync(task);
task.Results.Add(new BulkUpdateResultItem
{
ContentId = id,
JobIndex = task.JobIndex,
Exception = exception,
});
}
DomainId id;
private async Task<IEnumerable<BulkTaskCommand>> CreateCommandsAsync(BulkTask task)
{
var commands = new List<BulkTaskCommand>();
if (resolvedId == null || resolvedId == DomainId.Empty)
try
{
if (job.Type == BulkUpdateType.Upsert)
var resolvedIds = await FindIdAsync(task);
if (resolvedIds.Length == 0)
{
id = DomainId.NewGuid();
throw new DomainObjectNotFoundException("undefined");
}
else
foreach (var id in resolvedIds)
{
throw new DomainObjectNotFoundException("undefined");
try
{
var command = await CreateCommandAsync(id, task);
commands.Add(new BulkTaskCommand(task, id, command));
}
catch (Exception ex)
{
task.Results.Add(new BulkUpdateResultItem
{
ContentId = id,
JobIndex = task.JobIndex,
Exception = ex,
});
}
}
}
else
catch (Exception ex)
{
id = resolvedId.Value;
task.Results.Add(new BulkUpdateResultItem
{
JobIndex = task.JobIndex,
Exception = ex,
});
}
task.Result.ContentId = id;
return commands;
}
private async Task<ICommand> CreateCommandAsync(DomainId id, BulkTask task)
{
var job = task.Job;
switch (job.Type)
{
case BulkUpdateType.Create:
{
var command = new CreateContent { Data = job.Data! };
await EnrichAsync(id, task, command, Permissions.AppContentsCreate);
return command;
}
case BulkUpdateType.Update:
{
var command = new UpdateContent { Data = job.Data! };
await EnrichAsync(id, task, command, Permissions.AppContentsUpdate);
return command;
}
case BulkUpdateType.Upsert:
{
var command = new UpsertContent { Data = job.Data! };
await PublishAsync(id, task, command, Permissions.AppContentsUpsert);
break;
await EnrichAsync(id, task, command, Permissions.AppContentsUpsert);
return command;
}
case BulkUpdateType.Patch:
{
var command = new PatchContent { Data = job.Data! };
await EnrichAsync(id, task, command, Permissions.AppContentsUpdate);
return command;
}
case BulkUpdateType.Validate:
{
var command = new ValidateContent();
await PublishAsync(id, task, command, Permissions.AppContentsRead);
break;
await EnrichAsync(id, task, command, Permissions.AppContentsRead);
return command;
}
case BulkUpdateType.ChangeStatus:
{
var command = new ChangeContentStatus { Status = job.Status, DueTime = job.DueTime };
await PublishAsync(id, task, command, Permissions.AppContentsUpdate);
break;
await EnrichAsync(id, task, command, Permissions.AppContentsUpdate);
return command;
}
case BulkUpdateType.Delete:
{
var command = new DeleteContent();
await PublishAsync(id, task, command, Permissions.AppContentsDelete);
break;
await EnrichAsync(id, task, command, Permissions.AppContentsDelete);
return command;
}
default:
throw new NotSupportedException();
}
}
private async Task PublishAsync<TCommand>(DomainId id, BulkTask task, TCommand command, string permissionId) where TCommand : ContentCommand
private async Task EnrichAsync<TCommand>(DomainId id, BulkTask task, TCommand command, string permissionId) where TCommand : ContentCommand
{
SimpleMapper.Map(task.Command, command);
@ -189,29 +270,37 @@ namespace Squidex.Domain.Apps.Entities.Contents
}
command.ExpectedVersion = task.Command.ExpectedVersion;
await task.Bus.PublishAsync(command);
}
private async Task<DomainId?> FindIdAsync(BulkTask task)
private async Task<DomainId[]> FindIdAsync(BulkTask task)
{
var id = task.Job.Id;
if (id == null && task.Job.Query != null)
if (id != null)
{
return new[] { id.Value };
}
if (task.Job.Query != null)
{
task.Job.Query.Take = 1;
task.Job.Query.Take = task.Job.ExpectedCount;
var existing = await contentQuery.QueryAsync(task.Context, task.Schema, Q.Empty.WithJsonQuery(task.Job.Query));
if (existing.Total > 1)
if (existing.Total > task.Job.ExpectedCount)
{
throw new DomainException(T.Get("contents.bulkInsertQueryNotUnique"));
}
id = existing.FirstOrDefault()?.Id;
return existing.Select(x => x.Id).ToArray();
}
if (task.Job.Type == BulkUpdateType.Create || task.Job.Type == BulkUpdateType.Upsert)
{
return new[] { DomainId.NewGuid() };
}
return id;
return Array.Empty<DomainId>();
}
}
}

2
backend/src/Squidex.Domain.Apps.Entities/Contents/BulkUpdateResultItem.cs

@ -14,6 +14,8 @@ namespace Squidex.Domain.Apps.Entities.Contents
{
public DomainId? ContentId { get; set; }
public int JobIndex { get; set; }
public Exception? Exception { get; set; }
}
}

2
backend/src/Squidex.Domain.Apps.Entities/Contents/Commands/BulkUpdateJob.cs

@ -29,6 +29,8 @@ namespace Squidex.Domain.Apps.Entities.Contents.Commands
public string? Schema { get; set; }
public long ExpectedCount { get; set; } = 1;
public long ExpectedVersion { get; set; } = EtagVersion.Any;
}
}

3
backend/src/Squidex.Domain.Apps.Entities/Contents/Commands/BulkUpdateType.cs

@ -11,7 +11,10 @@ namespace Squidex.Domain.Apps.Entities.Contents.Commands
{
Upsert,
ChangeStatus,
Create,
Delete,
Patch,
Update,
Validate
}
}

4
backend/src/Squidex/Areas/Api/Controllers/Contents/ContentsController.cs

@ -21,6 +21,10 @@ using Squidex.Web;
namespace Squidex.Areas.Api.Controllers.Contents
{
/// <summary>
/// Updates and retrieves contents.
/// </summary>
[ApiExplorerSettings(GroupName = nameof(Contents))]
public sealed class ContentsController : ApiController
{
private readonly IContentQueryService contentQuery;

8
backend/src/Squidex/Areas/Api/Controllers/Contents/Models/BulkResultDto.cs

@ -8,6 +8,7 @@
using Microsoft.AspNetCore.Http;
using Squidex.Domain.Apps.Entities.Contents;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Reflection;
using Squidex.Web;
namespace Squidex.Areas.Api.Controllers.Contents.Models
@ -19,6 +20,11 @@ namespace Squidex.Areas.Api.Controllers.Contents.Models
/// </summary>
public ErrorDto? Error { get; set; }
/// <summary>
/// The index of the bulk job where the result belongs to. The order can change.
/// </summary>
public int JobIndex { get; set; }
/// <summary>
/// The id of the content that has been handled successfully or not.
/// </summary>
@ -28,7 +34,7 @@ namespace Squidex.Areas.Api.Controllers.Contents.Models
{
var error = result.Exception?.ToErrorDto(httpContext).Error;
return new BulkResultDto { ContentId = result.ContentId, Error = error };
return SimpleMapper.Map(result, new BulkResultDto { Error = error });
}
}
}

5
backend/src/Squidex/Areas/Api/Controllers/Contents/Models/BulkUpdateJobDto.cs

@ -52,6 +52,11 @@ namespace Squidex.Areas.Api.Controllers.Contents.Models
/// </summary>
public string? Schema { get; set; }
/// <summary>
/// The number of expected items. Set it to a higher number to update multiple items when a query is defined.
/// </summary>
public long ExpectedCount { get; set; } = 1;
/// <summary>
/// The expected version.
/// </summary>

170
backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/BulkUpdateCommandMiddlewareTests.cs

@ -67,7 +67,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId == null && x.Exception is DomainObjectNotFoundException);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == null && x.Exception is DomainObjectNotFoundException);
A.CallTo(() => commandBus.PublishAsync(A<ICommand>._))
.MustNotHaveHappened();
@ -87,7 +87,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId == null && x.Exception is DomainException);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == null && x.Exception is DomainException);
A.CallTo(() => commandBus.PublishAsync(A<ICommand>._))
.MustNotHaveHappened();
@ -98,7 +98,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
{
var requestContext = SetupContext(Permissions.AppContentsUpsert);
var (id, data, query) = CreateTestData(false);
var (id, data, query) = CreateTestData(true);
A.CallTo(() => contentQuery.QueryAsync(requestContext, A<string>._, A<Q>.That.Matches(x => x.JsonQuery == query)))
.Returns(ResultList.CreateFrom(1, CreateContent(id)));
@ -107,10 +107,43 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId != default && x.Exception == null);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(
A<UpsertContent>.That.Matches(x => x.Data == data && x.ContentId.ToString().Length == 36)))
A<UpsertContent>.That.Matches(x => x.Data == data && x.ContentId == id)))
.MustHaveHappenedOnceExactly();
}
[Fact]
public async Task Should_upsert_content_with_with_resolved_ids()
{
var requestContext = SetupContext(Permissions.AppContentsUpsert);
var (_, data, query) = CreateTestData(true);
var id1 = DomainId.NewGuid();
var id2 = DomainId.NewGuid();
A.CallTo(() => contentQuery.QueryAsync(requestContext, A<string>._, A<Q>.That.Matches(x => x.JsonQuery == query)))
.Returns(ResultList.CreateFrom(2,
CreateContent(id1),
CreateContent(id2)));
var command = BulkCommand(BulkUpdateType.Upsert, query: query, data: data);
command.Jobs![0].ExpectedCount = 2;
var result = await PublishAsync(command);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id1 && x.Exception == null);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id2 && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(
A<UpsertContent>.That.Matches(x => x.Data == data && x.ContentId == id1)))
.MustHaveHappenedOnceExactly();
A.CallTo(() => commandBus.PublishAsync(
A<UpsertContent>.That.Matches(x => x.Data == data && x.ContentId == id2)))
.MustHaveHappenedOnceExactly();
}
@ -125,7 +158,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId != default && x.Exception == null);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId != default && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(
A<UpsertContent>.That.Matches(x => x.Data == data && x.ContentId.ToString().Length == 36)))
@ -143,7 +176,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId != default && x.Exception == null);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId != default && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(
A<UpsertContent>.That.Matches(x => x.Data == data && x.ContentId.ToString().Length == 36)))
@ -161,7 +194,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId != default && x.Exception == null);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId != default && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(
A<UpsertContent>.That.Matches(x => x.Data == data && x.ContentId == id)))
@ -179,13 +212,118 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId != default && x.Exception == null);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId != default && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(
A<UpsertContent>.That.Matches(x => x.Data == data && x.ContentId == id)))
.MustHaveHappenedOnceExactly();
}
[Fact]
public async Task Should_create_content()
{
SetupContext(Permissions.AppContentsCreate);
var (id, data, _) = CreateTestData(false);
var command = BulkCommand(BulkUpdateType.Create, id: id, data: data);
var result = await PublishAsync(command);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(
A<CreateContent>.That.Matches(x => x.ContentId == id && x.Data == data)))
.MustHaveHappened();
}
[Fact]
public async Task Should_throw_security_exception_when_user_has_no_permission_for_creating()
{
SetupContext(Permissions.AppContentsRead);
var (id, data, _) = CreateTestData(false);
var command = BulkCommand(BulkUpdateType.Create, id: id, data: data);
var result = await PublishAsync(command);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception is DomainForbiddenException);
A.CallTo(() => commandBus.PublishAsync(A<ICommand>._))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_update_content()
{
SetupContext(Permissions.AppContentsUpdate);
var (id, data, _) = CreateTestData(false);
var command = BulkCommand(BulkUpdateType.Update, id: id, data: data);
var result = await PublishAsync(command);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(
A<UpdateContent>.That.Matches(x => x.ContentId == id && x.Data == data)))
.MustHaveHappened();
}
[Fact]
public async Task Should_throw_security_exception_when_user_has_no_permission_for_updating()
{
SetupContext(Permissions.AppContentsRead);
var (id, data, _) = CreateTestData(false);
var command = BulkCommand(BulkUpdateType.Update, id: id, data: data);
var result = await PublishAsync(command);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception is DomainForbiddenException);
A.CallTo(() => commandBus.PublishAsync(A<ICommand>._))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_patch_content()
{
SetupContext(Permissions.AppContentsUpdate);
var (id, data, _) = CreateTestData(false);
var command = BulkCommand(BulkUpdateType.Patch, id: id, data: data);
var result = await PublishAsync(command);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(
A<PatchContent>.That.Matches(x => x.ContentId == id && x.Data == data)))
.MustHaveHappened();
}
[Fact]
public async Task Should_throw_security_exception_when_user_has_no_permission_for_patching()
{
SetupContext(Permissions.AppContentsRead);
var (id, data, _) = CreateTestData(false);
var command = BulkCommand(BulkUpdateType.Delete, id: id, data: data);
var result = await PublishAsync(command);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception is DomainForbiddenException);
A.CallTo(() => commandBus.PublishAsync(A<ICommand>._))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_change_content_status()
{
@ -197,7 +335,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId == id && x.Exception == null);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(A<ChangeContentStatus>.That.Matches(x => x.ContentId == id && x.DueTime == null)))
.MustHaveHappened();
@ -216,7 +354,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId == id && x.Exception == null);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(A<ChangeContentStatus>.That.Matches(x => x.ContentId == id && x.DueTime == time)))
.MustHaveHappened();
@ -233,7 +371,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId == id && x.Exception is DomainForbiddenException);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception is DomainForbiddenException);
A.CallTo(() => commandBus.PublishAsync(A<ICommand>._))
.MustNotHaveHappened();
@ -250,7 +388,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId == id && x.Exception == null);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(
A<ValidateContent>.That.Matches(x => x.ContentId == id)))
@ -268,7 +406,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId == id && x.Exception is DomainForbiddenException);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception is DomainForbiddenException);
A.CallTo(() => commandBus.PublishAsync(A<ICommand>._))
.MustNotHaveHappened();
@ -285,7 +423,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId == id && x.Exception == null);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(
A<DeleteContent>.That.Matches(x => x.ContentId == id)))
@ -303,7 +441,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId == id && x.Exception is DomainForbiddenException);
Assert.Single(result, x => x.JobIndex == 0 && x.ContentId == id && x.Exception is DomainForbiddenException);
A.CallTo(() => commandBus.PublishAsync(A<ICommand>._))
.MustNotHaveHappened();

Loading…
Cancel
Save