You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
116 lines
4.2 KiB
116 lines
4.2 KiB
using LINGYUN.Abp.Webhooks;
|
|
using LINGYUN.Abp.Webhooks.EventBus;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading.Tasks;
|
|
using Volo.Abp.BackgroundJobs;
|
|
using Volo.Abp.DependencyInjection;
|
|
using Volo.Abp.EventBus.Distributed;
|
|
using Volo.Abp.MultiTenancy;
|
|
|
|
namespace LY.MicroService.WebhooksManagement.EventBus.Handlers;
|
|
|
|
public class WebhooksEventHandler :
|
|
IDistributedEventHandler<WebhooksEventData>,
|
|
ITransientDependency
|
|
{
|
|
public IWebhookEventStore WebhookEventStore { get; set; }
|
|
|
|
private readonly ICurrentTenant _currentTenant;
|
|
private readonly IBackgroundJobManager _backgroundJobManager;
|
|
private readonly IWebhookSubscriptionManager _webhookSubscriptionManager;
|
|
|
|
public WebhooksEventHandler(
|
|
IWebhookSubscriptionManager webhookSubscriptionManager,
|
|
ICurrentTenant currentTenant,
|
|
IBackgroundJobManager backgroundJobManager)
|
|
{
|
|
_currentTenant = currentTenant;
|
|
_backgroundJobManager = backgroundJobManager;
|
|
_webhookSubscriptionManager = webhookSubscriptionManager;
|
|
|
|
WebhookEventStore = NullWebhookEventStore.Instance;
|
|
}
|
|
|
|
public async virtual Task HandleEventAsync(WebhooksEventData eventData)
|
|
{
|
|
var subscriptions = await _webhookSubscriptionManager
|
|
.GetAllSubscriptionsOfTenantsIfFeaturesGrantedAsync(
|
|
eventData.TenantIds,
|
|
eventData.WebhookName);
|
|
|
|
await PublishAsync(eventData.WebhookName, eventData.Data, subscriptions, eventData.SendExactSameData, eventData.Headers);
|
|
}
|
|
|
|
protected async virtual Task PublishAsync(
|
|
string webhookName,
|
|
string data,
|
|
List<WebhookSubscriptionInfo> webhookSubscriptions,
|
|
bool sendExactSameData = false,
|
|
WebhookHeader headers = null)
|
|
{
|
|
if (webhookSubscriptions.IsNullOrEmpty())
|
|
{
|
|
return;
|
|
}
|
|
|
|
var subscriptionsGroupedByTenant = webhookSubscriptions.GroupBy(x => x.TenantId);
|
|
|
|
foreach (var subscriptionGroupedByTenant in subscriptionsGroupedByTenant)
|
|
{
|
|
var webhookInfo = await SaveAndGetWebhookAsync(subscriptionGroupedByTenant.Key, webhookName, data);
|
|
|
|
foreach (var webhookSubscription in subscriptionGroupedByTenant)
|
|
{
|
|
var headersToSend = webhookSubscription.Headers;
|
|
if (headers != null)
|
|
{
|
|
if (headers.UseOnlyGivenHeaders)//do not use the headers defined in subscription
|
|
{
|
|
headersToSend = headers.Headers;
|
|
}
|
|
else
|
|
{
|
|
//use the headers defined in subscription. If additional headers has same header, use additional headers value.
|
|
foreach (var additionalHeader in headers.Headers)
|
|
{
|
|
headersToSend[additionalHeader.Key] = additionalHeader.Value;
|
|
}
|
|
}
|
|
}
|
|
|
|
await _backgroundJobManager.EnqueueAsync(new WebhookSenderArgs
|
|
{
|
|
TenantId = webhookSubscription.TenantId,
|
|
WebhookEventId = webhookInfo.Id,
|
|
Data = webhookInfo.Data,
|
|
WebhookName = webhookInfo.WebhookName,
|
|
WebhookSubscriptionId = webhookSubscription.Id,
|
|
Headers = headersToSend,
|
|
Secret = webhookSubscription.Secret,
|
|
WebhookUri = webhookSubscription.WebhookUri,
|
|
SendExactSameData = sendExactSameData
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
protected async virtual Task<WebhookEvent> SaveAndGetWebhookAsync(
|
|
Guid? tenantId,
|
|
string webhookName,
|
|
string data)
|
|
{
|
|
var webhookInfo = new WebhookEvent
|
|
{
|
|
WebhookName = webhookName,
|
|
Data = data,
|
|
TenantId = tenantId
|
|
};
|
|
|
|
var webhookId = await WebhookEventStore.InsertAndGetIdAsync(webhookInfo);
|
|
webhookInfo.Id = webhookId;
|
|
|
|
return webhookInfo;
|
|
}
|
|
}
|
|
|