diff --git a/aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks.Core/LINGYUN/Abp/Webhooks/AbpWebhooksOptions.cs b/aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks.Core/LINGYUN/Abp/Webhooks/AbpWebhooksOptions.cs index 971ecf571..1411be14f 100644 --- a/aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks.Core/LINGYUN/Abp/Webhooks/AbpWebhooksOptions.cs +++ b/aspnet-core/modules/webhooks/LINGYUN.Abp.Webhooks.Core/LINGYUN/Abp/Webhooks/AbpWebhooksOptions.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using Volo.Abp.Collections; namespace LINGYUN.Abp.Webhooks; @@ -24,6 +25,10 @@ public class AbpWebhooksOptions public ITypeList DefinitionProviders { get; } + public HashSet DeletedWebhooks { get; } + + public HashSet DeletedWebhookGroups { get; } + public AbpWebhooksOptions() { TimeoutDuration = TimeSpan.FromSeconds(60); @@ -31,5 +36,8 @@ public class AbpWebhooksOptions MaxConsecutiveFailCountBeforeDeactivateSubscription = MaxSendAttemptCount * 3; DefinitionProviders = new TypeList(); + + DeletedWebhooks = new HashSet(); + DeletedWebhookGroups = new HashSet(); } } diff --git a/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN.Abp.WebhooksManagement.Domain.csproj b/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN.Abp.WebhooksManagement.Domain.csproj index 36f7a7d04..0f23dd8fb 100644 --- a/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN.Abp.WebhooksManagement.Domain.csproj +++ b/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN.Abp.WebhooksManagement.Domain.csproj @@ -12,6 +12,7 @@ + diff --git a/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/IStaticWebhookSaver.cs b/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/IStaticWebhookSaver.cs new file mode 100644 index 000000000..930945ec8 --- /dev/null +++ b/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/IStaticWebhookSaver.cs @@ -0,0 +1,8 @@ +using System.Threading.Tasks; + +namespace LINGYUN.Abp.WebhooksManagement; + +public interface IStaticWebhookSaver +{ + Task SaveAsync(); +} diff --git a/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/StaticWebhookSaver.cs b/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/StaticWebhookSaver.cs new file mode 100644 index 000000000..dc3bc7df8 --- /dev/null +++ b/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/StaticWebhookSaver.cs @@ -0,0 +1,296 @@ +using LINGYUN.Abp.Webhooks; +using Microsoft.Extensions.Caching.Distributed; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using Volo.Abp; +using Volo.Abp.Caching; +using Volo.Abp.DependencyInjection; +using Volo.Abp.DistributedLocking; +using Volo.Abp.Threading; +using Volo.Abp.Uow; + +namespace LINGYUN.Abp.WebhooksManagement; +public class StaticWebhookSaver : IStaticWebhookSaver, ITransientDependency +{ + protected IStaticWebhookDefinitionStore StaticStore { get; } + protected IWebhookGroupDefinitionRecordRepository WebhookGroupRepository { get; } + protected IWebhookDefinitionRecordRepository WebhookRepository { get; } + protected IWebhookDefinitionSerializer WebhookSerializer { get; } + protected IDistributedCache Cache { get; } + protected IApplicationInfoAccessor ApplicationInfoAccessor { get; } + protected IAbpDistributedLock DistributedLock { get; } + protected AbpWebhooksOptions WebhookOptions { get; } + protected ICancellationTokenProvider CancellationTokenProvider { get; } + protected AbpDistributedCacheOptions CacheOptions { get; } + + public StaticWebhookSaver( + IStaticWebhookDefinitionStore staticStore, + IWebhookGroupDefinitionRecordRepository webhookGroupRepository, + IWebhookDefinitionRecordRepository webhookRepository, + IWebhookDefinitionSerializer webhookSerializer, + IDistributedCache cache, + IOptions cacheOptions, + IApplicationInfoAccessor applicationInfoAccessor, + IAbpDistributedLock distributedLock, + IOptions webhookOptions, + ICancellationTokenProvider cancellationTokenProvider) + { + StaticStore = staticStore; + WebhookGroupRepository = webhookGroupRepository; + WebhookRepository = webhookRepository; + WebhookSerializer = webhookSerializer; + Cache = cache; + ApplicationInfoAccessor = applicationInfoAccessor; + DistributedLock = distributedLock; + CancellationTokenProvider = cancellationTokenProvider; + WebhookOptions = webhookOptions.Value; + CacheOptions = cacheOptions.Value; + } + + [UnitOfWork] + public virtual async Task SaveAsync() + { + await using var applicationLockHandle = await DistributedLock.TryAcquireAsync( + GetApplicationDistributedLockKey() + ); + + if (applicationLockHandle == null) + { + /* Another application instance is already doing it */ + return; + } + + /* NOTE: This can be further optimized by using 4 cache values for: + * Groups, webhooks, deleted groups and deleted webhooks. + * But the code would be more complex. This is enough for now. + */ + + var cacheKey = GetApplicationHashCacheKey(); + var cachedHash = await Cache.GetStringAsync(cacheKey, CancellationTokenProvider.Token); + + var (webhookGroupRecords, webhookRecords) = await WebhookSerializer.SerializeAsync( + await StaticStore.GetGroupsAsync() + ); + + var currentHash = CalculateHash( + webhookGroupRecords, + webhookRecords, + WebhookOptions.DeletedWebhookGroups, + WebhookOptions.DeletedWebhooks + ); + + if (cachedHash == currentHash) + { + return; + } + + await using (var commonLockHandle = await DistributedLock.TryAcquireAsync( + GetCommonDistributedLockKey(), + TimeSpan.FromMinutes(5))) + { + if (commonLockHandle == null) + { + /* It will re-try */ + throw new AbpException("Could not acquire distributed lock for saving static webhooks!"); + } + + var hasChangesInGroups = await UpdateChangedWebhookGroupsAsync(webhookGroupRecords); + var hasChangesInWebhooks = await UpdateChangedWebhooksAsync(webhookRecords); + + if (hasChangesInGroups || hasChangesInWebhooks) + { + await Cache.SetStringAsync( + GetCommonStampCacheKey(), + Guid.NewGuid().ToString(), + new DistributedCacheEntryOptions + { + SlidingExpiration = TimeSpan.FromDays(30) //TODO: Make it configurable? + }, + CancellationTokenProvider.Token + ); + } + } + + await Cache.SetStringAsync( + cacheKey, + currentHash, + new DistributedCacheEntryOptions + { + SlidingExpiration = TimeSpan.FromDays(30) //TODO: Make it configurable? + }, + CancellationTokenProvider.Token + ); + } + + private async Task UpdateChangedWebhookGroupsAsync( + IEnumerable webhookGroupRecords) + { + var newRecords = new List(); + var changedRecords = new List(); + + var webhookGroupRecordsInDatabase = (await WebhookGroupRepository.GetListAsync()) + .ToDictionary(x => x.Name); + + foreach (var webhookGroupRecord in webhookGroupRecords) + { + var webhookGroupRecordInDatabase = webhookGroupRecordsInDatabase.GetOrDefault(webhookGroupRecord.Name); + if (webhookGroupRecordInDatabase == null) + { + /* New group */ + newRecords.Add(webhookGroupRecord); + continue; + } + + if (webhookGroupRecord.HasSameData(webhookGroupRecordInDatabase)) + { + /* Not changed */ + continue; + } + + /* Changed */ + webhookGroupRecordInDatabase.Patch(webhookGroupRecord); + changedRecords.Add(webhookGroupRecordInDatabase); + } + + /* Deleted */ + var deletedRecords = WebhookOptions.DeletedWebhookGroups.Any() + ? webhookGroupRecordsInDatabase.Values + .Where(x => WebhookOptions.DeletedWebhookGroups.Contains(x.Name)) + .ToArray() + : Array.Empty(); + + if (newRecords.Any()) + { + await WebhookGroupRepository.InsertManyAsync(newRecords); + } + + if (changedRecords.Any()) + { + await WebhookGroupRepository.UpdateManyAsync(changedRecords); + } + + if (deletedRecords.Any()) + { + await WebhookGroupRepository.DeleteManyAsync(deletedRecords); + } + + return newRecords.Any() || changedRecords.Any() || deletedRecords.Any(); + } + + private async Task UpdateChangedWebhooksAsync( + IEnumerable webhookRecords) + { + var newRecords = new List(); + var changedRecords = new List(); + + var webhookRecordsInDatabase = (await WebhookRepository.GetListAsync()) + .ToDictionary(x => x.Name); + + foreach (var webhookRecord in webhookRecords) + { + var webhookRecordInDatabase = webhookRecordsInDatabase.GetOrDefault(webhookRecord.Name); + if (webhookRecordInDatabase == null) + { + /* New group */ + newRecords.Add(webhookRecord); + continue; + } + + if (webhookRecord.HasSameData(webhookRecordInDatabase)) + { + /* Not changed */ + continue; + } + + /* Changed */ + webhookRecordInDatabase.Patch(webhookRecord); + changedRecords.Add(webhookRecordInDatabase); + } + + /* Deleted */ + var deletedRecords = new List(); + + if (WebhookOptions.DeletedWebhooks.Any()) + { + deletedRecords.AddRange( + webhookRecordsInDatabase.Values + .Where(x => WebhookOptions.DeletedWebhooks.Contains(x.Name)) + ); + } + + if (WebhookOptions.DeletedWebhookGroups.Any()) + { + deletedRecords.AddIfNotContains( + webhookRecordsInDatabase.Values + .Where(x => WebhookOptions.DeletedWebhookGroups.Contains(x.GroupName)) + ); + } + + if (newRecords.Any()) + { + await WebhookRepository.InsertManyAsync(newRecords); + } + + if (changedRecords.Any()) + { + await WebhookRepository.UpdateManyAsync(changedRecords); + } + + if (deletedRecords.Any()) + { + await WebhookRepository.DeleteManyAsync(deletedRecords); + } + + return newRecords.Any() || changedRecords.Any() || deletedRecords.Any(); + } + + private string GetApplicationDistributedLockKey() + { + return $"{CacheOptions.KeyPrefix}_{ApplicationInfoAccessor.ApplicationName}_AbpWebhookUpdateLock"; + } + + private string GetCommonDistributedLockKey() + { + return $"{CacheOptions.KeyPrefix}_Common_AbpWebhookUpdateLock"; + } + + private string GetApplicationHashCacheKey() + { + return $"{CacheOptions.KeyPrefix}_{ApplicationInfoAccessor.ApplicationName}_AbpWebhooksHash"; + } + + private string GetCommonStampCacheKey() + { + return $"{CacheOptions.KeyPrefix}_AbpInMemoryWebhookCacheStamp"; + } + + private static string CalculateHash( + WebhookGroupDefinitionRecord[] webhookGroupRecords, + WebhookDefinitionRecord[] webhookRecords, + IEnumerable deletedWebhookGroups, + IEnumerable deletedWebhooks) + { + var stringBuilder = new StringBuilder(); + + stringBuilder.Append("WebhookGroupRecords:"); + stringBuilder.AppendLine(JsonSerializer.Serialize(webhookGroupRecords)); + + stringBuilder.Append("WebhookRecords:"); + stringBuilder.AppendLine(JsonSerializer.Serialize(webhookRecords)); + + stringBuilder.Append("DeletedWebhookGroups:"); + stringBuilder.AppendLine(deletedWebhookGroups.JoinAsString(",")); + + stringBuilder.Append("DeletedWebhook:"); + stringBuilder.Append(deletedWebhooks.JoinAsString(",")); + + return stringBuilder + .ToString() + .ToMd5(); + } +} diff --git a/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/WebhookManagementOptions.cs b/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/WebhookManagementOptions.cs index ca900b88f..f9c4069d6 100644 --- a/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/WebhookManagementOptions.cs +++ b/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/WebhookManagementOptions.cs @@ -1,10 +1,12 @@ namespace LINGYUN.Abp.WebhooksManagement; public class WebhookManagementOptions { + public bool SaveStaticWebhooksToDatabase { get; set; } public bool IsDynamicWebhookStoreEnabled { get; set; } public WebhookManagementOptions() { IsDynamicWebhookStoreEnabled = true; + SaveStaticWebhooksToDatabase = true; } } diff --git a/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/WebhooksManagementDomainModule.cs b/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/WebhooksManagementDomainModule.cs index 18903b4b3..0b530ea72 100644 --- a/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/WebhooksManagementDomainModule.cs +++ b/aspnet-core/modules/webhooks/LINGYUN.Abp.WebhooksManagement.Domain/LINGYUN/Abp/WebhooksManagement/WebhooksManagementDomainModule.cs @@ -1,7 +1,17 @@ using LINGYUN.Abp.Webhooks; using LINGYUN.Abp.WebhooksManagement.ObjectExtending; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Polly; +using System; +using System.Threading; +using System.Threading.Tasks; +using Volo.Abp; using Volo.Abp.AutoMapper; +using Volo.Abp.Data; +using Volo.Abp.DependencyInjection; using Volo.Abp.Domain.Entities.Events.Distributed; using Volo.Abp.Modularity; using Volo.Abp.ObjectExtending.Modularity; @@ -15,7 +25,8 @@ namespace LINGYUN.Abp.WebhooksManagement; typeof(WebhooksManagementDomainSharedModule))] public class WebhooksManagementDomainModule : AbpModule { - private static readonly OneTimeRunner OneTimeRunner = new(); + private readonly CancellationTokenSource _cancellationTokenSource = new(); + private readonly static OneTimeRunner OneTimeRunner = new(); public override void ConfigureServices(ServiceConfigurationContext context) { context.Services.AddAutoMapperObjectMapper(); @@ -35,6 +46,15 @@ public class WebhooksManagementDomainModule : AbpModule options.AutoEventSelectors.Add(); options.AutoEventSelectors.Add(); }); + + if (context.Services.IsDataMigrationEnvironment()) + { + Configure(options => + { + options.SaveStaticWebhooksToDatabase = false; + options.IsDynamicWebhookStoreEnabled = false; + }); + } } public override void PostConfigureServices(ServiceConfigurationContext context) @@ -59,4 +79,128 @@ public class WebhooksManagementDomainModule : AbpModule ); }); } + + public override void OnApplicationInitialization(ApplicationInitializationContext context) + { + AsyncHelper.RunSync(() => OnApplicationInitializationAsync(context)); + } + + public override Task OnApplicationInitializationAsync(ApplicationInitializationContext context) + { + InitializeDynamicWebhooks(context); + return Task.CompletedTask; + } + + public override Task OnApplicationShutdownAsync(ApplicationShutdownContext context) + { + _cancellationTokenSource.Cancel(); + return Task.CompletedTask; + } + + private void InitializeDynamicWebhooks(ApplicationInitializationContext context) + { + var options = context + .ServiceProvider + .GetRequiredService>() + .Value; + + if (!options.SaveStaticWebhooksToDatabase && !options.IsDynamicWebhookStoreEnabled) + { + return; + } + + var rootServiceProvider = context.ServiceProvider.GetRequiredService(); + + Task.Run(async () => + { + using var scope = rootServiceProvider.CreateScope(); + var applicationLifetime = scope.ServiceProvider.GetService(); + var cancellationTokenProvider = scope.ServiceProvider.GetRequiredService(); + var cancellationToken = applicationLifetime?.ApplicationStopping ?? _cancellationTokenSource.Token; + + try + { + using (cancellationTokenProvider.Use(cancellationToken)) + { + if (cancellationTokenProvider.Token.IsCancellationRequested) + { + return; + } + + await SaveStaticWebhooksToDatabaseAsync(options, scope, cancellationTokenProvider); + + if (cancellationTokenProvider.Token.IsCancellationRequested) + { + return; + } + + await PreCacheDynamicWebhooksAsync(options, scope); + } + } + // ReSharper disable once EmptyGeneralCatchClause (No need to log since it is logged above) + catch { } + }); + } + + private async static Task SaveStaticWebhooksToDatabaseAsync( + WebhookManagementOptions options, + IServiceScope scope, + ICancellationTokenProvider cancellationTokenProvider) + { + if (!options.SaveStaticWebhooksToDatabase) + { + return; + } + + await Policy + .Handle() + .WaitAndRetryAsync(8, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt) * 10)) + .ExecuteAsync(async _ => + { + try + { + // ReSharper disable once AccessToDisposedClosure + await scope + .ServiceProvider + .GetRequiredService() + .SaveAsync(); + } + catch (Exception ex) + { + // ReSharper disable once AccessToDisposedClosure + scope.ServiceProvider + .GetService>()? + .LogException(ex); + + throw; // Polly will catch it + } + }, cancellationTokenProvider.Token); + } + + private async static Task PreCacheDynamicWebhooksAsync(WebhookManagementOptions options, IServiceScope scope) + { + if (!options.IsDynamicWebhookStoreEnabled) + { + return; + } + + try + { + // Pre-cache permissions, so first request doesn't wait + await scope + .ServiceProvider + .GetRequiredService() + .GetGroupsAsync(); + } + catch (Exception ex) + { + // ReSharper disable once AccessToDisposedClosure + scope + .ServiceProvider + .GetService>()? + .LogException(ex); + + throw; // It will be cached in InitializeDynamicWebhooks + } + } }