Browse Source

implement the IStaticWebhookSaver

pull/765/head
cKey 3 years ago
parent
commit
02007e6aa2
  1. 8
      aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks.Core/LINGYUN/Abp/Webhooks/AbpWebhooksOptions.cs
  2. 1
      aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN.Abp.WebhooksManagement.Domain.csproj
  3. 8
      aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/IStaticWebhookSaver.cs
  4. 296
      aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/StaticWebhookSaver.cs
  5. 2
      aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/WebhookManagementOptions.cs
  6. 146
      aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/WebhooksManagementDomainModule.cs

8
aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks.Core/LINGYUN/Abp/Webhooks/AbpWebhooksOptions.cs

@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Generic;
using Volo.Abp.Collections; using Volo.Abp.Collections;
namespace LINGYUN.Abp.Webhooks; namespace LINGYUN.Abp.Webhooks;
@ -24,6 +25,10 @@ public class AbpWebhooksOptions
public ITypeList<IWebhookDefinitionProvider> DefinitionProviders { get; } public ITypeList<IWebhookDefinitionProvider> DefinitionProviders { get; }
public HashSet<string> DeletedWebhooks { get; }
public HashSet<string> DeletedWebhookGroups { get; }
public AbpWebhooksOptions() public AbpWebhooksOptions()
{ {
TimeoutDuration = TimeSpan.FromSeconds(60); TimeoutDuration = TimeSpan.FromSeconds(60);
@ -31,5 +36,8 @@ public class AbpWebhooksOptions
MaxConsecutiveFailCountBeforeDeactivateSubscription = MaxSendAttemptCount * 3; MaxConsecutiveFailCountBeforeDeactivateSubscription = MaxSendAttemptCount * 3;
DefinitionProviders = new TypeList<IWebhookDefinitionProvider>(); DefinitionProviders = new TypeList<IWebhookDefinitionProvider>();
DeletedWebhooks = new HashSet<string>();
DeletedWebhookGroups = new HashSet<string>();
} }
} }

1
aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN.Abp.WebhooksManagement.Domain.csproj

@ -12,6 +12,7 @@
<PackageReference Include="Volo.Abp.AutoMapper" Version="$(VoloAbpPackageVersion)" /> <PackageReference Include="Volo.Abp.AutoMapper" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.Caching" Version="$(VoloAbpPackageVersion)" /> <PackageReference Include="Volo.Abp.Caching" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Volo.Abp.Ddd.Domain" Version="$(VoloAbpPackageVersion)" /> <PackageReference Include="Volo.Abp.Ddd.Domain" Version="$(VoloAbpPackageVersion)" />
<PackageReference Include="Polly" Version="$(PollyPackageVersion)" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

8
aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/IStaticWebhookSaver.cs

@ -0,0 +1,8 @@
using System.Threading.Tasks;
namespace LINGYUN.Abp.WebhooksManagement;
public interface IStaticWebhookSaver
{
Task SaveAsync();
}

296
aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/StaticWebhookSaver.cs

