Browse Source

Bulk fix.

pull/613/head
Sebastian 5 years ago
parent
commit
f24b859e94
  1. 210
      backend/src/Squidex.Domain.Apps.Entities/Contents/BulkUpdateCommandMiddleware.cs
  2. 2
      backend/src/Squidex.Domain.Apps.Entities/Contents/BulkUpdateResultItem.cs
  3. 3
      backend/src/Squidex.Domain.Apps.Entities/Contents/Commands/BulkUpdateJob.cs
  4. 4
      backend/src/Squidex/Areas/Api/Controllers/Contents/Models/BulkResultDto.cs
  5. 6
      backend/src/Squidex/Areas/Api/Controllers/Contents/Models/BulkUpdateJobDto.cs
  6. 34
      backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/BulkUpdateCommandMiddlewareTests.cs

210
backend/src/Squidex.Domain.Apps.Entities/Contents/BulkUpdateCommandMiddleware.cs

@ -6,6 +6,7 @@
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
@ -18,6 +19,8 @@ using Squidex.Infrastructure.Translations;
using Squidex.Shared;
#pragma warning disable CA1826 // Do not use Enumerable methods on indexable collections
#pragma warning disable SA1313 // Parameter names should begin with lower-case letter
#pragma warning disable RECS0082 // Parameter has the same name as a member and hides it
namespace Squidex.Domain.Apps.Entities.Contents
{
@ -26,6 +29,17 @@ namespace Squidex.Domain.Apps.Entities.Contents
private readonly IContentQueryService contentQuery;
private readonly IContextProvider contextProvider;
private sealed record BulkTask(
ICommandBus Bus,
Context Context,
string Schema,
BulkUpdateJob Job,
BulkUpdateContents Command
)
{
public BulkUpdateResultItem Result { get; } = new BulkUpdateResultItem();
}
public BulkUpdateCommandMiddleware(IContentQueryService contentQuery, IContextProvider contextProvider)
{
Guard.NotNull(contentQuery, nameof(contentQuery));
@ -41,135 +55,153 @@ namespace Squidex.Domain.Apps.Entities.Contents
{
if (bulkUpdates.Jobs?.Length > 0)
{
var requestContext = contextProvider.Context.WithoutContentEnrichment().WithUnpublished(true);
var requestedSchema = bulkUpdates.SchemaId.Name;
async Task PublishAsync<TCommand>(BulkUpdateJob job, TCommand command, string permissionId) where TCommand : ContentCommand
var actionBlock = new ActionBlock<BulkTask>(async task =>
{
SimpleMapper.Map(bulkUpdates, command);
if (!string.IsNullOrWhiteSpace(job.Schema))
try
{
var schema = await contentQuery.GetSchemaOrThrowAsync(requestContext, job.Schema);
command.SchemaId = schema.NamedId();
await ExecuteTaskAsync(task);
}
var permission = Permissions.ForApp(permissionId, command.AppId.Name, command.SchemaId.Name);
if (!requestContext.Permissions.Allows(permission))
catch (Exception ex)
{
throw new DomainForbiddenException("Forbidden");
task.Result.Exception = ex;
}
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Math.Max(1, Environment.ProcessorCount / 2)
});
command.ExpectedVersion = job.ExpectedVersion;
await context.CommandBus.PublishAsync(command);
}
var requestContext = contextProvider.Context.WithoutContentEnrichment().WithUnpublished(true);
var requestedSchema = bulkUpdates.SchemaId.Name;
var results = new BulkUpdateResultItem[bulkUpdates.Jobs.Length];
var results = new List<BulkUpdateResultItem>(bulkUpdates.Jobs.Length);
var actionBlock = new ActionBlock<int>(async index =>
for (var i = 0; i < bulkUpdates.Jobs.Length; i++)
{
var job = bulkUpdates.Jobs[index];
var task = new BulkTask(
context.CommandBus,
requestContext,
requestedSchema,
bulkUpdates.Jobs[i],
bulkUpdates);
var result = new BulkUpdateResultItem();
await actionBlock.SendAsync(task);
try
{
var id = await FindIdAsync(requestContext, requestedSchema, job);
results.Add(task.Result);
}
if (job.Type != BulkUpdateType.Upsert && (id == null || id == DomainId.Empty))
{
throw new DomainObjectNotFoundException("undefined");
}
actionBlock.Complete();
result.ContentId = id;
await actionBlock.Completion;
switch (job.Type)
{
case BulkUpdateType.Upsert:
{
var command = new UpsertContent { Data = job.Data! };
context.Complete(new BulkUpdateResult(results));
}
else
{
context.Complete(new BulkUpdateResult());
}
}
else
{
await next(context);
}
}
if (id != null && id != DomainId.Empty)
{
command.ContentId = id.Value;
}
private async Task ExecuteTaskAsync(BulkTask task)
{
var job = task.Job;
result.ContentId = command.ContentId;
var resolvedId = await FindIdAsync(task);
await PublishAsync(job, command, Permissions.AppContentsUpsert);
break;
}
DomainId id;
case BulkUpdateType.Validate:
{
var command = new ValidateContent { ContentId = id.Value };
if (resolvedId == null || resolvedId == DomainId.Empty)
{
if (job.Type == BulkUpdateType.Upsert)
{
id = DomainId.NewGuid();
}
else
{
throw new DomainObjectNotFoundException("undefined");
}
}
else
{
id = resolvedId.Value;
}
await PublishAsync(job, command, Permissions.AppContentsRead);
break;
}
task.Result.ContentId = id;
case BulkUpdateType.ChangeStatus:
{
var command = new ChangeContentStatus { ContentId = id.Value, Status = job.Status };
switch (job.Type)
{
case BulkUpdateType.Upsert:
{
var command = new UpsertContent { Data = job.Data! };
await PublishAsync(job, command, Permissions.AppContentsUpdate);
break;
}
await PublishAsync(id, task, command, Permissions.AppContentsUpsert);
break;
}
case BulkUpdateType.Delete:
{
var command = new DeleteContent { ContentId = id.Value };
case BulkUpdateType.Validate:
{
var command = new ValidateContent();
await PublishAsync(job, command, Permissions.AppContentsDelete);
break;
}
}
}
catch (Exception ex)
{
result.Exception = ex;
}
await PublishAsync(id, task, command, Permissions.AppContentsRead);
break;
}
results[index] = result;
}, new ExecutionDataflowBlockOptions
case BulkUpdateType.ChangeStatus:
{
MaxDegreeOfParallelism = Math.Max(1, Environment.ProcessorCount / 2)
});
var command = new ChangeContentStatus { Status = job.Status, DueTime = job.DueTime };
for (var i = 0; i < bulkUpdates.Jobs.Length; i++)
await PublishAsync(id, task, command, Permissions.AppContentsUpdate);
break;
}
case BulkUpdateType.Delete:
{
await actionBlock.SendAsync(i);
var command = new DeleteContent();
await PublishAsync(id, task, command, Permissions.AppContentsDelete);
break;
}
}
}
actionBlock.Complete();
private async Task PublishAsync<TCommand>(DomainId id, BulkTask task, TCommand command, string permissionId) where TCommand : ContentCommand
{
SimpleMapper.Map(task.Command, command);
await actionBlock.Completion;
command.ContentId = id;
context.Complete(new BulkUpdateResult(results));
}
else
{
context.Complete(new BulkUpdateResult());
}
if (!string.IsNullOrWhiteSpace(task.Job.Schema))
{
var schema = await contentQuery.GetSchemaOrThrowAsync(task.Context, task.Schema);
command.SchemaId = schema.NamedId();
}
else
var permission = Permissions.ForApp(permissionId, command.AppId.Name, command.SchemaId.Name);
if (!task.Context.Permissions.Allows(permission))
{
await next(context);
throw new DomainForbiddenException("Forbidden");
}
command.ExpectedVersion = task.Command.ExpectedVersion;
await task.Bus.PublishAsync(command);
}
private async Task<DomainId?> FindIdAsync(Context context, string schema, BulkUpdateJob job)
private async Task<DomainId?> FindIdAsync(BulkTask task)
{
var id = job.Id;
var id = task.Job.Id;
if (id == null && job.Query != null)
if (id == null && task.Job.Query != null)
{
job.Query.Take = 1;
task.Job.Query.Take = 1;
var existing = await contentQuery.QueryAsync(context, schema, Q.Empty.WithJsonQuery(job.Query));
var existing = await contentQuery.QueryAsync(task.Context, task.Schema, Q.Empty.WithJsonQuery(task.Job.Query));
if (existing.Total > 1)
{

2
backend/src/Squidex.Domain.Apps.Entities/Contents/BulkUpdateResultItem.cs

@ -12,7 +12,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
{
public sealed class BulkUpdateResultItem
{
public DomainId? ContentId { get; set; }
public DomainId ContentId { get; set; }
public Exception? Exception { get; set; }
}

3
backend/src/Squidex.Domain.Apps.Entities/Contents/Commands/BulkUpdateJob.cs

@ -5,6 +5,7 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using NodaTime;
using Squidex.Domain.Apps.Core.Contents;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Json.Objects;
@ -20,6 +21,8 @@ namespace Squidex.Domain.Apps.Entities.Contents.Commands
public Status Status { get; set; }
public Instant? DueTime { get; set; }
public BulkUpdateType Type { get; set; }
public NamedContentData? Data { get; set; }

4
backend/src/Squidex/Areas/Api/Controllers/Contents/Models/BulkResultDto.cs

@ -20,9 +20,9 @@ namespace Squidex.Areas.Api.Controllers.Contents.Models
public ErrorDto? Error { get; set; }
/// <summary>
/// The id of the content when the import succeeds.
/// The id of the content that has been handled successfully or not.
/// </summary>
public DomainId? ContentId { get; set; }
public DomainId ContentId { get; set; }
public static BulkResultDto FromImportResult(BulkUpdateResultItem result, HttpContext httpContext)
{

6
backend/src/Squidex/Areas/Api/Controllers/Contents/Models/BulkUpdateJobDto.cs

@ -5,6 +5,7 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using NodaTime;
using Squidex.Domain.Apps.Core.Contents;
using Squidex.Domain.Apps.Entities.Contents.Commands;
using Squidex.Infrastructure;
@ -36,6 +37,11 @@ namespace Squidex.Areas.Api.Controllers.Contents.Models
/// </summary>
public Status Status { get; set; }
/// <summary>
/// The due time.
/// </summary>
public Instant? DueTime { get; set; }
/// <summary>
/// The update type.
/// </summary>

34
backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/BulkUpdateCommandMiddlewareTests.cs

@ -9,6 +9,7 @@ using System;
using System.Security.Claims;
using System.Threading.Tasks;
using FakeItEasy;
using NodaTime;
using Squidex.Domain.Apps.Core.Contents;
using Squidex.Domain.Apps.Entities.Contents.Commands;
using Squidex.Infrastructure;
@ -198,7 +199,26 @@ namespace Squidex.Domain.Apps.Entities.Contents
Assert.Single(result, x => x.ContentId == id && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(A<ChangeContentStatus>.That.Matches(x => x.ContentId == id)))
A.CallTo(() => commandBus.PublishAsync(A<ChangeContentStatus>.That.Matches(x => x.ContentId == id && x.DueTime == null)))
.MustHaveHappened();
}
[Fact]
public async Task Should_change_content_status_with_due_time()
{
SetupContext(Permissions.AppContentsUpdate);
var time = Instant.FromDateTimeUtc(DateTime.UtcNow);
var (id, _, _) = CreateTestData(false);
var command = BulkCommand(BulkUpdateType.ChangeStatus, id: id, dueTime: time);
var result = await PublishAsync(command);
Assert.Single(result, x => x.ContentId == id && x.Exception == null);
A.CallTo(() => commandBus.PublishAsync(A<ChangeContentStatus>.That.Matches(x => x.ContentId == id && x.DueTime == time)))
.MustHaveHappened();
}
@ -298,14 +318,22 @@ namespace Squidex.Domain.Apps.Entities.Contents
return (context.PlainResult as BulkUpdateResult)!;
}
private BulkUpdateContents BulkCommand(BulkUpdateType type, Query<IJsonValue>? query = null, DomainId? id = null, NamedContentData? data = null)
private BulkUpdateContents BulkCommand(BulkUpdateType type, Query<IJsonValue>? query = null,
DomainId? id = null, NamedContentData? data = null, Instant? dueTime = null)
{
return new BulkUpdateContents
{
AppId = appId,
Jobs = new[]
{
new BulkUpdateJob { Type = type, Query = query, Id = id, Data = data! }
new BulkUpdateJob
{
Type = type,
Id = id,
Data = data!,
DueTime = dueTime,
Query = query,
}
},
SchemaId = schemaId
};

Loading…
Cancel
Save