Browse Source

Improved rule handling.

pull/352/head
Sebastian 7 years ago
parent
commit
5d7ad2fe3e
  1. 22
      extensions/Squidex.Extensions/Actions/Algolia/AlgoliaActionHandler.cs
  2. 8
      extensions/Squidex.Extensions/Actions/AzureQueue/AzureQueueActionHandler.cs
  3. 6
      extensions/Squidex.Extensions/Actions/Discourse/DiscourseActionHandler.cs
  4. 21
      extensions/Squidex.Extensions/Actions/ElasticSearch/ElasticSearchActionHandler.cs
  5. 13
      extensions/Squidex.Extensions/Actions/Email/EmailActionHandler.cs
  6. 5
      extensions/Squidex.Extensions/Actions/Fastly/FastlyActionHandler.cs
  7. 21
      extensions/Squidex.Extensions/Actions/HttpHelper.cs
  8. 9
      extensions/Squidex.Extensions/Actions/Medium/MediumActionHandler.cs
  9. 6
      extensions/Squidex.Extensions/Actions/Prerender/PrerenderActionHandler.cs
  10. 5
      extensions/Squidex.Extensions/Actions/Slack/SlackActionHandler.cs
  11. 14
      extensions/Squidex.Extensions/Actions/Twitter/TweetActionHandler.cs
  12. 5
      extensions/Squidex.Extensions/Actions/Webhook/WebhookActionHandler.cs
  13. 8
      src/Squidex.Domain.Apps.Core.Model/Schemas/Schema.cs
  14. 3
      src/Squidex.Domain.Apps.Core.Operations/HandleRules/IRuleActionHandler.cs
  15. 96
      src/Squidex.Domain.Apps.Core.Operations/HandleRules/Result.cs
  16. 7
      src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleActionHandler.cs
  17. 45
      src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs
  18. 4
      src/Squidex.Domain.Apps.Core.Operations/ValidateContent/ValidationContext.cs
  19. 8
      src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuerGrain.cs
  20. 3
      src/Squidex.Domain.Apps.Entities/Rules/UsageTracking/IUsageTrackerGrain.cs
  21. 17
      src/Squidex.Infrastructure/PubSubExtensions.cs
  22. 20
      src/Squidex.Infrastructure/Tasks/TaskExtensions.cs
  23. 35
      tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/RuleServiceTests.cs
  24. 2
      tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleDequeuerTests.cs

22
extensions/Squidex.Extensions/Actions/Algolia/AlgoliaActionHandler.cs

@ -5,7 +5,7 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using Algolia.Search;
using Newtonsoft.Json;
@ -13,12 +13,12 @@ using Newtonsoft.Json.Linq;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents;
#pragma warning disable IDE0059 // Value assigned to symbol is never used
namespace Squidex.Extensions.Actions.Algolia
{
public sealed class AlgoliaActionHandler : RuleActionHandler<AlgoliaAction, AlgoliaJob>
{
private const string DescriptionIgnore = "Ignore";
private readonly ClientPool<(string AppId, string ApiKey, string IndexName), Index> clients;
public AlgoliaActionHandler(RuleEventFormatter formatter)
@ -65,14 +65,14 @@ namespace Squidex.Extensions.Actions.Algolia
return (ruleDescription, ruleJob);
}
return (DescriptionIgnore, new AlgoliaJob());
return ("Ignore", new AlgoliaJob());
}
protected override async Task<(string Dump, Exception Exception)> ExecuteJobAsync(AlgoliaJob job)
protected override async Task<Result> ExecuteJobAsync(AlgoliaJob job, CancellationToken ct = default)
{
if (string.IsNullOrWhiteSpace(job.AppId))
{
return (DescriptionIgnore, null);
return Result.Ignored();
}
var index = clients.GetClient((job.AppId, job.ApiKey, job.IndexName));
@ -81,20 +81,20 @@ namespace Squidex.Extensions.Actions.Algolia
{
if (job.Content != null)
{
var response = await index.PartialUpdateObjectAsync(job.Content);
var response = await index.PartialUpdateObjectAsync(job.Content, true, ct);
return (response.ToString(Formatting.Indented), null);
return Result.Success(response.ToString(Formatting.Indented));
}
else
{
var response = await index.DeleteObjectAsync(job.ContentId);
var response = await index.DeleteObjectAsync(job.ContentId, ct);
return (response.ToString(Formatting.Indented), null);
return Result.Success(response.ToString(Formatting.Indented));
}
}
catch (AlgoliaException ex)
{
return (ex.Message, ex);
return Result.Failed(ex);
}
}
}