@ -0,0 +1,296 @@
using LINGYUN.Abp.Webhooks;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Volo.Abp;
using Volo.Abp.Caching;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DistributedLocking;
using Volo.Abp.Threading;
using Volo.Abp.Uow;
namespace LINGYUN.Abp.WebhooksManagement;
public class StaticWebhookSaver : IStaticWebhookSaver, ITransientDependency
{
protected IStaticWebhookDefinitionStore StaticStore { get; }
protected IWebhookGroupDefinitionRecordRepository WebhookGroupRepository { get; }
protected IWebhookDefinitionRecordRepository WebhookRepository { get; }
protected IWebhookDefinitionSerializer WebhookSerializer { get; }
protected IDistributedCache Cache { get; }
protected IApplicationInfoAccessor ApplicationInfoAccessor { get; }
protected IAbpDistributedLock DistributedLock { get; }
protected AbpWebhooksOptions WebhookOptions { get; }
protected ICancellationTokenProvider CancellationTokenProvider { get; }
protected AbpDistributedCacheOptions CacheOptions { get; }
public StaticWebhookSaver(
IStaticWebhookDefinitionStore staticStore,
IWebhookGroupDefinitionRecordRepository webhookGroupRepository,
IWebhookDefinitionRecordRepository webhookRepository,
IWebhookDefinitionSerializer webhookSerializer,
IDistributedCache cache,
IOptions<AbpDistributedCacheOptions> cacheOptions,
IApplicationInfoAccessor applicationInfoAccessor,
IAbpDistributedLock distributedLock,
IOptions<AbpWebhooksOptions> webhookOptions,
ICancellationTokenProvider cancellationTokenProvider)
{
StaticStore = staticStore;
WebhookGroupRepository = webhookGroupRepository;
WebhookRepository = webhookRepository;
WebhookSerializer = webhookSerializer;
Cache = cache;
ApplicationInfoAccessor = applicationInfoAccessor;
DistributedLock = distributedLock;
CancellationTokenProvider = cancellationTokenProvider;
WebhookOptions = webhookOptions.Value;
CacheOptions = cacheOptions.Value;
}
[UnitOfWork]
public virtual async Task SaveAsync()
{
await using var applicationLockHandle = await DistributedLock.TryAcquireAsync(
GetApplicationDistributedLockKey()
);
if (applicationLockHandle == null)
{
/* Another application instance is already doing it */
return;
}
/* NOTE: This can be further optimized by using 4 cache values for:
* Groups, webhooks, deleted groups and deleted webhooks.
* But the code would be more complex. This is enough for now.
*/
var cacheKey = GetApplicationHashCacheKey();
var cachedHash = await Cache.GetStringAsync(cacheKey, CancellationTokenProvider.Token);
var (webhookGroupRecords, webhookRecords) = await WebhookSerializer.SerializeAsync(
await StaticStore.GetGroupsAsync()
);
var currentHash = CalculateHash(
webhookGroupRecords,
webhookRecords,
WebhookOptions.DeletedWebhookGroups,
WebhookOptions.DeletedWebhooks
);
if (cachedHash == currentHash)
{
return;
}
await using (var commonLockHandle = await DistributedLock.TryAcquireAsync(
GetCommonDistributedLockKey(),
TimeSpan.FromMinutes(5)))
{
if (commonLockHandle == null)
{
/* It will re-try */
throw new AbpException("Could not acquire distributed lock for saving static webhooks!");
}
var hasChangesInGroups = await UpdateChangedWebhookGroupsAsync(webhookGroupRecords);
var hasChangesInWebhooks = await UpdateChangedWebhooksAsync(webhookRecords);
if (hasChangesInGroups || hasChangesInWebhooks)
{
await Cache.SetStringAsync(
GetCommonStampCacheKey(),
Guid.NewGuid().ToString(),
new DistributedCacheEntryOptions
{
SlidingExpiration = TimeSpan.FromDays(30) //TODO: Make it configurable?
},
CancellationTokenProvider.Token
);
}
}
await Cache.SetStringAsync(
cacheKey,
currentHash,
new DistributedCacheEntryOptions
{
SlidingExpiration = TimeSpan.FromDays(30) //TODO: Make it configurable?
},
CancellationTokenProvider.Token
);
}
private async Task<bool> UpdateChangedWebhookGroupsAsync(
IEnumerable<WebhookGroupDefinitionRecord> webhookGroupRecords)
{
var newRecords = new List<WebhookGroupDefinitionRecord>();
var changedRecords = new List<WebhookGroupDefinitionRecord>();
var webhookGroupRecordsInDatabase = (await WebhookGroupRepository.GetListAsync())
.ToDictionary(x => x.Name);
foreach (var webhookGroupRecord in webhookGroupRecords)
{
var webhookGroupRecordInDatabase = webhookGroupRecordsInDatabase.GetOrDefault(webhookGroupRecord.Name);
if (webhookGroupRecordInDatabase == null)
{
/* New group */
newRecords.Add(webhookGroupRecord);
continue;
}
if (webhookGroupRecord.HasSameData(webhookGroupRecordInDatabase))
{
/* Not changed */
continue;
}
/* Changed */
webhookGroupRecordInDatabase.Patch(webhookGroupRecord);
changedRecords.Add(webhookGroupRecordInDatabase);
}
/* Deleted */
var deletedRecords = WebhookOptions.DeletedWebhookGroups.Any()
? webhookGroupRecordsInDatabase.Values
.Where(x => WebhookOptions.DeletedWebhookGroups.Contains(x.Name))
.ToArray()
: Array.Empty<WebhookGroupDefinitionRecord>();
if (newRecords.Any())
{
await WebhookGroupRepository.InsertManyAsync(newRecords);
}
if (changedRecords.Any())
{
await WebhookGroupRepository.UpdateManyAsync(changedRecords);
}
if (deletedRecords.Any())
{
await WebhookGroupRepository.DeleteManyAsync(deletedRecords);
}
return newRecords.Any() || changedRecords.Any() || deletedRecords.Any();
}
private async Task<bool> UpdateChangedWebhooksAsync(
IEnumerable<WebhookDefinitionRecord> webhookRecords)
{
var newRecords = new List<WebhookDefinitionRecord>();
var changedRecords = new List<WebhookDefinitionRecord>();
var webhookRecordsInDatabase = (await WebhookRepository.GetListAsync())
.ToDictionary(x => x.Name);
foreach (var webhookRecord in webhookRecords)
{
var webhookRecordInDatabase = webhookRecordsInDatabase.GetOrDefault(webhookRecord.Name);
if (webhookRecordInDatabase == null)
{
/* New group */
newRecords.Add(webhookRecord);
continue;
}
if (webhookRecord.HasSameData(webhookRecordInDatabase))
{
/* Not changed */
continue;
}
/* Changed */
webhookRecordInDatabase.Patch(webhookRecord);
changedRecords.Add(webhookRecordInDatabase);
}
/* Deleted */
var deletedRecords = new List<WebhookDefinitionRecord>();
if (WebhookOptions.DeletedWebhooks.Any())
{
deletedRecords.AddRange(
webhookRecordsInDatabase.Values
.Where(x => WebhookOptions.DeletedWebhooks.Contains(x.Name))
);
}
if (WebhookOptions.DeletedWebhookGroups.Any())
{
deletedRecords.AddIfNotContains(
webhookRecordsInDatabase.Values
.Where(x => WebhookOptions.DeletedWebhookGroups.Contains(x.GroupName))
);
}
if (newRecords.Any())
{
await WebhookRepository.InsertManyAsync(newRecords);
}
if (changedRecords.Any())
{
await WebhookRepository.UpdateManyAsync(changedRecords);
}
if (deletedRecords.Any())
{
await WebhookRepository.DeleteManyAsync(deletedRecords);
}
return newRecords.Any() || changedRecords.Any() || deletedRecords.Any();
}
private string GetApplicationDistributedLockKey()
{
return $"{CacheOptions.KeyPrefix}_{ApplicationInfoAccessor.ApplicationName}_AbpWebhookUpdateLock";
}
private string GetCommonDistributedLockKey()
{
return $"{CacheOptions.KeyPrefix}_Common_AbpWebhookUpdateLock";
}
private string GetApplicationHashCacheKey()
{
return $"{CacheOptions.KeyPrefix}_{ApplicationInfoAccessor.ApplicationName}_AbpWebhooksHash";
}
private string GetCommonStampCacheKey()
{
return $"{CacheOptions.KeyPrefix}_AbpInMemoryWebhookCacheStamp";
}
private static string CalculateHash(
WebhookGroupDefinitionRecord[] webhookGroupRecords,
WebhookDefinitionRecord[] webhookRecords,
IEnumerable<string> deletedWebhookGroups,
IEnumerable<string> deletedWebhooks)
{
var stringBuilder = new StringBuilder();
stringBuilder.Append("WebhookGroupRecords:");
stringBuilder.AppendLine(JsonSerializer.Serialize(webhookGroupRecords));
stringBuilder.Append("WebhookRecords:");
stringBuilder.AppendLine(JsonSerializer.Serialize(webhookRecords));
stringBuilder.Append("DeletedWebhookGroups:");
stringBuilder.AppendLine(deletedWebhookGroups.JoinAsString(","));
stringBuilder.Append("DeletedWebhook:");
stringBuilder.Append(deletedWebhooks.JoinAsString(","));
return stringBuilder
.ToString()
.ToMd5();
}
}

