Browse Source

feat: Implement TickerQ background worker management.

pull/23802/head
maliming 4 months ago
parent
commit
3a9f035136
No known key found for this signature in database GPG Key ID: A646B9CB645ECEA4
  1. 18
      framework/src/Volo.Abp.BackgroundJobs.TickerQ/Volo/Abp/BackgroundJobs/TickerQ/AbpBackgroundJobsTickerQModule.cs
  2. 96
      framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/TickerQBackgroundWorkerManager.cs
  3. 40
      framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/TickerQPeriodicBackgroundWorkerInvoker.cs
  4. 2
      framework/src/Volo.Abp.Core/Volo/Abp/AbpApplicationBase.cs
  5. 14
      framework/src/Volo.Abp.TickerQ/Volo/Abp/TickerQ/AbpTickerQModule.cs
  6. 19
      framework/src/Volo.Abp.TickerQ/Volo/Abp/TickerQ/AbpTickerQOptions.cs

18
framework/src/Volo.Abp.BackgroundJobs.TickerQ/Volo/Abp/BackgroundJobs/TickerQ/AbpBackgroundJobsTickerQModule.cs

@ -19,18 +19,28 @@ public class AbpBackgroundJobsTickerQModule : AbpModule
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var abpBackgroundJobOptions = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>();
var tickerFunctionDelegateDict = new Dictionary<string, (string, TickerTaskPriority, TickerFunctionDelegate)>();
var tickerFunctionDelegates = new Dictionary<string, (string, TickerTaskPriority, TickerFunctionDelegate)>();
var requestTypes = new Dictionary<string, (string, Type)>();
foreach (var jobConfiguration in abpBackgroundJobOptions.Value.GetJobs())
{
var genericMethod = GetTickerFunctionDelegateMethod.MakeGenericMethod(jobConfiguration.ArgsType);
var tickerFunctionDelegate = (TickerFunctionDelegate)genericMethod.Invoke(null, [jobConfiguration.ArgsType])!;
tickerFunctionDelegateDict.TryAdd(jobConfiguration.JobName, (string.Empty, TickerTaskPriority.Normal, tickerFunctionDelegate));
tickerFunctionDelegates.TryAdd(jobConfiguration.JobName, (string.Empty, TickerTaskPriority.Normal, tickerFunctionDelegate));
requestTypes.TryAdd(jobConfiguration.JobName, (jobConfiguration.ArgsType.FullName, jobConfiguration.ArgsType)!);
}
TickerFunctionProvider.RegisterFunctions(tickerFunctionDelegateDict);
TickerFunctionProvider.RegisterRequestType(requestTypes);
PreConfigure<AbpTickerQOptions>(options =>
{
foreach (var functionDelegate in tickerFunctionDelegates)
{
options.Functions.TryAdd(functionDelegate.Key, functionDelegate.Value);
}
foreach (var requestType in requestTypes)
{
options.RequestTypes.TryAdd(requestType.Key, requestType.Value);
}
});
}
private static TickerFunctionDelegate GetTickerFunctionDelegate<TArgs>(Type argsType)

96
framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/TickerQBackgroundWorkerManager.cs

@ -0,0 +1,96 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using TickerQ.Utilities.Enums;
using Volo.Abp.DependencyInjection;
using Volo.Abp.TickerQ;
namespace Volo.Abp.BackgroundWorkers.TickerQ;
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IBackgroundWorkerManager), typeof(TickerQBackgroundWorkerManager))]
public class TickerQBackgroundWorkerManager : BackgroundWorkerManager, ISingletonDependency
{
protected IObjectAccessor<IServiceCollection> ObjectAccessor { get; }
public TickerQBackgroundWorkerManager(IObjectAccessor<IServiceCollection> objectAccessor)
{
ObjectAccessor = objectAccessor;
}
public override async Task AddAsync(IBackgroundWorker worker, CancellationToken cancellationToken = default)
{
if (worker is AsyncPeriodicBackgroundWorkerBase or PeriodicBackgroundWorkerBase)
{
int? period = null;
string? cronExpression = null;
if (worker is AsyncPeriodicBackgroundWorkerBase asyncPeriodicBackgroundWorkerBase)
{
period = asyncPeriodicBackgroundWorkerBase.Period;
cronExpression = asyncPeriodicBackgroundWorkerBase.CronExpression;
}
else if (worker is PeriodicBackgroundWorkerBase periodicBackgroundWorkerBase)
{
period = periodicBackgroundWorkerBase.Period;
cronExpression = periodicBackgroundWorkerBase.CronExpression;
}
if (period == null && cronExpression.IsNullOrWhiteSpace())
{
throw new AbpException($"Both 'Period' and 'CronExpression' are not set for {worker.GetType().FullName}. You must set at least one of them.");
}
if (period != null && cronExpression.IsNullOrWhiteSpace())
{
cronExpression = GetCron(period.Value);
}
ObjectAccessor.Value!.PreConfigure<AbpTickerQOptions>(options =>
{
var name = BackgroundWorkerNameAttribute.GetNameOrNull(worker.GetType()) ?? worker.GetType().FullName;
options.Functions.TryAdd(name!, (cronExpression!, TickerTaskPriority.Normal, async (tickerQCancellationToken, serviceProvider, tickerFunctionContext) =>
{
var workerInvoker = new TickerQPeriodicBackgroundWorkerInvoker(worker, serviceProvider);
await workerInvoker.DoWorkAsync(tickerFunctionContext, tickerQCancellationToken);
}));
});
}
await base.AddAsync(worker, cancellationToken);
}
protected virtual string GetCron(int period)
{
var time = TimeSpan.FromMilliseconds(period);
if (time.TotalMinutes < 1)
{
// Less than 1 minute — 5-field cron doesn't support seconds, so run every minute
return "* * * * *";
}
if (time.TotalMinutes < 60)
{
// Run every N minutes
var minutes = (int)Math.Round(time.TotalMinutes);
return $"*/{minutes} * * * *";
}
if (time.TotalHours < 24)
{
// Run every N hours
var hours = (int)Math.Round(time.TotalHours);
return $"0 */{hours} * * *";
}
if (time.TotalDays <= 31)
{
// Run every N days
var days = (int)Math.Round(time.TotalDays);
return $"0 0 */{days} * *";
}
throw new AbpException($"Cannot convert period: {period} to cron expression.");
}
}