8
extensions/Squidex.Extensions/Actions/AzureQueue/AzureQueueActionHandler.cs

@ -5,7 +5,7 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
@ -47,13 +47,13 @@ namespace Squidex.Extensions.Actions.AzureQueue
return (ruleDescription, ruleJob);
}
protected override async Task<(string Dump, Exception Exception)> ExecuteJobAsync(AzureQueueJob job)
protected override async Task<Result> ExecuteJobAsync(AzureQueueJob job, CancellationToken ct = default)
{
var queue = clients.GetClient((job.QueueConnectionString, job.QueueName));
await queue.AddMessageAsync(new CloudQueueMessage(job.MessageBodyV2));
await queue.AddMessageAsync(new CloudQueueMessage(job.MessageBodyV2), null, null, null, null, ct);
return ("Completed", null);
return Result.Complete();
}
}

6
extensions/Squidex.Extensions/Actions/Discourse/DiscourseActionHandler.cs

@ -5,10 +5,10 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents;
@ -65,7 +65,7 @@ namespace Squidex.Extensions.Actions.Discourse
return (description, ruleJob);
}
protected override async Task<(string Dump, Exception Exception)> ExecuteJobAsync(DiscourseJob job)
protected override async Task<Result> ExecuteJobAsync(DiscourseJob job, CancellationToken ct = default)
{
using (var httpClient = httpClientFactory.CreateClient())
{
@ -74,7 +74,7 @@ namespace Squidex.Extensions.Actions.Discourse
Content = new StringContent(job.RequestBody, Encoding.UTF8, "application/json")
};
return await httpClient.OneWayRequestAsync(request, job.RequestBody);
return await httpClient.OneWayRequestAsync(request, job.RequestBody, ct);
}
}
}

21
extensions/Squidex.Extensions/Actions/ElasticSearch/ElasticSearchActionHandler.cs

@ -6,17 +6,18 @@
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents;
#pragma warning disable IDE0059 // Value assigned to symbol is never used
namespace Squidex.Extensions.Actions.ElasticSearch
{
public sealed class ElasticSearchActionHandler : RuleActionHandler<ElasticSearchAction, ElasticSearchJob>
{
private const string DescriptionIgnore = "Ignore";
private readonly ClientPool<(Uri Host, string Username, string Password), ElasticLowLevelClient> clients;
public ElasticSearchActionHandler(RuleEventFormatter formatter)
@ -70,14 +71,14 @@ namespace Squidex.Extensions.Actions.ElasticSearch
return (ruleDescription, ruleJob);
}
return (DescriptionIgnore, new ElasticSearchJob());
return ("Ignore", new ElasticSearchJob());
}
protected override async Task<(string Dump, Exception Exception)> ExecuteJobAsync(ElasticSearchJob job)
protected override async Task<Result> ExecuteJobAsync(ElasticSearchJob job, CancellationToken ct = default)
{
if (string.IsNullOrWhiteSpace(job.Host))
{
return (DescriptionIgnore, null);
return Result.Ignored();
}
var client = clients.GetClient((new Uri(job.Host, UriKind.Absolute), job.Username, job.Password));
@ -86,20 +87,20 @@ namespace Squidex.Extensions.Actions.ElasticSearch
{
if (job.Content != null)
{
var response = await client.IndexAsync<StringResponse>(job.IndexName, job.IndexType, job.ContentId, job.Content);
var response = await client.IndexAsync<StringResponse>(job.IndexName, job.IndexType, job.ContentId, job.Content, ctx: ct);
return (response.Body, response.OriginalException);
return Result.SuccessOrFailed(response.OriginalException, response.Body);
}
else
{
var response = await client.DeleteAsync<StringResponse>(job.IndexName, job.IndexType, job.ContentId);
var response = await client.DeleteAsync<StringResponse>(job.IndexName, job.IndexType, job.ContentId, ctx: ct);
return (response.Body, response.OriginalException);
return Result.SuccessOrFailed(response.OriginalException, response.Body);
}
}
catch (ElasticsearchClientException ex)
{
return (ex.Message, ex);
return Result.Failed(ex);
}
}
}