2
aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/WebhookManagementOptions.cs

@ -1,10 +1,12 @@
namespace LINGYUN.Abp.WebhooksManagement; namespace LINGYUN.Abp.WebhooksManagement;
public class WebhookManagementOptions public class WebhookManagementOptions
{ {
public bool SaveStaticWebhooksToDatabase { get; set; }
public bool IsDynamicWebhookStoreEnabled { get; set; } public bool IsDynamicWebhookStoreEnabled { get; set; }
public WebhookManagementOptions() public WebhookManagementOptions()
{ {
IsDynamicWebhookStoreEnabled = true; IsDynamicWebhookStoreEnabled = true;
SaveStaticWebhooksToDatabase = true;
} }
} }

146
aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/WebhooksManagementDomainModule.cs

@ -1,7 +1,17 @@
using LINGYUN.Abp.Webhooks; using LINGYUN.Abp.Webhooks;
using LINGYUN.Abp.WebhooksManagement.ObjectExtending; using LINGYUN.Abp.WebhooksManagement.ObjectExtending;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using System;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp;
using Volo.Abp.AutoMapper; using Volo.Abp.AutoMapper;
using Volo.Abp.Data;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities.Events.Distributed; using Volo.Abp.Domain.Entities.Events.Distributed;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
using Volo.Abp.ObjectExtending.Modularity; using Volo.Abp.ObjectExtending.Modularity;
@ -15,7 +25,8 @@ namespace LINGYUN.Abp.WebhooksManagement;
typeof(WebhooksManagementDomainSharedModule))] typeof(WebhooksManagementDomainSharedModule))]
public class WebhooksManagementDomainModule : AbpModule public class WebhooksManagementDomainModule : AbpModule
{ {
private static readonly OneTimeRunner OneTimeRunner = new(); private readonly CancellationTokenSource _cancellationTokenSource = new();
private readonly static OneTimeRunner OneTimeRunner = new();
public override void ConfigureServices(ServiceConfigurationContext context) public override void ConfigureServices(ServiceConfigurationContext context)
{ {
context.Services.AddAutoMapperObjectMapper<WebhooksManagementDomainModule>(); context.Services.AddAutoMapperObjectMapper<WebhooksManagementDomainModule>();
@ -35,6 +46,15 @@ public class WebhooksManagementDomainModule : AbpModule
options.AutoEventSelectors.Add<WebhookSendRecord>(); options.AutoEventSelectors.Add<WebhookSendRecord>();
options.AutoEventSelectors.Add<WebhookSubscription>(); options.AutoEventSelectors.Add<WebhookSubscription>();
}); });
if (context.Services.IsDataMigrationEnvironment())
{
Configure<WebhookManagementOptions>(options =>
{
options.SaveStaticWebhooksToDatabase = false;
options.IsDynamicWebhookStoreEnabled = false;
});
}
} }
public override void PostConfigureServices(ServiceConfigurationContext context) public override void PostConfigureServices(ServiceConfigurationContext context)
@ -59,4 +79,128 @@ public class WebhooksManagementDomainModule : AbpModule
); );
}); });
} }
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
AsyncHelper.RunSync(() => OnApplicationInitializationAsync(context));
}
public override Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
InitializeDynamicWebhooks(context);
return Task.CompletedTask;
}
public override Task OnApplicationShutdownAsync(ApplicationShutdownContext context)
{
_cancellationTokenSource.Cancel();
return Task.CompletedTask;
}
private void InitializeDynamicWebhooks(ApplicationInitializationContext context)
{
var options = context
.ServiceProvider
.GetRequiredService<IOptions<WebhookManagementOptions>>()
.Value;
if (!options.SaveStaticWebhooksToDatabase && !options.IsDynamicWebhookStoreEnabled)
{
return;
}
var rootServiceProvider = context.ServiceProvider.GetRequiredService<IRootServiceProvider>();
Task.Run(async () =>
{
using var scope = rootServiceProvider.CreateScope();
var applicationLifetime = scope.ServiceProvider.GetService<IHostApplicationLifetime>();
var cancellationTokenProvider = scope.ServiceProvider.GetRequiredService<ICancellationTokenProvider>();
var cancellationToken = applicationLifetime?.ApplicationStopping ?? _cancellationTokenSource.Token;
try
{
using (cancellationTokenProvider.Use(cancellationToken))
{
if (cancellationTokenProvider.Token.IsCancellationRequested)
{
return;
}
await SaveStaticWebhooksToDatabaseAsync(options, scope, cancellationTokenProvider);
if (cancellationTokenProvider.Token.IsCancellationRequested)
{
return;
}
await PreCacheDynamicWebhooksAsync(options, scope);
}
}
// ReSharper disable once EmptyGeneralCatchClause (No need to log since it is logged above)
catch { }
});
}
private async static Task SaveStaticWebhooksToDatabaseAsync(
WebhookManagementOptions options,
IServiceScope scope,
ICancellationTokenProvider cancellationTokenProvider)
{
if (!options.SaveStaticWebhooksToDatabase)
{
return;
}
await Policy
.Handle<Exception>()
.WaitAndRetryAsync(8, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt) * 10))
.ExecuteAsync(async _ =>
{
try
{
// ReSharper disable once AccessToDisposedClosure
await scope
.ServiceProvider
.GetRequiredService<IStaticWebhookSaver>()
.SaveAsync();
}
catch (Exception ex)
{
// ReSharper disable once AccessToDisposedClosure
scope.ServiceProvider
.GetService<ILogger<WebhooksManagementDomainModule>>()?
.LogException(ex);
throw; // Polly will catch it
}
}, cancellationTokenProvider.Token);
}
private async static Task PreCacheDynamicWebhooksAsync(WebhookManagementOptions options, IServiceScope scope)
{
if (!options.IsDynamicWebhookStoreEnabled)
{
return;
}
try
{
// Pre-cache permissions, so first request doesn't wait
await scope
.ServiceProvider
.GetRequiredService<IDynamicWebhookDefinitionStore>()
.GetGroupsAsync();
}
catch (Exception ex)
{
// ReSharper disable once AccessToDisposedClosure
scope
.ServiceProvider
.GetService<ILogger<WebhooksManagementDomainModule>>()?
.LogException(ex);
throw; // It will be cached in InitializeDynamicWebhooks
}
}
} }

Loading…
Cancel
Save