16 changed files with 99 additions and 427 deletions
@ -1,74 +0,0 @@ |
|||
using DotNetCore.CAP.Persistence; |
|||
using LINGYUN.Abp.EventBus.CAP; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Options; |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.BackgroundWorkers; |
|||
using Volo.Abp.Threading; |
|||
|
|||
namespace DotNetCore.CAP.Processor |
|||
{ |
|||
/// <summary>
|
|||
/// 过期消息清理任务
|
|||
/// </summary>
|
|||
public class AbpCapExpiresMessageCleanupBackgroundWorker : AsyncPeriodicBackgroundWorkerBase |
|||
{ |
|||
/// <summary>
|
|||
/// 过期消息清理配置
|
|||
/// </summary>
|
|||
protected AbpCAPEventBusOptions Options { get; } |
|||
/// <summary>
|
|||
/// Initializer
|
|||
/// </summary>
|
|||
protected IStorageInitializer Initializer { get; } |
|||
/// <summary>
|
|||
/// Storage
|
|||
/// </summary>
|
|||
protected IDataStorage Storage{ get; } |
|||
/// <summary>
|
|||
/// 创建过期消息清理任务
|
|||
/// </summary>
|
|||
/// <param name="timer"></param>
|
|||
/// <param name="storage"></param>
|
|||
/// <param name="initializer"></param>
|
|||
/// <param name="options"></param>
|
|||
/// <param name="serviceScopeFactory"></param>
|
|||
public AbpCapExpiresMessageCleanupBackgroundWorker( |
|||
AbpAsyncTimer timer, |
|||
IDataStorage storage, |
|||
IStorageInitializer initializer, |
|||
IOptions<AbpCAPEventBusOptions> options, |
|||
IServiceScopeFactory serviceScopeFactory) |
|||
: base(timer, serviceScopeFactory) |
|||
{ |
|||
Storage = storage; |
|||
Options = options.Value; |
|||
Initializer = initializer; |
|||
|
|||
timer.Period = Options.CleanUpExpiresMessageInterval; |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 异步执行任务
|
|||
/// </summary>
|
|||
/// <param name="workerContext"></param>
|
|||
/// <returns></returns>
|
|||
protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext) |
|||
{ |
|||
var tables = new[] |
|||
{ |
|||
Initializer.GetPublishedTableName(), |
|||
Initializer.GetReceivedTableName() |
|||
}; |
|||
|
|||
foreach (var table in tables) |
|||
{ |
|||
Logger.LogDebug($"Collecting expired data from table: {table}"); |
|||
var time = DateTime.Now; |
|||
await Storage.DeleteExpiresAsync(table, time, Options.CleanUpExpiresMessageBatch); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -1,119 +0,0 @@ |
|||
using DotNetCore.CAP.Internal; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace DotNetCore.CAP.Processor |
|||
{ |
|||
/// <summary>
|
|||
/// CapProcessingServer
|
|||
/// </summary>
|
|||
public class AbpCapProcessingServer : IProcessingServer |
|||
{ |
|||
private readonly CancellationTokenSource _cts; |
|||
private readonly ILogger _logger; |
|||
private readonly ILoggerFactory _loggerFactory; |
|||
private readonly IServiceProvider _provider; |
|||
|
|||
private Task _compositeTask; |
|||
private ProcessingContext _context; |
|||
private bool _disposed; |
|||
/// <summary>
|
|||
/// CapProcessingServer
|
|||
/// </summary>
|
|||
/// <param name="logger"></param>
|
|||
/// <param name="loggerFactory"></param>
|
|||
/// <param name="provider"></param>
|
|||
public AbpCapProcessingServer( |
|||
ILogger<AbpCapProcessingServer> logger, |
|||
ILoggerFactory loggerFactory, |
|||
IServiceProvider provider) |
|||
{ |
|||
_logger = logger; |
|||
_loggerFactory = loggerFactory; |
|||
_provider = provider; |
|||
_cts = new CancellationTokenSource(); |
|||
} |
|||
/// <summary>
|
|||
/// Start
|
|||
/// </summary>
|
|||
public void Start(CancellationToken stoppingToken) |
|||
{ |
|||
_logger.LogInformation("Starting the processing server."); |
|||
stoppingToken.Register(() => |
|||
{ |
|||
_cts.Cancel(); |
|||
}); |
|||
|
|||
_context = new ProcessingContext(_provider, _cts.Token); |
|||
|
|||
var processorTasks = GetProcessors() |
|||
.Select(InfiniteRetry) |
|||
.Select(p => p.ProcessAsync(_context)); |
|||
_compositeTask = Task.WhenAll(processorTasks); |
|||
} |
|||
/// <summary>
|
|||
/// Pulse
|
|||
/// </summary>
|
|||
public void Pulse() |
|||
{ |
|||
_logger.LogTrace("Pulsing the processor."); |
|||
} |
|||
/// <summary>
|
|||
/// Dispose
|
|||
/// </summary>
|
|||
public void Dispose() |
|||
{ |
|||
if (_disposed) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
try |
|||
{ |
|||
_disposed = true; |
|||
|
|||
_logger.LogInformation("Shutting down the processing server..."); |
|||
_cts.Cancel(); |
|||
|
|||
_compositeTask?.Wait((int)TimeSpan.FromSeconds(10).TotalMilliseconds); |
|||
} |
|||
catch (AggregateException ex) |
|||
{ |
|||
var innerEx = ex.InnerExceptions[0]; |
|||
if (!(innerEx is OperationCanceledException)) |
|||
{ |
|||
_logger.LogWarning(innerEx, $"Expected an OperationCanceledException, but found '{innerEx.Message}'."); |
|||
} |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogWarning(ex, "An exception was occured when disposing."); |
|||
} |
|||
finally |
|||
{ |
|||
_logger.LogInformation("### CAP shutdown!"); |
|||
} |
|||
} |
|||
|
|||
private IProcessor InfiniteRetry(IProcessor inner) |
|||
{ |
|||
return new InfiniteRetryProcessor(inner, _loggerFactory); |
|||
} |
|||
|
|||
private IProcessor[] GetProcessors() |
|||
{ |
|||
var returnedProcessors = new List<IProcessor> |
|||
{ |
|||
_provider.GetRequiredService<TransportCheckProcessor>(), |
|||
_provider.GetRequiredService<MessageNeedToRetryProcessor>(), |
|||
}; |
|||
|
|||
return returnedProcessors.ToArray(); |
|||
} |
|||
} |
|||
} |
|||
@ -1,3 +0,0 @@ |
|||
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd"> |
|||
<ConfigureAwait ContinueOnCapturedContext="false" /> |
|||
</Weavers> |
|||
Loading…
Reference in new issue