13
extensions/Squidex.Extensions/Actions/Email/EmailActionHandler.cs

@ -5,9 +5,9 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Net;
using System.Net.Mail;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents;
@ -41,7 +41,7 @@ namespace Squidex.Extensions.Actions.Email
return (description, ruleJob);
}
protected override async Task<(string Dump, Exception Exception)> ExecuteJobAsync(EmailJob job)
protected override async Task<Result> ExecuteJobAsync(EmailJob job, CancellationToken ct = default)
{
using (var client = new SmtpClient(job.ServerHost, job.ServerPort))
{
@ -53,11 +53,16 @@ namespace Squidex.Extensions.Actions.Email
message.Subject = job.MessageSubject;
message.Body = job.MessageBody;
await client.SendMailAsync(message);
var sendTask = client.SendMailAsync(message);
using (ct.Register(client.SendAsyncCancel))
{
await client.SendMailAsync(message);
}
}
}
return ("Completed", null);
return Result.Complete();
}
}

5
extensions/Squidex.Extensions/Actions/Fastly/FastlyActionHandler.cs

@ -7,6 +7,7 @@
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents;
@ -42,7 +43,7 @@ namespace Squidex.Extensions.Actions.Fastly
return (Description, ruleJob);
}
protected override async Task<(string Dump, Exception Exception)> ExecuteJobAsync(FastlyJob job)
protected override async Task<Result> ExecuteJobAsync(FastlyJob job, CancellationToken ct = default)
{
using (var httpClient = httpClientFactory.CreateClient())
{
@ -53,7 +54,7 @@ namespace Squidex.Extensions.Actions.Fastly
request.Headers.Add("Fastly-Key", job.FastlyApiKey);
return await httpClient.OneWayRequestAsync(request);
return await httpClient.OneWayRequestAsync(request, ct: ct);
}
}
}

21
extensions/Squidex.Extensions/Actions/HttpHelper.cs

@ -7,38 +7,41 @@
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Infrastructure.Http;
namespace Squidex.Extensions.Actions
{
public static class HttpHelper
{
public static async Task<(string Dump, Exception Exception)> OneWayRequestAsync(this HttpClient client, HttpRequestMessage request, string requestBody = null)
public static async Task<Result> OneWayRequestAsync(this HttpClient client, HttpRequestMessage request, string requestBody = null, CancellationToken ct = default)
{
HttpResponseMessage response = null;
try
{
response = await client.SendAsync(request);
response = await client.SendAsync(request, ct);
var responseString = await response.Content.ReadAsStringAsync();
var requestDump = DumpFormatter.BuildDump(request, response, requestBody, responseString);
Exception ex = null;
if (!response.IsSuccessStatusCode)
{
ex = new HttpRequestException($"Response code does not indicate success: {(int)response.StatusCode} ({response.StatusCode}).");
}
var ex = new HttpRequestException($"Response code does not indicate success: {(int)response.StatusCode} ({response.StatusCode}).");
return (requestDump, ex);
return Result.Failed(ex, requestDump);
}
else
{
return Result.Success(requestDump);
}
}
catch (Exception ex)
{
var requestDump = DumpFormatter.BuildDump(request, response, requestBody, ex.ToString());
return (requestDump, ex);
return Result.Failed(ex, requestDump);
}
}
}

9
extensions/Squidex.Extensions/Actions/Medium/MediumActionHandler.cs

