From de0f753d7f7f1c4e97dca2d04d8abba7a178df65 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Sun, 18 Jun 2017 21:19:45 +0200 Subject: [PATCH] Fix tracking of parallel webhooks. --- .../Log/JsonLogWriter.cs | 16 +-- .../StringExtensions.cs | 5 + src/Squidex.Read/Schemas/WebhookInvoker.cs | 99 +++++++++++++++---- src/Squidex.Read/Squidex.Read.csproj | 1 + 4 files changed, 95 insertions(+), 26 deletions(-) diff --git a/src/Squidex.Infrastructure/Log/JsonLogWriter.cs b/src/Squidex.Infrastructure/Log/JsonLogWriter.cs index 8e61631cb..859810ffe 100644 --- a/src/Squidex.Infrastructure/Log/JsonLogWriter.cs +++ b/src/Squidex.Infrastructure/Log/JsonLogWriter.cs @@ -78,7 +78,7 @@ namespace Squidex.Infrastructure.Log IObjectWriter IObjectWriter.WriteProperty(string property, string value) { - jsonWriter.WritePropertyName(property); + jsonWriter.WritePropertyName(property.ToCamelCase()); jsonWriter.WriteValue(value); return this; @@ -86,7 +86,7 @@ namespace Squidex.Infrastructure.Log IObjectWriter IObjectWriter.WriteProperty(string property, double value) { - jsonWriter.WritePropertyName(property); + jsonWriter.WritePropertyName(property.ToCamelCase()); jsonWriter.WriteValue(value); return this; @@ -94,7 +94,7 @@ namespace Squidex.Infrastructure.Log IObjectWriter IObjectWriter.WriteProperty(string property, long value) { - jsonWriter.WritePropertyName(property); + jsonWriter.WritePropertyName(property.ToCamelCase()); jsonWriter.WriteValue(value); return this; @@ -102,7 +102,7 @@ namespace Squidex.Infrastructure.Log IObjectWriter IObjectWriter.WriteProperty(string property, bool value) { - jsonWriter.WritePropertyName(property); + jsonWriter.WritePropertyName(property.ToCamelCase()); jsonWriter.WriteValue(value); return this; @@ -110,7 +110,7 @@ namespace Squidex.Infrastructure.Log IObjectWriter IObjectWriter.WriteProperty(string property, DateTime value) { - jsonWriter.WritePropertyName(property); + jsonWriter.WritePropertyName(property.ToCamelCase()); jsonWriter.WriteValue(value.ToString("o", CultureInfo.InvariantCulture)); return this; @@ -118,7 +118,7 @@ namespace Squidex.Infrastructure.Log IObjectWriter IObjectWriter.WriteProperty(string property, DateTimeOffset value) { - jsonWriter.WritePropertyName(property); + jsonWriter.WritePropertyName(property.ToCamelCase()); jsonWriter.WriteValue(value.ToString("o", CultureInfo.InvariantCulture)); return this; @@ -126,7 +126,7 @@ namespace Squidex.Infrastructure.Log IObjectWriter IObjectWriter.WriteProperty(string property, TimeSpan value) { - jsonWriter.WritePropertyName(property); + jsonWriter.WritePropertyName(property.ToCamelCase()); jsonWriter.WriteValue(value); return this; @@ -146,7 +146,7 @@ namespace Squidex.Infrastructure.Log IObjectWriter IObjectWriter.WriteArray(string property, Action arrayWriter) { - jsonWriter.WritePropertyName(property); + jsonWriter.WritePropertyName(property.ToCamelCase()); jsonWriter.WriteStartArray(); arrayWriter?.Invoke(this); diff --git a/src/Squidex.Infrastructure/StringExtensions.cs b/src/Squidex.Infrastructure/StringExtensions.cs index 7d45af461..1560c8c07 100644 --- a/src/Squidex.Infrastructure/StringExtensions.cs +++ b/src/Squidex.Infrastructure/StringExtensions.cs @@ -27,6 +27,11 @@ namespace Squidex.Infrastructure return value != null && PropertyNameRegex.IsMatch(value); } + public static string ToCamelCase(this string value) + { + return char.ToLower(value[0]) + value.Substring(1); + } + public static string ToPascalCase(this string value) { return string.Concat(value.Split(new[] { '-', '_', ' ' }, StringSplitOptions.RemoveEmptyEntries).Select(c => char.ToUpper(c[0]) + c.Substring(1))); diff --git a/src/Squidex.Read/Schemas/WebhookInvoker.cs b/src/Squidex.Read/Schemas/WebhookInvoker.cs index 250dbf0b8..d72eb85f2 100644 --- a/src/Squidex.Read/Schemas/WebhookInvoker.cs +++ b/src/Squidex.Read/Schemas/WebhookInvoker.cs @@ -11,6 +11,7 @@ using System.Diagnostics; using System.Net.Http; using System.Text; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using NodaTime; @@ -24,13 +25,33 @@ using Squidex.Read.Schemas.Repositories; namespace Squidex.Read.Schemas { - public sealed class WebhookInvoker : IEventConsumer + public sealed class WebhookInvoker : DisposableObjectBase, IEventConsumer { private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(2); private readonly ISchemaWebhookRepository webhookRepository; private readonly ISemanticLog log; + private readonly TypeNameRegistry typeNameRegistry; private readonly JsonSerializer webhookSerializer; + private readonly TransformBlock invokeBlock; + private readonly ActionBlock dumpBlock; + + private class WebhookData + { + public ISchemaWebhookUrlEntity Webhook; + } + + private sealed class InvocationRequest : WebhookData + { + public JObject Payload; + } + + private sealed class InvocationResponse : WebhookData + { + public string Dump; + public TimeSpan Elapsed; + public WebhookResult Result; + } public string Name { @@ -42,16 +63,37 @@ namespace Squidex.Read.Schemas get { return "^content-"; } } - public WebhookInvoker(ISchemaWebhookRepository webhookRepository, JsonSerializer webhookSerializer, ISemanticLog log) + public WebhookInvoker(ISchemaWebhookRepository webhookRepository, JsonSerializer webhookSerializer, ISemanticLog log, TypeNameRegistry typeNameRegistry) { Guard.NotNull(webhookRepository, nameof(webhookRepository)); Guard.NotNull(webhookSerializer, nameof(webhookSerializer)); + Guard.NotNull(typeNameRegistry, nameof(typeNameRegistry)); Guard.NotNull(log, nameof(log)); this.webhookRepository = webhookRepository; this.webhookSerializer = webhookSerializer; this.log = log; + + this.typeNameRegistry = typeNameRegistry; + + invokeBlock = + new TransformBlock(x => DispatchEventAsync(x), + new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 8 }); + + dumpBlock = + new ActionBlock(DumpAsync, + new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 32, BoundedCapacity = 64 }); + + invokeBlock.LinkTo(dumpBlock, + new DataflowLinkOptions { PropagateCompletion = true }, x => x != null); + } + + protected override void DisposeObject(bool disposing) + { + invokeBlock.Complete(); + + dumpBlock.Completion.Wait(); } public Task ClearAsync() @@ -63,15 +105,15 @@ namespace Squidex.Read.Schemas { if (@event.Payload is ContentEvent contentEvent) { - var hooks = await webhookRepository.QueryUrlsBySchemaAsync(contentEvent.AppId.Id, contentEvent.SchemaId.Id); + var webhooks = await webhookRepository.QueryUrlsBySchemaAsync(contentEvent.AppId.Id, contentEvent.SchemaId.Id); - if (hooks.Count > 0) + if (webhooks.Count > 0) { var payload = CreatePayload(@event); - foreach (var hook in hooks) + foreach (var webhook in webhooks) { - DispatchEventAsync(payload, hook, @event.Headers.Timestamp()).Forget(); + await invokeBlock.SendAsync(new InvocationRequest { Webhook = webhook, Payload = payload }); } } } @@ -80,21 +122,35 @@ namespace Squidex.Read.Schemas private JObject CreatePayload(Envelope @event) { return new JObject( - new JProperty("type", @event.Payload.GetType().Name), + new JProperty("type", typeNameRegistry.GetName(@event.Payload.GetType())), new JProperty("payload", JObject.FromObject(@event.Payload, webhookSerializer)), new JProperty("timestamp", @event.Headers.Timestamp().ToString())); } - private async Task DispatchEventAsync(JObject payload, ISchemaWebhookUrlEntity webhook, Instant instant) + private async Task DumpAsync(InvocationResponse input) { try { - payload = SignPayload(payload, webhook, instant); + await webhookRepository.AddInvokationAsync(input.Webhook.Id, input.Dump, input.Result, input.Elapsed); + } + catch (Exception ex) + { + log.LogError(ex, w => w + .WriteProperty("action", "DumpHook") + .WriteProperty("status", "Failed")); + } + } + + private async Task DispatchEventAsync(InvocationRequest input) + { + try + { + var payload = SignPayload(input.Payload, input.Webhook); var requestString = payload.ToString(Formatting.Indented); var responseString = string.Empty; - var request = BuildRequest(requestString, webhook); + var request = BuildRequest(requestString, input.Webhook); var response = (HttpResponseMessage)null; var isTimeout = false; @@ -103,9 +159,9 @@ namespace Squidex.Read.Schemas try { using (log.MeasureInformation(w => w - .WriteProperty("Action", "SendToHook") - .WriteProperty("Status", "Invoked") - .WriteProperty("RequestUrl", request.RequestUri.ToString()))) + .WriteProperty("action", "SendToHook") + .WriteProperty("status", "Invoked") + .WriteProperty("requestUrl", request.RequestUri.ToString()))) { using (var client = new HttpClient { Timeout = Timeout }) { @@ -144,19 +200,26 @@ namespace Squidex.Read.Schemas result = WebhookResult.Success; } - await webhookRepository.AddInvokationAsync(webhook.Id, dump, result, watch.Elapsed); + return new InvocationResponse { Dump = dump, Result = result, Elapsed = watch.Elapsed, Webhook = input.Webhook }; } catch (Exception ex) { log.LogError(ex, w => w - .WriteProperty("Action", "SendToHook") - .WriteProperty("Status", "Failed")); + .WriteProperty("action", "SendToHook") + .WriteProperty("status", "Failed")); + + return null; } } - private static JObject SignPayload(JObject payload, ISchemaWebhookUrlEntity webhook, Instant instant) + private static JObject SignPayload(JObject payload, ISchemaWebhookUrlEntity webhook) { - payload["signature"] = $"{instant.ToUnixTimeSeconds()}{webhook.SharedSecret}".Sha256Base64(); + payload = new JObject(payload); + + var eventTimestamp = SystemClock.Instance.GetCurrentInstant().ToUnixTimeSeconds(); + var eventSignature = $"{eventTimestamp}{webhook.SharedSecret}".Sha256Base64(); + + payload["signature"] = eventSignature; return payload; } diff --git a/src/Squidex.Read/Squidex.Read.csproj b/src/Squidex.Read/Squidex.Read.csproj index 4a347c975..4bc4ca905 100644 --- a/src/Squidex.Read/Squidex.Read.csproj +++ b/src/Squidex.Read/Squidex.Read.csproj @@ -17,5 +17,6 @@ +