diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs index ec56db7b0a..c9b45a3ea8 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs @@ -28,6 +28,8 @@ namespace Volo.Abp.EventBus.Boxes protected string DistributedLockName => "Inbox_" + InboxConfig.Name; public ILogger Logger { get; set; } + protected CancellationTokenSource StoppingTokenSource { get; } + protected CancellationToken StoppingToken { get; } public InboxProcessor( IServiceProvider serviceProvider, @@ -46,6 +48,8 @@ namespace Volo.Abp.EventBus.Boxes Timer.Period = 2000; //TODO: Config? Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; + StoppingTokenSource = new CancellationTokenSource(); + StoppingToken = StoppingTokenSource.Token; } private async Task TimerOnElapsed(AbpAsyncTimer arg) @@ -63,13 +67,20 @@ namespace Volo.Abp.EventBus.Boxes public Task StopAsync(CancellationToken cancellationToken = default) { + StoppingTokenSource.Cancel(); Timer.Stop(cancellationToken); + StoppingTokenSource.Dispose(); return Task.CompletedTask; } protected virtual async Task RunAsync() { - await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName)) + if (StoppingToken.IsCancellationRequested) + { + return; + } + + await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken)) { if (handle != null) { @@ -79,7 +90,7 @@ namespace Volo.Abp.EventBus.Boxes while (true) { - var waitingEvents = await Inbox.GetWaitingEventsAsync(1000); //TODO: Config? + var waitingEvents = await Inbox.GetWaitingEventsAsync(1000); //TODO: Config? Pass StoppingToken! if (waitingEvents.Count <= 0) { break; @@ -107,7 +118,7 @@ namespace Volo.Abp.EventBus.Boxes else { Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); - await Task.Delay(7000); //TODO: Can we pass a cancellation token to cancel on shutdown? (Config?) + await TaskDelayHelper.DelayAsync(15000, StoppingToken); //TODO: Config? } } } diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs index 261ff6512b..1a2e0b9d79 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs @@ -21,6 +21,9 @@ namespace Volo.Abp.EventBus.Boxes protected OutboxConfig OutboxConfig { get; private set; } protected string DistributedLockName => "Outbox_" + OutboxConfig.Name; public ILogger Logger { get; set; } + + protected CancellationTokenSource StoppingTokenSource { get; } + protected CancellationToken StoppingToken { get; } public OutboxSender( IServiceProvider serviceProvider, @@ -35,6 +38,8 @@ namespace Volo.Abp.EventBus.Boxes Timer.Period = 2000; //TODO: Config? Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; + StoppingTokenSource = new CancellationTokenSource(); + StoppingToken = StoppingTokenSource.Token; } public virtual Task StartAsync(OutboxConfig outboxConfig, CancellationToken cancellationToken = default) @@ -47,7 +52,9 @@ namespace Volo.Abp.EventBus.Boxes public virtual Task StopAsync(CancellationToken cancellationToken = default) { + StoppingTokenSource.Cancel(); Timer.Stop(cancellationToken); + StoppingTokenSource.Dispose(); return Task.CompletedTask; } @@ -62,8 +69,6 @@ namespace Volo.Abp.EventBus.Boxes { if (handle != null) { - Logger.LogDebug("Obtained the distributed lock: " + DistributedLockName); - while (true) { var waitingEvents = await Outbox.GetWaitingEventsAsync(1000); //TODO: Config? @@ -89,7 +94,7 @@ namespace Volo.Abp.EventBus.Boxes else { Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); - await Task.Delay(7000); //TODO: Can we pass a cancellation token to cancel on shutdown? (Config?) + await TaskDelayHelper.DelayAsync(15000, StoppingToken); //TODO: Config? } } } diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/TaskHelper.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/TaskHelper.cs new file mode 100644 index 0000000000..87abe18da8 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/TaskHelper.cs @@ -0,0 +1,20 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace Volo.Abp.EventBus.Boxes +{ + internal static class TaskDelayHelper + { + public static async Task DelayAsync(int milliseconds, CancellationToken cancellationToken = default) + { + try + { + await Task.Delay(milliseconds, cancellationToken); + } + catch (TaskCanceledException) + { + return; + } + } + } +} \ No newline at end of file diff --git a/test/DistEvents/DistDemoApp.Shared/DistDemoAppSharedModule.cs b/test/DistEvents/DistDemoApp.Shared/DistDemoAppSharedModule.cs index 5755f63677..138b64f567 100644 --- a/test/DistEvents/DistDemoApp.Shared/DistDemoAppSharedModule.cs +++ b/test/DistEvents/DistDemoApp.Shared/DistDemoAppSharedModule.cs @@ -14,7 +14,7 @@ namespace DistDemoApp typeof(AbpAutofacModule), typeof(AbpDddDomainModule), typeof(AbpEventBusBoxesModule) - )] + )] public class DistDemoAppSharedModule : AbpModule { public override void ConfigureServices(ServiceConfigurationContext context)