@ -8,6 +8,7 @@
using System;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents;
@ -78,7 +79,7 @@ namespace Squidex.Extensions.Actions.Medium
}
}
protected override async Task<(string Dump, Exception Exception)> ExecuteJobAsync(MediumJob job)
protected override async Task<Result> ExecuteJobAsync(MediumJob job, CancellationToken ct = default)
{
using (var httpClient = httpClientFactory.CreateClient())
{
@ -100,7 +101,7 @@ namespace Squidex.Extensions.Actions.Medium
var meRequest = BuildMeRequest(job);
try
{
response = await httpClient.SendAsync(meRequest);
response = await httpClient.SendAsync(meRequest, ct);
var responseString = await response.Content.ReadAsStringAsync();
var responseJson = serializer.Deserialize<UserResponse>(responseString);
@ -113,11 +114,11 @@ namespace Squidex.Extensions.Actions.Medium
{
var requestDump = DumpFormatter.BuildDump(meRequest, response, ex.ToString());
return (requestDump, ex);
return Result.Failed(ex, requestDump);
}
}
return await httpClient.OneWayRequestAsync(BuildPostRequest(job, path), job.RequestBody);
return await httpClient.OneWayRequestAsync(BuildPostRequest(job, path), job.RequestBody, ct);
}
}

6
extensions/Squidex.Extensions/Actions/Prerender/PrerenderActionHandler.cs

@ -5,9 +5,9 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents;
@ -34,7 +34,7 @@ namespace Squidex.Extensions.Actions.Prerender
return ($"Recache {url}", new PrerenderJob { RequestBody = requestBody });
}
protected override async Task<(string Dump, Exception Exception)> ExecuteJobAsync(PrerenderJob job)
protected override async Task<Result> ExecuteJobAsync(PrerenderJob job, CancellationToken ct = default)
{
using (var httpClient = httpClientFactory.CreateClient())
{
@ -43,7 +43,7 @@ namespace Squidex.Extensions.Actions.Prerender
Content = new StringContent(job.RequestBody, Encoding.UTF8, "application/json")
};
return await httpClient.OneWayRequestAsync(request, job.RequestBody);
return await httpClient.OneWayRequestAsync(request, job.RequestBody, ct);
}
}
}

5
extensions/Squidex.Extensions/Actions/Slack/SlackActionHandler.cs

@ -8,6 +8,7 @@
using System;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents;
@ -42,7 +43,7 @@ namespace Squidex.Extensions.Actions.Slack
return (Description, ruleJob);
}
protected override async Task<(string Dump, Exception Exception)> ExecuteJobAsync(SlackJob job)
protected override async Task<Result> ExecuteJobAsync(SlackJob job, CancellationToken ct = default)
{
using (var httpClient = httpClientFactory.CreateClient())
{
@ -53,7 +54,7 @@ namespace Squidex.Extensions.Actions.Slack
Content = new StringContent(job.RequestBody, Encoding.UTF8, "application/json")
};
return await httpClient.OneWayRequestAsync(request, job.RequestBody);
return await httpClient.OneWayRequestAsync(request, job.RequestBody, ct);
}
}
}

14
extensions/Squidex.Extensions/Actions/Twitter/TweetActionHandler.cs

@ -5,7 +5,8 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using CoreTweet;
using Microsoft.Extensions.Options;
@ -41,7 +42,7 @@ namespace Squidex.Extensions.Actions.Twitter
return (Description, ruleJob);
}
protected override async Task<(string Dump, Exception Exception)> ExecuteJobAsync(TweetJob job)
protected override async Task<Result> ExecuteJobAsync(TweetJob job, CancellationToken ct = default)
{
var tokens = Tokens.Create(
twitterOptions.ClientId,
@ -49,9 +50,14 @@ namespace Squidex.Extensions.Actions.Twitter
job.AccessToken,
job.AccessSecret);
await tokens.Statuses.UpdateAsync(status => job.Text);
var request = new Dictionary<string, object>
{
["status"] = job.Text
};
await tokens.Statuses.UpdateAsync(request, ct);
return ($"Tweeted: {job.Text}", null);
return Result.Success($"Tweeted: {job.Text}");
}
}

5
extensions/Squidex.Extensions/Actions/Webhook/WebhookActionHandler.cs

