mirror of https://github.com/Squidex/squidex.git
44 changed files with 2926 additions and 1384 deletions
@ -0,0 +1,79 @@ |
|||
// ==========================================================================
|
|||
// MongoWebhookEventEntity.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using MongoDB.Bson.Serialization.Attributes; |
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Read.Schemas; |
|||
using Squidex.Infrastructure.MongoDb; |
|||
using Squidex.Infrastructure.Reflection; |
|||
|
|||
namespace Squidex.Domain.Apps.Read.MongoDb.Schemas |
|||
{ |
|||
public sealed class MongoWebhookEventEntity : MongoEntity, IWebhookEventEntity |
|||
{ |
|||
private WebhookJob job; |
|||
|
|||
[BsonRequired] |
|||
[BsonElement] |
|||
public Guid AppId { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement] |
|||
public Guid WebhookId { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement] |
|||
public Uri RequestUrl { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement] |
|||
public string RequestBody { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement] |
|||
public string RequestSignature { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement] |
|||
public string EventName { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement] |
|||
public string LastDump { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement] |
|||
public Instant Expires { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement] |
|||
public Instant? NextAttempt { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement] |
|||
public int NumCalls { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement] |
|||
public bool IsSending { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement] |
|||
public WebhookResult Result { get; set; } |
|||
|
|||
[BsonRequired] |
|||
[BsonElement] |
|||
public WebhookJobResult JobResult { get; set; } |
|||
|
|||
public WebhookJob Job |
|||
{ |
|||
get { return job ?? (job = SimpleMapper.Map(this, new WebhookJob())); } |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,109 @@ |
|||
// ==========================================================================
|
|||
// MongoWebhookEventRepository.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using MongoDB.Driver; |
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Read.Schemas; |
|||
using Squidex.Domain.Apps.Read.Schemas.Repositories; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.MongoDb; |
|||
using Squidex.Infrastructure.Reflection; |
|||
|
|||
namespace Squidex.Domain.Apps.Read.MongoDb.Schemas |
|||
{ |
|||
public sealed class MongoWebhookEventRepository : MongoRepositoryBase<MongoWebhookEventEntity>, IWebhookEventRepository |
|||
{ |
|||
private readonly IClock clock; |
|||
|
|||
public MongoWebhookEventRepository(IMongoDatabase database, IClock clock) |
|||
: base(database) |
|||
{ |
|||
Guard.NotNull(clock, nameof(clock)); |
|||
|
|||
this.clock = clock; |
|||
} |
|||
|
|||
protected override string CollectionName() |
|||
{ |
|||
return "WebhookEvents"; |
|||
} |
|||
|
|||
protected override async Task SetupCollectionAsync(IMongoCollection<MongoWebhookEventEntity> collection) |
|||
{ |
|||
await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.NextAttempt).Descending(x => x.IsSending)); |
|||
await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.AppId).Descending(x => x.Created)); |
|||
await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Expires), new CreateIndexOptions { ExpireAfter = TimeSpan.Zero }); |
|||
} |
|||
|
|||
public Task QueryPendingAsync(Func<IWebhookEventEntity, Task> callback, CancellationToken cancellationToken = new CancellationToken()) |
|||
{ |
|||
var now = clock.GetCurrentInstant(); |
|||
|
|||
return Collection.Find(x => x.NextAttempt < now && !x.IsSending).ForEachAsync(callback, cancellationToken); |
|||
} |
|||
|
|||
public async Task<IReadOnlyList<IWebhookEventEntity>> QueryByAppAsync(Guid appId, int skip = 0, int take = 20) |
|||
{ |
|||
var entities = await Collection.Find(x => x.AppId == appId).Skip(skip).Limit(take).SortByDescending(x => x.Created).ToListAsync(); |
|||
|
|||
return entities; |
|||
} |
|||
|
|||
public async Task<IWebhookEventEntity> FindAsync(Guid id) |
|||
{ |
|||
var entity = await Collection.Find(x => x.Id == id).FirstOrDefaultAsync(); |
|||
|
|||
return entity; |
|||
} |
|||
|
|||
public async Task<int> CountByAppAsync(Guid appId) |
|||
{ |
|||
return (int)await Collection.CountAsync(x => x.AppId == appId); |
|||
} |
|||
|
|||
public Task TraceSendingAsync(Guid jobId) |
|||
{ |
|||
return Collection.UpdateOneAsync(x => x.Id == jobId, Update.Set(x => x.IsSending, true)); |
|||
} |
|||
|
|||
public Task EnqueueAsync(WebhookJob job, Instant nextAttempt) |
|||
{ |
|||
var entity = SimpleMapper.Map(job, new MongoWebhookEventEntity { NextAttempt = nextAttempt }); |
|||
|
|||
return Collection.InsertOneIfNotExistsAsync(entity); |
|||
} |
|||
|
|||
public Task TraceSentAsync(Guid jobId, string dump, WebhookResult result, TimeSpan elapsed, Instant? nextAttempt) |
|||
{ |
|||
WebhookJobResult jobResult; |
|||
|
|||
if (result != WebhookResult.Success && nextAttempt == null) |
|||
{ |
|||
jobResult = WebhookJobResult.Failed; |
|||
} |
|||
else if (result != WebhookResult.Success && nextAttempt.HasValue) |
|||
{ |
|||
jobResult = WebhookJobResult.Retry; |
|||
} |
|||
else |
|||
{ |
|||
jobResult = WebhookJobResult.Success; |
|||
} |
|||
|
|||
return Collection.UpdateOneAsync(x => x.Id == jobId, |
|||
Update.Set(x => x.Result, result) |
|||
.Set(x => x.LastDump, dump) |
|||
.Set(x => x.JobResult, jobResult) |
|||
.Inc(x => x.NumCalls, 1)); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,27 @@ |
|||
// ==========================================================================
|
|||
// IWebhookEventEntity.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using NodaTime; |
|||
|
|||
namespace Squidex.Domain.Apps.Read.Schemas |
|||
{ |
|||
public interface IWebhookEventEntity |
|||
{ |
|||
WebhookJob Job { get; } |
|||
|
|||
Instant? NextAttempt { get; } |
|||
|
|||
WebhookResult Result { get; } |
|||
|
|||
WebhookJobResult JobResult { get; } |
|||
|
|||
int NumCalls { get; } |
|||
|
|||
string LastDump { get; } |
|||
} |
|||
} |
|||
@ -0,0 +1,33 @@ |
|||
// ==========================================================================
|
|||
// IWebhookEventRepository.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using NodaTime; |
|||
|
|||
namespace Squidex.Domain.Apps.Read.Schemas.Repositories |
|||
{ |
|||
public interface IWebhookEventRepository |
|||
{ |
|||
Task EnqueueAsync(WebhookJob job, Instant nextAttempt); |
|||
|
|||
Task TraceSendingAsync(Guid jobId); |
|||
|
|||
Task TraceSentAsync(Guid jobId, string dump, WebhookResult result, TimeSpan elapsed, Instant? nextCall); |
|||
|
|||
Task QueryPendingAsync(Func<IWebhookEventEntity, Task> callback, CancellationToken cancellationToken = default(CancellationToken)); |
|||
|
|||
Task<int> CountByAppAsync(Guid appId); |
|||
|
|||
Task<IReadOnlyList<IWebhookEventEntity>> QueryByAppAsync(Guid appId, int skip = 0, int take = 20); |
|||
|
|||
Task<IWebhookEventEntity> FindAsync(Guid id); |
|||
} |
|||
} |
|||
@ -0,0 +1,159 @@ |
|||
// ==========================================================================
|
|||
// WebhookDequeuer.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using System.Threading.Tasks.Dataflow; |
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Read.Schemas.Repositories; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.Log; |
|||
using Squidex.Infrastructure.Timers; |
|||
|
|||
// ReSharper disable SwitchStatementMissingSomeCases
|
|||
// ReSharper disable MethodSupportsCancellation
|
|||
// ReSharper disable InvertIf
|
|||
|
|||
namespace Squidex.Domain.Apps.Read.Schemas |
|||
{ |
|||
public sealed class WebhookDequeuer : DisposableObjectBase, IExternalSystem |
|||
{ |
|||
private readonly ActionBlock<IWebhookEventEntity> requestBlock; |
|||
private readonly TransformBlock<IWebhookEventEntity, IWebhookEventEntity> blockBlock; |
|||
private readonly IWebhookEventRepository webhookEventRepository; |
|||
private readonly ISchemaWebhookRepository webhookRepository; |
|||
private readonly WebhookSender webhookSender; |
|||
private readonly CompletionTimer timer; |
|||
private readonly ISemanticLog log; |
|||
private readonly IClock clock; |
|||
|
|||
public WebhookDequeuer(WebhookSender webhookSender, |
|||
IWebhookEventRepository webhookEventRepository, |
|||
ISchemaWebhookRepository webhookRepository, |
|||
IClock clock, |
|||
ISemanticLog log) |
|||
{ |
|||
Guard.NotNull(webhookEventRepository, nameof(webhookEventRepository)); |
|||
Guard.NotNull(webhookRepository, nameof(webhookRepository)); |
|||
Guard.NotNull(webhookSender, nameof(webhookSender)); |
|||
Guard.NotNull(clock, nameof(clock)); |
|||
Guard.NotNull(log, nameof(log)); |
|||
|
|||
this.webhookEventRepository = webhookEventRepository; |
|||
this.webhookRepository = webhookRepository; |
|||
this.webhookSender = webhookSender; |
|||
|
|||
this.clock = clock; |
|||
|
|||
this.log = log; |
|||
|
|||
requestBlock = |
|||
new ActionBlock<IWebhookEventEntity>(MakeRequestAsync, |
|||
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 32, BoundedCapacity = 32 }); |
|||
|
|||
blockBlock = |
|||
new TransformBlock<IWebhookEventEntity, IWebhookEventEntity>(x => BlockAsync(x), |
|||
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 }); |
|||
|
|||
blockBlock.LinkTo(requestBlock, new DataflowLinkOptions { PropagateCompletion = true }); |
|||
|
|||
timer = new CompletionTimer(5000, QueryAsync); |
|||
} |
|||
|
|||
protected override void DisposeObject(bool disposing) |
|||
{ |
|||
if (disposing) |
|||
{ |
|||
timer.StopAsync().Wait(); |
|||
|
|||
blockBlock.Complete(); |
|||
requestBlock.Completion.Wait(); |
|||
} |
|||
} |
|||
|
|||
public void Connect() |
|||
{ |
|||
} |
|||
|
|||
private async Task QueryAsync(CancellationToken cancellationToken) |
|||
{ |
|||
try |
|||
{ |
|||
await webhookEventRepository.QueryPendingAsync(blockBlock.SendAsync, cancellationToken); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
log.LogError(ex, w => w |
|||
.WriteProperty("action", "QueueWebhookEvents") |
|||
.WriteProperty("status", "Failed")); |
|||
} |
|||
} |
|||
|
|||
private async Task<IWebhookEventEntity> BlockAsync(IWebhookEventEntity @event) |
|||
{ |
|||
try |
|||
{ |
|||
await webhookEventRepository.TraceSendingAsync(@event.Job.Id); |
|||
|
|||
return @event; |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
log.LogError(ex, w => w |
|||
.WriteProperty("action", "BlockWebhookEvent") |
|||
.WriteProperty("status", "Failed")); |
|||
|
|||
throw; |
|||
} |
|||
} |
|||
|
|||
private async Task MakeRequestAsync(IWebhookEventEntity @event) |
|||
{ |
|||
try |
|||
{ |
|||
var now = clock.GetCurrentInstant(); |
|||
|
|||
var response = await webhookSender.SendAsync(@event.Job); |
|||
|
|||
Instant? nextCall = null; |
|||
|
|||
if (response.Result != WebhookResult.Success) |
|||
{ |
|||
switch (@event.NumCalls) |
|||
{ |
|||
case 0: |
|||
nextCall = now.Plus(Duration.FromMinutes(5)); |
|||
break; |
|||
case 1: |
|||
nextCall = now.Plus(Duration.FromHours(1)); |
|||
break; |
|||
case 2: |
|||
nextCall = now.Plus(Duration.FromHours(5)); |
|||
break; |
|||
case 3: |
|||
nextCall = now.Plus(Duration.FromHours(6)); |
|||
break; |
|||
} |
|||
} |
|||
|
|||
await Task.WhenAll( |
|||
webhookRepository.TraceSentAsync(@event.Job.WebhookId, response.Result, response.Elapsed), |
|||
webhookEventRepository.TraceSentAsync(@event.Job.Id, response.Dump, response.Result, response.Elapsed, nextCall)); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
log.LogError(ex, w => w |
|||
.WriteProperty("action", "SendWebhookEvent") |
|||
.WriteProperty("status", "Failed")); |
|||
|
|||
throw; |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,126 @@ |
|||
// ==========================================================================
|
|||
// WebhookEnqueuer.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Newtonsoft.Json; |
|||
using Newtonsoft.Json.Linq; |
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Events; |
|||
using Squidex.Domain.Apps.Events.Contents; |
|||
using Squidex.Domain.Apps.Read.Schemas.Repositories; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.CQRS.Events; |
|||
using Squidex.Infrastructure.Tasks; |
|||
|
|||
namespace Squidex.Domain.Apps.Read.Schemas |
|||
{ |
|||
public sealed class WebhookEnqueuer : IEventConsumer |
|||
{ |
|||
private const string ContentPrefix = "Content"; |
|||
private static readonly Duration ExpirationTime = Duration.FromDays(2); |
|||
private readonly IWebhookEventRepository webhookEventRepository; |
|||
private readonly ISchemaWebhookRepository webhookRepository; |
|||
private readonly IClock clock; |
|||
private readonly TypeNameRegistry typeNameRegistry; |
|||
private readonly JsonSerializer webhookSerializer; |
|||
|
|||
public string Name |
|||
{ |
|||
get { return GetType().Name; } |
|||
} |
|||
|
|||
public string EventsFilter |
|||
{ |
|||
get { return "^content-"; } |
|||
} |
|||
|
|||
public WebhookEnqueuer(TypeNameRegistry typeNameRegistry, |
|||
IWebhookEventRepository webhookEventRepository, |
|||
ISchemaWebhookRepository webhookRepository, |
|||
IClock clock, |
|||
JsonSerializer webhookSerializer) |
|||
{ |
|||
Guard.NotNull(webhookEventRepository, nameof(webhookEventRepository)); |
|||
Guard.NotNull(webhookSerializer, nameof(webhookSerializer)); |
|||
Guard.NotNull(webhookRepository, nameof(webhookRepository)); |
|||
Guard.NotNull(typeNameRegistry, nameof(typeNameRegistry)); |
|||
Guard.NotNull(clock, nameof(clock)); |
|||
|
|||
this.webhookEventRepository = webhookEventRepository; |
|||
this.webhookSerializer = webhookSerializer; |
|||
this.webhookRepository = webhookRepository; |
|||
|
|||
this.clock = clock; |
|||
|
|||
this.typeNameRegistry = typeNameRegistry; |
|||
} |
|||
|
|||
public Task ClearAsync() |
|||
{ |
|||
return TaskHelper.Done; |
|||
} |
|||
|
|||
public async Task On(Envelope<IEvent> @event) |
|||
{ |
|||
if (@event.Payload is ContentEvent contentEvent) |
|||
{ |
|||
var eventType = typeNameRegistry.GetName(@event.Payload.GetType()); |
|||
|
|||
var webhooks = await webhookRepository.QueryUrlsBySchemaAsync(contentEvent.AppId.Id, contentEvent.SchemaId.Id); |
|||
|
|||
if (webhooks.Count > 0) |
|||
{ |
|||
var now = clock.GetCurrentInstant(); |
|||
|
|||
var payload = CreatePayload(@event, eventType); |
|||
|
|||
var eventName = $"{contentEvent.SchemaId.Name.ToPascalCase()}{CreateContentEventName(eventType)}"; |
|||
|
|||
foreach (var webhook in webhooks) |
|||
{ |
|||
await EnqueueJobAsync(payload, webhook, contentEvent, eventName, now); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
private async Task EnqueueJobAsync(string payload, ISchemaWebhookUrlEntity webhook, AppEvent contentEvent, string eventName, Instant now) |
|||
{ |
|||
var signature = $"{payload}{webhook.SharedSecret}".Sha256Base64(); |
|||
|
|||
var job = new WebhookJob |
|||
{ |
|||
Id = Guid.NewGuid(), |
|||
AppId = contentEvent.AppId.Id, |
|||
RequestUrl = webhook.Url, |
|||
RequestBody = payload, |
|||
RequestSignature = signature, |
|||
EventName = eventName, |
|||
Expires = now.Plus(ExpirationTime), |
|||
WebhookId = webhook.Id |
|||
}; |
|||
|
|||
await webhookEventRepository.EnqueueAsync(job, now); |
|||
} |
|||
|
|||
private string CreatePayload(Envelope<IEvent> @event, string eventType) |
|||
{ |
|||
return new JObject( |
|||
new JProperty("type", eventType), |
|||
new JProperty("payload", JObject.FromObject(@event.Payload, webhookSerializer)), |
|||
new JProperty("timestamp", @event.Headers.Timestamp().ToString())) |
|||
.ToString(Formatting.Indented); |
|||
} |
|||
|
|||
private static string CreateContentEventName(string eventType) |
|||
{ |
|||
return eventType.StartsWith(ContentPrefix) ? eventType.Substring(ContentPrefix.Length) : eventType; |
|||
} |
|||
} |
|||
} |
|||
@ -1,237 +0,0 @@ |
|||
// ==========================================================================
|
|||
// WebhookInvoker.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Diagnostics; |
|||
using System.Net.Http; |
|||
using System.Text; |
|||
using System.Threading.Tasks; |
|||
using System.Threading.Tasks.Dataflow; |
|||
using Newtonsoft.Json; |
|||
using Newtonsoft.Json.Linq; |
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Events.Contents; |
|||
using Squidex.Domain.Apps.Read.Schemas.Repositories; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.CQRS.Events; |
|||
using Squidex.Infrastructure.Http; |
|||
using Squidex.Infrastructure.Log; |
|||
using Squidex.Infrastructure.Tasks; |
|||
|
|||
namespace Squidex.Domain.Apps.Read.Schemas |
|||
{ |
|||
public sealed class WebhookInvoker : DisposableObjectBase, IEventConsumer |
|||
{ |
|||
private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(2); |
|||
|
|||
private readonly ISchemaWebhookRepository webhookRepository; |
|||
private readonly ISemanticLog log; |
|||
private readonly TypeNameRegistry typeNameRegistry; |
|||
private readonly JsonSerializer webhookSerializer; |
|||
private readonly TransformBlock<InvocationRequest, InvocationResponse> invokeBlock; |
|||
private readonly ActionBlock<InvocationResponse> dumpBlock; |
|||
|
|||
private class WebhookData |
|||
{ |
|||
public ISchemaWebhookUrlEntity Webhook; |
|||
} |
|||
|
|||
private sealed class InvocationRequest : WebhookData |
|||
{ |
|||
public JObject Payload; |
|||
} |
|||
|
|||
private sealed class InvocationResponse : WebhookData |
|||
{ |
|||
public string Dump; |
|||
public TimeSpan Elapsed; |
|||
public WebhookResult Result; |
|||
} |
|||
|
|||
public string Name |
|||
{ |
|||
get { return GetType().Name; } |
|||
} |
|||
|
|||
public string EventsFilter |
|||
{ |
|||
get { return "^content-"; } |
|||
} |
|||
|
|||
public WebhookInvoker(ISchemaWebhookRepository webhookRepository, JsonSerializer webhookSerializer, ISemanticLog log, TypeNameRegistry typeNameRegistry) |
|||
{ |
|||
Guard.NotNull(webhookRepository, nameof(webhookRepository)); |
|||
Guard.NotNull(webhookSerializer, nameof(webhookSerializer)); |
|||
Guard.NotNull(typeNameRegistry, nameof(typeNameRegistry)); |
|||
Guard.NotNull(log, nameof(log)); |
|||
|
|||
this.webhookRepository = webhookRepository; |
|||
this.webhookSerializer = webhookSerializer; |
|||
|
|||
this.log = log; |
|||
|
|||
this.typeNameRegistry = typeNameRegistry; |
|||
|
|||
invokeBlock = |
|||
new TransformBlock<InvocationRequest, InvocationResponse>(x => DispatchEventAsync(x), |
|||
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 8 }); |
|||
|
|||
dumpBlock = |
|||
new ActionBlock<InvocationResponse>(DumpAsync, |
|||
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 32, BoundedCapacity = 64 }); |
|||
|
|||
invokeBlock.LinkTo(dumpBlock, |
|||
new DataflowLinkOptions { PropagateCompletion = true }, x => x != null); |
|||
} |
|||
|
|||
protected override void DisposeObject(bool disposing) |
|||
{ |
|||
invokeBlock.Complete(); |
|||
|
|||
dumpBlock.Completion.Wait(); |
|||
} |
|||
|
|||
public Task ClearAsync() |
|||
{ |
|||
return TaskHelper.Done; |
|||
} |
|||
|
|||
public async Task On(Envelope<IEvent> @event) |
|||
{ |
|||
if (@event.Payload is ContentEvent contentEvent) |
|||
{ |
|||
var webhooks = await webhookRepository.QueryUrlsBySchemaAsync(contentEvent.AppId.Id, contentEvent.SchemaId.Id); |
|||
|
|||
if (webhooks.Count > 0) |
|||
{ |
|||
var payload = CreatePayload(@event); |
|||
|
|||
foreach (var webhook in webhooks) |
|||
{ |
|||
await invokeBlock.SendAsync(new InvocationRequest { Webhook = webhook, Payload = payload }); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
private JObject CreatePayload(Envelope<IEvent> @event) |
|||
{ |
|||
return new JObject( |
|||
new JProperty("type", typeNameRegistry.GetName(@event.Payload.GetType())), |
|||
new JProperty("payload", JObject.FromObject(@event.Payload, webhookSerializer)), |
|||
new JProperty("timestamp", @event.Headers.Timestamp().ToString())); |
|||
} |
|||
|
|||
private async Task DumpAsync(InvocationResponse input) |
|||
{ |
|||
try |
|||
{ |
|||
await webhookRepository.AddInvokationAsync(input.Webhook.Id, input.Dump, input.Result, input.Elapsed); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
log.LogError(ex, w => w |
|||
.WriteProperty("action", "DumpHook") |
|||
.WriteProperty("status", "Failed")); |
|||
} |
|||
} |
|||
|
|||
private async Task<InvocationResponse> DispatchEventAsync(InvocationRequest input) |
|||
{ |
|||
try |
|||
{ |
|||
var payload = SignPayload(input.Payload, input.Webhook); |
|||
|
|||
var requestString = payload.ToString(Formatting.Indented); |
|||
var responseString = string.Empty; |
|||
|
|||
var request = BuildRequest(requestString, input.Webhook); |
|||
var response = (HttpResponseMessage)null; |
|||
|
|||
var isTimeout = false; |
|||
|
|||
var watch = Stopwatch.StartNew(); |
|||
try |
|||
{ |
|||
using (log.MeasureInformation(w => w |
|||
.WriteProperty("action", "SendToHook") |
|||
.WriteProperty("status", "Invoked") |
|||
.WriteProperty("requestUrl", request.RequestUri.ToString()))) |
|||
{ |
|||
using (var client = new HttpClient { Timeout = Timeout }) |
|||
{ |
|||
response = await client.SendAsync(request); |
|||
} |
|||
} |
|||
} |
|||
catch (TimeoutException) |
|||
{ |
|||
isTimeout = true; |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
isTimeout = true; |
|||
} |
|||
finally |
|||
{ |
|||
watch.Stop(); |
|||
} |
|||
|
|||
if (response != null) |
|||
{ |
|||
responseString = await response.Content.ReadAsStringAsync(); |
|||
} |
|||
|
|||
var dump = DumpFormatter.BuildDump(request, response, requestString, responseString, watch.Elapsed); |
|||
|
|||
var result = WebhookResult.Fail; |
|||
|
|||
if (isTimeout) |
|||
{ |
|||
result = WebhookResult.Timeout; |
|||
} |
|||
else if (response?.IsSuccessStatusCode == true) |
|||
{ |
|||
result = WebhookResult.Success; |
|||
} |
|||
|
|||
return new InvocationResponse { Dump = dump, Result = result, Elapsed = watch.Elapsed, Webhook = input.Webhook }; |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
log.LogError(ex, w => w |
|||
.WriteProperty("action", "SendToHook") |
|||
.WriteProperty("status", "Failed")); |
|||
|
|||
return null; |
|||
} |
|||
} |
|||
|
|||
private static JObject SignPayload(JObject payload, ISchemaWebhookUrlEntity webhook) |
|||
{ |
|||
payload = new JObject(payload); |
|||
|
|||
var eventTimestamp = SystemClock.Instance.GetCurrentInstant().ToUnixTimeSeconds(); |
|||
var eventSignature = $"{eventTimestamp}{webhook.SharedSecret}".Sha256Base64(); |
|||
|
|||
payload["signature"] = eventSignature; |
|||
|
|||
return payload; |
|||
} |
|||
|
|||
private static HttpRequestMessage BuildRequest(string requestBody, ISchemaWebhookUrlEntity webhook) |
|||
{ |
|||
var request = new HttpRequestMessage(HttpMethod.Post, webhook.Url) |
|||
{ |
|||
Content = new StringContent(requestBody, Encoding.UTF8, "application/json") |
|||
}; |
|||
|
|||
return request; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,32 @@ |
|||
// ==========================================================================
|
|||
// WebhookJob.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using NodaTime; |
|||
|
|||
namespace Squidex.Domain.Apps.Read.Schemas |
|||
{ |
|||
public sealed class WebhookJob |
|||
{ |
|||
public Guid Id { get; set; } |
|||
|
|||
public Guid AppId { get; set; } |
|||
|
|||
public Guid WebhookId { get; set; } |
|||
|
|||
public Uri RequestUrl { get; set; } |
|||
|
|||
public string RequestBody { get; set; } |
|||
|
|||
public string RequestSignature { get; set; } |
|||
|
|||
public string EventName { get; set; } |
|||
|
|||
public Instant Expires { get; set; } |
|||
} |
|||
} |
|||
@ -0,0 +1,18 @@ |
|||
// ==========================================================================
|
|||
// WebhookJobResult.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
namespace Squidex.Domain.Apps.Read.Schemas |
|||
{ |
|||
public enum WebhookJobResult |
|||
{ |
|||
Pending, |
|||
Success, |
|||
Retry, |
|||
Failed |
|||
} |
|||
} |
|||
@ -0,0 +1,88 @@ |
|||
// ==========================================================================
|
|||
// WebhookSender.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Diagnostics; |
|||
using System.Net.Http; |
|||
using System.Text; |
|||
using System.Threading.Tasks; |
|||
using Squidex.Infrastructure.Http; |
|||
|
|||
// ReSharper disable SuggestVarOrType_SimpleTypes
|
|||
|
|||
namespace Squidex.Domain.Apps.Read.Schemas |
|||
{ |
|||
public class WebhookSender |
|||
{ |
|||
private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(2); |
|||
|
|||
public virtual async Task<(string Dump, WebhookResult Result, TimeSpan Elapsed)> SendAsync(WebhookJob job) |
|||
{ |
|||
HttpRequestMessage request = BuildRequest(job); |
|||
HttpResponseMessage response = null; |
|||
|
|||
var isTimeout = false; |
|||
|
|||
var watch = Stopwatch.StartNew(); |
|||
|
|||
try |
|||
{ |
|||
using (var client = new HttpClient { Timeout = Timeout }) |
|||
{ |
|||
response = await client.SendAsync(request); |
|||
} |
|||
} |
|||
catch (TimeoutException) |
|||
{ |
|||
isTimeout = true; |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
isTimeout = true; |
|||
} |
|||
finally |
|||
{ |
|||
watch.Stop(); |
|||
} |
|||
|
|||
var responseString = string.Empty; |
|||
|
|||
if (response != null) |
|||
{ |
|||
responseString = await response.Content.ReadAsStringAsync(); |
|||
} |
|||
|
|||
var dump = DumpFormatter.BuildDump(request, response, job.RequestBody, responseString, watch.Elapsed); |
|||
|
|||
var result = WebhookResult.Failed; |
|||
|
|||
if (isTimeout) |
|||
{ |
|||
result = WebhookResult.Timeout; |
|||
} |
|||
else if (response?.IsSuccessStatusCode == true) |
|||
{ |
|||
result = WebhookResult.Success; |
|||
} |
|||
|
|||
return (dump, result, watch.Elapsed); |
|||
} |
|||
|
|||
private static HttpRequestMessage BuildRequest(WebhookJob job) |
|||
{ |
|||
var request = new HttpRequestMessage(HttpMethod.Post, job.RequestUrl) |
|||
{ |
|||
Content = new StringContent(job.RequestBody, Encoding.UTF8, "application/json") |
|||
}; |
|||
|
|||
request.Headers.Add("X-Signature", job.RequestSignature); |
|||
|
|||
return request; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,60 @@ |
|||
// ==========================================================================
|
|||
// WebhookEventDto.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.ComponentModel.DataAnnotations; |
|||
using NodaTime; |
|||
using Squidex.Domain.Apps.Read.Schemas; |
|||
|
|||
namespace Squidex.Controllers.Api.Webhooks.Models |
|||
{ |
|||
public class WebhookEventDto |
|||
{ |
|||
/// <summary>
|
|||
/// The request url.
|
|||
/// </summary>
|
|||
[Required] |
|||
public Uri RequestUrl { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The name of the event.
|
|||
/// </summary>
|
|||
[Required] |
|||
public string EventName { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The webhook event id.
|
|||
/// </summary>
|
|||
public Guid Id { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The last dump.
|
|||
/// </summary>
|
|||
public string LastDump { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The number of calls.
|
|||
/// </summary>
|
|||
public int NumCalls { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The next attempt.
|
|||
/// </summary>
|
|||
public Instant? NextAttempt { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The result of the event.
|
|||
/// </summary>
|
|||
public WebhookResult Result { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The result of the job.
|
|||
/// </summary>
|
|||
public WebhookJobResult JobResult { get; set; } |
|||
} |
|||
} |
|||
@ -0,0 +1,23 @@ |
|||
// ==========================================================================
|
|||
// WebhookEventsDto.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
namespace Squidex.Controllers.Api.Webhooks.Models |
|||
{ |
|||
public class WebhookEventsDto |
|||
{ |
|||
/// <summary>
|
|||
/// The total number of webhook events.
|
|||
/// </summary>
|
|||
public long Total { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// The webhook events.
|
|||
/// </summary>
|
|||
public WebhookEventDto[] Items { get; set; } |
|||
} |
|||
} |
|||
File diff suppressed because it is too large
Binary file not shown.
|
Before Width: | Height: | Size: 59 KiB After Width: | Height: | Size: 62 KiB |
Binary file not shown.
Binary file not shown.
File diff suppressed because it is too large
Loading…
Reference in new issue