Browse Source

Fix tracking of parallel webhooks.

pull/65/head
Sebastian Stehle 9 years ago
parent
commit
de0f753d7f
  1. 16
      src/Squidex.Infrastructure/Log/JsonLogWriter.cs
  2. 5
      src/Squidex.Infrastructure/StringExtensions.cs
  3. 99
      src/Squidex.Read/Schemas/WebhookInvoker.cs
  4. 1
      src/Squidex.Read/Squidex.Read.csproj

16
src/Squidex.Infrastructure/Log/JsonLogWriter.cs

@ -78,7 +78,7 @@ namespace Squidex.Infrastructure.Log
IObjectWriter IObjectWriter.WriteProperty(string property, string value)
{
jsonWriter.WritePropertyName(property);
jsonWriter.WritePropertyName(property.ToCamelCase());
jsonWriter.WriteValue(value);
return this;
@ -86,7 +86,7 @@ namespace Squidex.Infrastructure.Log
IObjectWriter IObjectWriter.WriteProperty(string property, double value)
{
jsonWriter.WritePropertyName(property);
jsonWriter.WritePropertyName(property.ToCamelCase());
jsonWriter.WriteValue(value);
return this;
@ -94,7 +94,7 @@ namespace Squidex.Infrastructure.Log
IObjectWriter IObjectWriter.WriteProperty(string property, long value)
{
jsonWriter.WritePropertyName(property);
jsonWriter.WritePropertyName(property.ToCamelCase());
jsonWriter.WriteValue(value);
return this;
@ -102,7 +102,7 @@ namespace Squidex.Infrastructure.Log
IObjectWriter IObjectWriter.WriteProperty(string property, bool value)
{
jsonWriter.WritePropertyName(property);
jsonWriter.WritePropertyName(property.ToCamelCase());
jsonWriter.WriteValue(value);
return this;
@ -110,7 +110,7 @@ namespace Squidex.Infrastructure.Log
IObjectWriter IObjectWriter.WriteProperty(string property, DateTime value)
{
jsonWriter.WritePropertyName(property);
jsonWriter.WritePropertyName(property.ToCamelCase());
jsonWriter.WriteValue(value.ToString("o", CultureInfo.InvariantCulture));
return this;
@ -118,7 +118,7 @@ namespace Squidex.Infrastructure.Log
IObjectWriter IObjectWriter.WriteProperty(string property, DateTimeOffset value)
{
jsonWriter.WritePropertyName(property);
jsonWriter.WritePropertyName(property.ToCamelCase());
jsonWriter.WriteValue(value.ToString("o", CultureInfo.InvariantCulture));
return this;
@ -126,7 +126,7 @@ namespace Squidex.Infrastructure.Log
IObjectWriter IObjectWriter.WriteProperty(string property, TimeSpan value)
{
jsonWriter.WritePropertyName(property);
jsonWriter.WritePropertyName(property.ToCamelCase());
jsonWriter.WriteValue(value);
return this;
@ -146,7 +146,7 @@ namespace Squidex.Infrastructure.Log
IObjectWriter IObjectWriter.WriteArray(string property, Action<IArrayWriter> arrayWriter)
{
jsonWriter.WritePropertyName(property);
jsonWriter.WritePropertyName(property.ToCamelCase());
jsonWriter.WriteStartArray();
arrayWriter?.Invoke(this);

5
src/Squidex.Infrastructure/StringExtensions.cs

@ -27,6 +27,11 @@ namespace Squidex.Infrastructure
return value != null && PropertyNameRegex.IsMatch(value);
}
public static string ToCamelCase(this string value)
{
return char.ToLower(value[0]) + value.Substring(1);
}
public static string ToPascalCase(this string value)
{
return string.Concat(value.Split(new[] { '-', '_', ' ' }, StringSplitOptions.RemoveEmptyEntries).Select(c => char.ToUpper(c[0]) + c.Substring(1)));

99
src/Squidex.Read/Schemas/WebhookInvoker.cs

@ -11,6 +11,7 @@ using System.Diagnostics;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NodaTime;
@ -24,13 +25,33 @@ using Squidex.Read.Schemas.Repositories;
namespace Squidex.Read.Schemas
{
public sealed class WebhookInvoker : IEventConsumer
public sealed class WebhookInvoker : DisposableObjectBase, IEventConsumer
{
private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(2);
private readonly ISchemaWebhookRepository webhookRepository;
private readonly ISemanticLog log;
private readonly TypeNameRegistry typeNameRegistry;
private readonly JsonSerializer webhookSerializer;
private readonly TransformBlock<InvocationRequest, InvocationResponse> invokeBlock;
private readonly ActionBlock<InvocationResponse> dumpBlock;
private class WebhookData
{
public ISchemaWebhookUrlEntity Webhook;
}
private sealed class InvocationRequest : WebhookData
{
public JObject Payload;
}
private sealed class InvocationResponse : WebhookData
{
public string Dump;
public TimeSpan Elapsed;
public WebhookResult Result;
}
public string Name
{
@ -42,16 +63,37 @@ namespace Squidex.Read.Schemas
get { return "^content-"; }
}
public WebhookInvoker(ISchemaWebhookRepository webhookRepository, JsonSerializer webhookSerializer, ISemanticLog log)
public WebhookInvoker(ISchemaWebhookRepository webhookRepository, JsonSerializer webhookSerializer, ISemanticLog log, TypeNameRegistry typeNameRegistry)
{
Guard.NotNull(webhookRepository, nameof(webhookRepository));
Guard.NotNull(webhookSerializer, nameof(webhookSerializer));
Guard.NotNull(typeNameRegistry, nameof(typeNameRegistry));
Guard.NotNull(log, nameof(log));
this.webhookRepository = webhookRepository;
this.webhookSerializer = webhookSerializer;
this.log = log;
this.typeNameRegistry = typeNameRegistry;
invokeBlock =
new TransformBlock<InvocationRequest, InvocationResponse>(x => DispatchEventAsync(x),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 8 });
dumpBlock =
new ActionBlock<InvocationResponse>(DumpAsync,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 32, BoundedCapacity = 64 });
invokeBlock.LinkTo(dumpBlock,
new DataflowLinkOptions { PropagateCompletion = true }, x => x != null);
}
protected override void DisposeObject(bool disposing)
{
invokeBlock.Complete();
dumpBlock.Completion.Wait();
}
public Task ClearAsync()
@ -63,15 +105,15 @@ namespace Squidex.Read.Schemas
{
if (@event.Payload is ContentEvent contentEvent)
{
var hooks = await webhookRepository.QueryUrlsBySchemaAsync(contentEvent.AppId.Id, contentEvent.SchemaId.Id);
var webhooks = await webhookRepository.QueryUrlsBySchemaAsync(contentEvent.AppId.Id, contentEvent.SchemaId.Id);
if (hooks.Count > 0)
if (webhooks.Count > 0)
{
var payload = CreatePayload(@event);
foreach (var hook in hooks)
foreach (var webhook in webhooks)
{
DispatchEventAsync(payload, hook, @event.Headers.Timestamp()).Forget();
await invokeBlock.SendAsync(new InvocationRequest { Webhook = webhook, Payload = payload });
}
}
}
@ -80,21 +122,35 @@ namespace Squidex.Read.Schemas
private JObject CreatePayload(Envelope<IEvent> @event)
{
return new JObject(
new JProperty("type", @event.Payload.GetType().Name),
new JProperty("type", typeNameRegistry.GetName(@event.Payload.GetType())),
new JProperty("payload", JObject.FromObject(@event.Payload, webhookSerializer)),
new JProperty("timestamp", @event.Headers.Timestamp().ToString()));
}
private async Task DispatchEventAsync(JObject payload, ISchemaWebhookUrlEntity webhook, Instant instant)
private async Task DumpAsync(InvocationResponse input)
{
try
{
payload = SignPayload(payload, webhook, instant);
await webhookRepository.AddInvokationAsync(input.Webhook.Id, input.Dump, input.Result, input.Elapsed);
}
catch (Exception ex)
{
log.LogError(ex, w => w
.WriteProperty("action", "DumpHook")
.WriteProperty("status", "Failed"));
}
}
private async Task<InvocationResponse> DispatchEventAsync(InvocationRequest input)
{
try
{
var payload = SignPayload(input.Payload, input.Webhook);
var requestString = payload.ToString(Formatting.Indented);
var responseString = string.Empty;
var request = BuildRequest(requestString, webhook);
var request = BuildRequest(requestString, input.Webhook);
var response = (HttpResponseMessage)null;
var isTimeout = false;
@ -103,9 +159,9 @@ namespace Squidex.Read.Schemas
try
{
using (log.MeasureInformation(w => w
.WriteProperty("Action", "SendToHook")
.WriteProperty("Status", "Invoked")
.WriteProperty("RequestUrl", request.RequestUri.ToString())))
.WriteProperty("action", "SendToHook")
.WriteProperty("status", "Invoked")
.WriteProperty("requestUrl", request.RequestUri.ToString())))
{
using (var client = new HttpClient { Timeout = Timeout })
{
@ -144,19 +200,26 @@ namespace Squidex.Read.Schemas
result = WebhookResult.Success;
}
await webhookRepository.AddInvokationAsync(webhook.Id, dump, result, watch.Elapsed);
return new InvocationResponse { Dump = dump, Result = result, Elapsed = watch.Elapsed, Webhook = input.Webhook };
}
catch (Exception ex)
{
log.LogError(ex, w => w
.WriteProperty("Action", "SendToHook")
.WriteProperty("Status", "Failed"));
.WriteProperty("action", "SendToHook")
.WriteProperty("status", "Failed"));
return null;
}
}
private static JObject SignPayload(JObject payload, ISchemaWebhookUrlEntity webhook, Instant instant)
private static JObject SignPayload(JObject payload, ISchemaWebhookUrlEntity webhook)
{
payload["signature"] = $"{instant.ToUnixTimeSeconds()}{webhook.SharedSecret}".Sha256Base64();
payload = new JObject(payload);
var eventTimestamp = SystemClock.Instance.GetCurrentInstant().ToUnixTimeSeconds();
var eventSignature = $"{eventTimestamp}{webhook.SharedSecret}".Sha256Base64();
payload["signature"] = eventSignature;
return payload;
}

1
src/Squidex.Read/Squidex.Read.csproj

@ -17,5 +17,6 @@
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="1.1.2" />
<PackageReference Include="NodaTime" Version="2.0.3" />
<PackageReference Include="System.Linq.Queryable" Version="4.3.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.7.0" />
</ItemGroup>
</Project>

Loading…
Cancel
Save