@ -8,6 +8,7 @@
using System;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents;
@ -44,7 +45,7 @@ namespace Squidex.Extensions.Actions.Webhook
return (ruleDescription, ruleJob);
}
protected override async Task<(string Dump, Exception Exception)> ExecuteJobAsync(WebhookJob job)
protected override async Task<Result> ExecuteJobAsync(WebhookJob job, CancellationToken ct = default)
{
using (var httpClient = httpClientFactory.CreateClient())
{
@ -59,7 +60,7 @@ namespace Squidex.Extensions.Actions.Webhook
request.Headers.Add("X-Application", "Squidex Webhook");
request.Headers.Add("User-Agent", "Squidex Webhook");
return await httpClient.OneWayRequestAsync(request, job.RequestBody);
return await httpClient.OneWayRequestAsync(request, job.RequestBody, ct);
}
}
}

8
src/Squidex.Domain.Apps.Core.Model/Schemas/Schema.cs

@ -142,20 +142,20 @@ namespace Squidex.Domain.Apps.Core.Schemas
}
[Pure]
public Schema ChangeCategory(string category)
public Schema ChangeCategory(string newCategory)
{
return Clone(clone =>
{
clone.category = category;
clone.category = newCategory;
});
}
[Pure]
public Schema ConfigurePreviewUrls(IReadOnlyDictionary<string, string> previewUrls)
public Schema ConfigurePreviewUrls(IReadOnlyDictionary<string, string> newPreviewUrls)
{
return Clone(clone =>
{
clone.previewUrls = previewUrls ?? EmptyPreviewUrls;
clone.previewUrls = newPreviewUrls ?? EmptyPreviewUrls;
});
}

3
src/Squidex.Domain.Apps.Core.Operations/HandleRules/IRuleActionHandler.cs

@ -6,6 +6,7 @@
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents;
using Squidex.Domain.Apps.Core.Rules;
@ -20,6 +21,6 @@ namespace Squidex.Domain.Apps.Core.HandleRules
Task<(string Description, object Data)> CreateJobAsync(EnrichedEvent @event, RuleAction action);
Task<(string Dump, Exception Exception)> ExecuteJobAsync(object data);
Task<Result> ExecuteJobAsync(object data, CancellationToken ct = default);
}
}

96
src/Squidex.Domain.Apps.Core.Operations/HandleRules/Result.cs

@ -0,0 +1,96 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Text;
namespace Squidex.Domain.Apps.Core.HandleRules
{
public sealed class Result
{
public Exception Exception { get; private set; }
public string Dump { get; private set; }
public RuleResult Status { get; private set; }
public void Enrich(TimeSpan elapsed)
{
var dumpBuilder = new StringBuilder();
if (!string.IsNullOrWhiteSpace(Dump))
{
dumpBuilder.AppendLine(Dump);
}
if (Status == RuleResult.Timeout)
{
dumpBuilder.AppendLine();
dumpBuilder.AppendLine("Action timed out.");
}
dumpBuilder.AppendLine();
dumpBuilder.AppendFormat("Elapsed {0}.", elapsed);
dumpBuilder.AppendLine();
Dump = dumpBuilder.ToString();
}
public static Result Ignored()
{
return Success("Ignored");
}
public static Result Complete()
{
return Success("Completed");
}
public static Result Create(string dump, RuleResult result)
{
return new Result { Dump = dump, Status = result };
}
public static Result Success(string dump)
{
return new Result { Dump = dump, Status = RuleResult.Success };
}
public static Result Failed(Exception ex)
{
return Failed(ex, ex?.Message);
}
public static Result SuccessOrFailed(Exception ex, string dump)
{
if (ex != null)
{
return Failed(ex, dump);
}
else
{
return Success(dump);
}
}
public static Result Failed(Exception ex, string dump)
{
var result = new Result { Exception = ex, Dump = dump ?? ex.Message };
if (ex is OperationCanceledException || ex is TimeoutException)
{
result.Status = RuleResult.Timeout;
}
else
{
result.Status = RuleResult.Failed;
}
return result;
}
}
}

