Browse Source
Merge pull request #11432 from abpframework/liangshiwie/rebus
Add AbpRebusEventHandlerStep
pull/11440/head
maliming
4 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with
45 additions and
15 deletions
-
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs
-
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventBusOptions.cs
-
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpRebusEventHandlerStep.cs
-
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs
|
|
|
@ -1,5 +1,7 @@ |
|
|
|
using Microsoft.Extensions.DependencyInjection; |
|
|
|
using Rebus.Handlers; |
|
|
|
using Rebus.Pipeline; |
|
|
|
using Rebus.Pipeline.Receive; |
|
|
|
using Rebus.ServiceProvider; |
|
|
|
using Volo.Abp.Modularity; |
|
|
|
|
|
|
|
@ -21,6 +23,17 @@ public class AbpEventBusRebusModule : AbpModule |
|
|
|
|
|
|
|
context.Services.AddRebus(configure => |
|
|
|
{ |
|
|
|
configure.Options(options => |
|
|
|
{ |
|
|
|
options.Decorate<IPipeline>(d => |
|
|
|
{ |
|
|
|
var step = new AbpRebusEventHandlerStep(); |
|
|
|
var pipeline = d.Get<IPipeline>(); |
|
|
|
|
|
|
|
return new PipelineStepInjector(pipeline).OnReceive(step, PipelineRelativePosition.After, typeof(ActivateHandlersStep)); |
|
|
|
}); |
|
|
|
}); |
|
|
|
|
|
|
|
preActions.Configure().Configurer?.Invoke(configure); |
|
|
|
return configure; |
|
|
|
}); |
|
|
|
@ -35,4 +48,4 @@ public class AbpEventBusRebusModule : AbpModule |
|
|
|
.GetRequiredService<RebusDistributedEventBus>() |
|
|
|
.Initialize(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -20,24 +20,13 @@ public class AbpRebusEventBusOptions |
|
|
|
} |
|
|
|
private Action<RebusConfigurer> _configurer; |
|
|
|
|
|
|
|
[NotNull] |
|
|
|
public Func<IBus, Type, object, Task> Publish { |
|
|
|
get => _publish; |
|
|
|
set => _publish = Check.NotNull(value, nameof(value)); |
|
|
|
} |
|
|
|
private Func<IBus, Type, object, Task> _publish; |
|
|
|
public Func<IBus, Type, object, Task> Publish { get; set; } |
|
|
|
|
|
|
|
public AbpRebusEventBusOptions() |
|
|
|
{ |
|
|
|
_publish = DefaultPublish; |
|
|
|
_configurer = DefaultConfigure; |
|
|
|
} |
|
|
|
|
|
|
|
private async Task DefaultPublish(IBus bus, Type eventType, object eventData) |
|
|
|
{ |
|
|
|
await bus.Advanced.Routing.Send(InputQueueName, eventData); |
|
|
|
} |
|
|
|
|
|
|
|
private void DefaultConfigure(RebusConfigurer configure) |
|
|
|
{ |
|
|
|
configure.Subscriptions(s => s.StoreInMemory()); |
|
|
|
|
|
|
|
@ -0,0 +1,22 @@ |
|
|
|
using System; |
|
|
|
using System.Linq; |
|
|
|
using System.Threading.Tasks; |
|
|
|
using Rebus.Messages; |
|
|
|
using Rebus.Pipeline; |
|
|
|
using Rebus.Pipeline.Receive; |
|
|
|
|
|
|
|
namespace Volo.Abp.EventBus.Rebus; |
|
|
|
|
|
|
|
public class AbpRebusEventHandlerStep : IIncomingStep |
|
|
|
{ |
|
|
|
public Task Process(IncomingStepContext context, Func<Task> next) |
|
|
|
{ |
|
|
|
var message = context.Load<Message>(); |
|
|
|
var handlerInvokers = context.Load<HandlerInvokers>().ToList(); |
|
|
|
|
|
|
|
handlerInvokers.RemoveAll(x => x.Handler.GetType() == typeof(RebusDistributedEventHandlerAdapter<object>)); |
|
|
|
context.Save(new HandlerInvokers(message, handlerInvokers)); |
|
|
|
|
|
|
|
return next(); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -148,9 +148,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen |
|
|
|
await TriggerHandlersAsync(eventType, eventData); |
|
|
|
} |
|
|
|
|
|
|
|
protected override async Task PublishToEventBusAsync(Type eventType, object eventData) |
|
|
|
protected async override Task PublishToEventBusAsync(Type eventType, object eventData) |
|
|
|
{ |
|
|
|
await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData); |
|
|
|
if (AbpRebusEventBusOptions.Publish != null) |
|
|
|
{ |
|
|
|
await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
await Rebus.Publish(eventData); |
|
|
|
} |
|
|
|
|
|
|
|
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) |
|
|
|
|