40
framework/src/Volo.Abp.BackgroundWorkers.TickerQ/Volo/Abp/BackgroundWorkers/TickerQ/TickerQPeriodicBackgroundWorkerInvoker.cs

@ -0,0 +1,40 @@
using System;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using TickerQ.Utilities.Models;
namespace Volo.Abp.BackgroundWorkers.TickerQ;
//TODO: Use lambda expression to improve performance.
public class TickerQPeriodicBackgroundWorkerInvoker
{
private readonly MethodInfo _doWorkAsyncMethod;
private readonly MethodInfo _doWorkMethod;
protected IBackgroundWorker Worker { get; }
protected IServiceProvider ServiceProvider { get; }
public TickerQPeriodicBackgroundWorkerInvoker(IBackgroundWorker worker, IServiceProvider serviceProvider)
{
_doWorkAsyncMethod = worker.GetType().GetMethod("DoWorkAsync", BindingFlags.Instance | BindingFlags.NonPublic)!;
_doWorkMethod = worker.GetType().GetMethod("DoWork", BindingFlags.Instance | BindingFlags.NonPublic)!;
Worker = worker;
ServiceProvider = serviceProvider;
}
public virtual async Task DoWorkAsync(TickerFunctionContext context, CancellationToken cancellationToken = default)
{
var workerContext = new PeriodicBackgroundWorkerContext(ServiceProvider);
switch (Worker)
{
case AsyncPeriodicBackgroundWorkerBase asyncPeriodicBackgroundWorker:
await (Task)(_doWorkAsyncMethod.Invoke(asyncPeriodicBackgroundWorker, new object[] { workerContext })!);
break;
case PeriodicBackgroundWorkerBase periodicBackgroundWorker:
_doWorkMethod.Invoke(periodicBackgroundWorker, new object[] { workerContext });
break;
}
}
}

2
framework/src/Volo.Abp.Core/Volo/Abp/AbpApplicationBase.cs

@ -45,7 +45,7 @@ public abstract class AbpApplicationBase : IAbpApplication
StartupModuleType = startupModuleType;
Services = services;
services.AddObjectAccessor(services);
services.TryAddObjectAccessor<IServiceProvider>();
var options = new AbpApplicationCreationOptions(services);

14
framework/src/Volo.Abp.TickerQ/Volo/Abp/TickerQ/AbpTickerQModule.cs

@ -1,5 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using TickerQ.DependencyInjection;
using TickerQ.Utilities;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Modularity;
namespace Volo.Abp.TickerQ;
@ -12,6 +14,18 @@ public class AbpTickerQModule : AbpModule
{
options.SetInstanceIdentifier(context.Services.GetApplicationName());
});
}
public override void OnPostApplicationInitialization(ApplicationInitializationContext context)
{
var serviceCollection = context.ServiceProvider.GetRequiredService<IObjectAccessor<IServiceCollection>>();
if (serviceCollection.Value == null)
{
return;
}
var tickerQ = serviceCollection.Value.ExecutePreConfiguredActions<AbpTickerQOptions>();
TickerFunctionProvider.RegisterFunctions(tickerQ.Functions);
TickerFunctionProvider.RegisterRequestType(tickerQ.RequestTypes);
}
}

19
framework/src/Volo.Abp.TickerQ/Volo/Abp/TickerQ/AbpTickerQOptions.cs

@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using TickerQ.Utilities;
using TickerQ.Utilities.Enums;
namespace Volo.Abp.TickerQ;
public class AbpTickerQOptions
{
public Dictionary<string, (string, TickerTaskPriority, TickerFunctionDelegate)> Functions { get;}
public Dictionary<string, (string, Type)> RequestTypes { get; }
public AbpTickerQOptions()
{
Functions = new Dictionary<string, (string, TickerTaskPriority, TickerFunctionDelegate)>();
RequestTypes = new Dictionary<string, (string, Type)>();
}
}
Loading…
Cancel
Save