7
src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleActionHandler.cs

@ -6,6 +6,7 @@
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.HandleRules.EnrichedEvents;
using Squidex.Domain.Apps.Core.Rules;
@ -63,11 +64,11 @@ namespace Squidex.Domain.Apps.Core.HandleRules
return (description, data);
}
async Task<(string Dump, Exception Exception)> IRuleActionHandler.ExecuteJobAsync(object data)
async Task<Result> IRuleActionHandler.ExecuteJobAsync(object data, CancellationToken ct)
{
var typedData = (TData)data;
return await ExecuteJobAsync(typedData);
return await ExecuteJobAsync(typedData, ct);
}
protected virtual Task<(string Description, TData Data)> CreateJobAsync(EnrichedEvent @event, TAction action)
@ -80,6 +81,6 @@ namespace Squidex.Domain.Apps.Core.HandleRules
throw new NotImplementedException();
}
protected abstract Task<(string Dump, Exception Exception)> ExecuteJobAsync(TData job);
protected abstract Task<Result> ExecuteJobAsync(TData job, CancellationToken ct = default);
}
}

45
src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs

@ -8,7 +8,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NodaTime;
using Squidex.Domain.Apps.Core.Rules;
@ -17,11 +17,13 @@ using Squidex.Infrastructure;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Json;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Domain.Apps.Core.HandleRules
{
public class RuleService
{
private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(3);
private readonly Dictionary<Type, IRuleActionHandler> ruleActionHandlers;
private readonly Dictionary<Type, IRuleTriggerHandler> ruleTriggerHandlers;
private readonly TypeNameRegistry typeNameRegistry;
@ -156,47 +158,34 @@ namespace Squidex.Domain.Apps.Core.HandleRules
}
}
public virtual async Task<(string Dump, RuleResult Result, TimeSpan Elapsed)> InvokeAsync(string actionName, string job)
public virtual async Task<(Result Result, TimeSpan Elapsed)> InvokeAsync(string actionName, string job)
{
var actionWatch = ValueStopwatch.StartNew();
Result result;
try
{
var actionType = typeNameRegistry.GetType(actionName);
var actionWatch = ValueStopwatch.StartNew();
var actionHandler = ruleActionHandlers[actionType];
var deserialized = jsonSerializer.Deserialize<object>(job, actionHandler.DataType);
var result = await actionHandler.ExecuteJobAsync(deserialized);
var elapsed = TimeSpan.FromMilliseconds(actionWatch.Stop());
var dumpBuilder = new StringBuilder(result.Dump);
dumpBuilder.AppendLine();
dumpBuilder.AppendFormat("Elapsed {0}.", elapsed);
dumpBuilder.AppendLine();
if (result.Exception is TimeoutException || result.Exception is OperationCanceledException)
using (var cts = new CancellationTokenSource(DefaultTimeout))
{
dumpBuilder.AppendLine();
dumpBuilder.AppendLine("Action timed out.");
return (dumpBuilder.ToString(), RuleResult.Timeout, elapsed);
}
else if (result.Exception != null)
{
return (dumpBuilder.ToString(), RuleResult.Failed, elapsed);
}
else
{
return (dumpBuilder.ToString(), RuleResult.Success, elapsed);
result = await actionHandler.ExecuteJobAsync(deserialized, cts.Token).WithCancellation(cts.Token);
}
}
catch (Exception ex)
{
return (ex.ToString(), RuleResult.Failed, TimeSpan.Zero);
result = Result.Failed(ex);
}
var elapsed = TimeSpan.FromMilliseconds(actionWatch.Stop());
result.Enrich(elapsed);
return (result, elapsed);
}
}
}

4
src/Squidex.Domain.Apps.Core.Operations/ValidateContent/ValidationContext.cs

@ -84,9 +84,9 @@ namespace Squidex.Domain.Apps.Core.ValidateContent
return new ValidationContext(contentId, schemaId, checkContent, checkAsset, propertyPath.Enqueue(property), IsOptional);
}
public Task<IReadOnlyList<Guid>> GetContentIdsAsync(Guid schemaId, FilterNode filter)
public Task<IReadOnlyList<Guid>> GetContentIdsAsync(Guid validatedSchemaId, FilterNode filter)
{
return checkContent(schemaId, filter);
return checkContent(validatedSchemaId, filter);
}
public Task<IReadOnlyList<IAssetInfo>> GetAssetInfosAsync(IEnumerable<Guid> assetId)

8
src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuerGrain.cs

@ -98,12 +98,12 @@ namespace Squidex.Domain.Apps.Entities.Rules
{
var job = @event.Job;
var response = await ruleService.InvokeAsync(job.ActionName, job.ActionData);
var (response, elapsed) = await ruleService.InvokeAsync(job.ActionName, job.ActionData);
var jobInvoke = ComputeJobInvoke(response.Result, @event, job);
var jobResult = ComputeJobResult(response.Result, jobInvoke);
var jobInvoke = ComputeJobInvoke(response.Status, @event, job);
var jobResult = ComputeJobResult(response.Status, jobInvoke);
await ruleEventRepository.MarkSentAsync(@event.Id, response.Dump, response.Result, jobResult, response.Elapsed, jobInvoke);
await ruleEventRepository.MarkSentAsync(@event.Id, response.Dump, response.Status, jobResult, elapsed, jobInvoke);
}
catch (Exception ex)
{

3
src/Squidex.Domain.Apps.Entities/Rules/UsageTracking/IUsageTrackerGrain.cs

@ -7,13 +7,12 @@
using System;
using System.Threading.Tasks;
using Orleans;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Orleans;
namespace Squidex.Domain.Apps.Entities.Rules.UsageTracking
{
public interface IUsageTrackerGrain : IGrainWithStringKey, IBackgroundGrain
public interface IUsageTrackerGrain : IBackgroundGrain
{
Task AddTargetAsync(Guid ruleId, NamedId<Guid> appId, int limits, int? numDays);

17
src/Squidex.Infrastructure/PubSubExtensions.cs

@ -8,6 +8,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Infrastructure.Tasks;
#pragma warning disable 4014
#pragma warning disable RECS0165 // Asynchronous methods should return a Task instead of void
@ -59,21 +60,9 @@ namespace Squidex.Infrastructure
Task.Run(() => pubsub.Publish(request, self));
using (var cts = new CancellationTokenSource())
using (var cts = new CancellationTokenSource(timeout))
{
var delayTask = Task.Delay(timeout, cts.Token);
var resultTask = await Task.WhenAny(receiveTask.Task, delayTask);
if (resultTask == delayTask)
{
throw new TaskCanceledException();
}
else
{
cts.Cancel();
return await receiveTask.Task;
}
return await receiveTask.Task.WithCancellation(cts.Token);
}
}
finally

20
src/Squidex.Infrastructure/Tasks/TaskExtensions.cs

@ -77,5 +77,25 @@ namespace Squidex.Infrastructure.Tasks
return TaskHelper.Done;
};
}
public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
using (cancellationToken.Register(state =>
{
((TaskCompletionSource<object>)state).TrySetResult(null);
},
tcs))
{
var resultTask = await Task.WhenAny(task, tcs.Task);
if (resultTask == tcs.Task)
{
throw new OperationCanceledException(cancellationToken);
}
return await task;
}
}
}
}

35
tests/Squidex.Domain.Apps.Core.Tests/Operations/HandleRules/RuleServiceTests.cs

@ -6,6 +6,7 @@
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using FakeItEasy;
using NodaTime;
@ -253,58 +254,58 @@ namespace Squidex.Domain.Apps.Core.Operations.HandleRules
[Fact]
public async Task Should_return_succeeded_job_with_full_dump_when_handler_returns_no_exception()
{
A.CallTo(() => ruleActionHandler.ExecuteJobAsync(A<ValidData>.That.Matches(x => x.Value == 10)))
.Returns((actionDump, null));
A.CallTo(() => ruleActionHandler.ExecuteJobAsync(A<ValidData>.That.Matches(x => x.Value == 10), A<CancellationToken>.Ignored))
.Returns(Result.Success(actionDump));
var result = await sut.InvokeAsync(actionName, actionData);
Assert.Equal(RuleResult.Success, result.Result);
Assert.Equal(RuleResult.Success, result.Result.Status);
Assert.True(result.Elapsed >= TimeSpan.Zero);
Assert.True(result.Dump.StartsWith(actionDump, StringComparison.OrdinalIgnoreCase));
Assert.True(result.Result.Dump.StartsWith(actionDump, StringComparison.OrdinalIgnoreCase));
}
[Fact]
public async Task Should_return_failed_job_with_full_dump_when_handler_returns_exception()
{
A.CallTo(() => ruleActionHandler.ExecuteJobAsync(A<ValidData>.That.Matches(x => x.Value == 10)))
.Returns((actionDump, new InvalidOperationException()));
A.CallTo(() => ruleActionHandler.ExecuteJobAsync(A<ValidData>.That.Matches(x => x.Value == 10), A<CancellationToken>.Ignored))
.Returns(Result.Failed(new InvalidOperationException(), actionDump));
var result = await sut.InvokeAsync(actionName, actionData);
Assert.Equal(RuleResult.Failed, result.Result);
Assert.Equal(RuleResult.Failed, result.Result.Status);
Assert.True(result.Elapsed >= TimeSpan.Zero);
Assert.True(result.Dump.StartsWith(actionDump, StringComparison.OrdinalIgnoreCase));
Assert.True(result.Result.Dump.StartsWith(actionDump, StringComparison.OrdinalIgnoreCase));
}
[Fact]
public async Task Should_return_timedout_job_with_full_dump_when_exception_from_handler_indicates_timeout()
{
A.CallTo(() => ruleActionHandler.ExecuteJobAsync(A<ValidData>.That.Matches(x => x.Value == 10)))
.Returns((actionDump, new TimeoutException()));
A.CallTo(() => ruleActionHandler.ExecuteJobAsync(A<ValidData>.That.Matches(x => x.Value == 10), A<CancellationToken>.Ignored))
.Returns(Result.Failed(new TimeoutException(), actionDump));
var result = await sut.InvokeAsync(actionName, actionData);
Assert.Equal(RuleResult.Timeout, result.Result);
Assert.Equal(RuleResult.Timeout, result.Result.Status);
Assert.True(result.Elapsed >= TimeSpan.Zero);
Assert.True(result.Dump.StartsWith(actionDump, StringComparison.OrdinalIgnoreCase));
Assert.True(result.Result.Dump.StartsWith(actionDump, StringComparison.OrdinalIgnoreCase));
Assert.True(result.Dump.IndexOf("Action timed out.", StringComparison.OrdinalIgnoreCase) >= 0);
Assert.True(result.Result.Dump.IndexOf("Action timed out.", StringComparison.OrdinalIgnoreCase) >= 0);
}
[Fact]
public async Task Should_create_exception_details_when_job_to_execute_failed()
{
var ruleError = new InvalidOperationException();
var ex = new InvalidOperationException();
A.CallTo(() => ruleActionHandler.ExecuteJobAsync(A<ValidData>.That.Matches(x => x.Value == 10)))
.Throws(ruleError);
A.CallTo(() => ruleActionHandler.ExecuteJobAsync(A<ValidData>.That.Matches(x => x.Value == 10), A<CancellationToken>.Ignored))
.Throws(ex);
var result = await sut.InvokeAsync(actionName, actionData);
Assert.Equal((ruleError.ToString(), RuleResult.Failed, TimeSpan.Zero), result);
Assert.Equal(ex, result.Result.Exception);
}
private static Rule RuleInvalidAction()

2
tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleDequeuerTests.cs

@ -54,7 +54,7 @@ namespace Squidex.Domain.Apps.Entities.Rules
var requestDump = "Dump";
A.CallTo(() => ruleService.InvokeAsync(@event.Job.ActionName, @event.Job.ActionData))
.Returns((requestDump, result, requestElapsed));
.Returns((Result.Create(requestDump, result), requestElapsed));
Instant? nextCall = null;

Loading…
Cancel
Save