From 6a7a833c68adc8c3684cfff9493cb8ed05478df5 Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Tue, 29 Sep 2020 18:50:47 +0800 Subject: [PATCH] Rebus Integration for Distributed Event Bus --- framework/Volo.Abp.sln | 7 + .../Volo.Abp.EventBus.Rebus/FodyWeavers.xml | 3 + .../Volo.Abp.EventBus.Rebus/FodyWeavers.xsd | 30 +++ .../Volo.Abp.EventBus.Rebus.csproj | 26 +++ .../EventBus/Rebus/AbpEventBusRebusModule.cs | 43 +++++ .../EventBus/Rebus/AbpEventBusRebusOptions.cs | 50 +++++ .../Rebus/RebusDistributedEventBus.cs | 176 ++++++++++++++++++ .../RebusDistributedEventHandlerAdapter.cs | 20 ++ nupkg/common.ps1 | 1 + 9 files changed, 356 insertions(+) create mode 100644 framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xml create mode 100644 framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xsd create mode 100644 framework/src/Volo.Abp.EventBus.Rebus/Volo.Abp.EventBus.Rebus.csproj create mode 100644 framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs create mode 100644 framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusOptions.cs create mode 100644 framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs create mode 100644 framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs diff --git a/framework/Volo.Abp.sln b/framework/Volo.Abp.sln index 503b42c7e1..ef7a66ee00 100644 --- a/framework/Volo.Abp.sln +++ b/framework/Volo.Abp.sln @@ -347,6 +347,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Autofac.WebAssembl EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.AspNetCore.Authentication.OpenIdConnect", "src\Volo.Abp.AspNetCore.Authentication.OpenIdConnect\Volo.Abp.AspNetCore.Authentication.OpenIdConnect.csproj", "{DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.EventBus.Rebus", "src\Volo.Abp.EventBus.Rebus\Volo.Abp.EventBus.Rebus.csproj", "{F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1033,6 +1035,10 @@ Global {DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5}.Debug|Any CPU.Build.0 = Debug|Any CPU {DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5}.Release|Any CPU.ActiveCfg = Release|Any CPU {DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5}.Release|Any CPU.Build.0 = Release|Any CPU + {F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1208,6 +1214,7 @@ Global {29CA7471-4E3E-4E75-8B33-001DDF682F01} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} {37F89B0B-1C6B-426F-A5EE-676D1956D9E9} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} {DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} + {F689967F-1EF1-4D75-8BA4-2F2F3506B1F3} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {BB97ECF4-9A84-433F-A80B-2A3285BDD1D5} diff --git a/framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xml b/framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xml new file mode 100644 index 0000000000..be0de3a908 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xml @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xsd b/framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xsd new file mode 100644 index 0000000000..3f3946e282 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xsd @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + 'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed. + + + + + A comma-separated list of error codes that can be safely ignored in assembly verification. + + + + + 'false' to turn off automatic generation of the XML Schema file. + + + + + \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo.Abp.EventBus.Rebus.csproj b/framework/src/Volo.Abp.EventBus.Rebus/Volo.Abp.EventBus.Rebus.csproj new file mode 100644 index 0000000000..0a01911990 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo.Abp.EventBus.Rebus.csproj @@ -0,0 +1,26 @@ + + + + + + + netstandard2.0 + Volo.Abp.EventBus.Rebus + Volo.Abp.EventBus.Rebus + $(AssetTargetFallback);portable-net45+win8+wp8+wpa81; + false + false + false + + + + + + + + + + + + + diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs new file mode 100644 index 0000000000..ec9dfe6b07 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs @@ -0,0 +1,43 @@ +using Microsoft.Extensions.DependencyInjection; +using Rebus.Handlers; +using Rebus.ServiceProvider; +using Volo.Abp.Modularity; + +namespace Volo.Abp.EventBus.Rebus +{ + [DependsOn( + typeof(AbpEventBusModule))] + public class AbpEventBusRebusModule : AbpModule + { + public override void ConfigureServices(ServiceConfigurationContext context) + { + var options = context.Services.ExecutePreConfiguredActions(); + + context.Services.AddTransient(typeof(IHandleMessages<>), typeof(RebusDistributedEventHandlerAdapter<>)); + + Configure(rebusOptions => + { + rebusOptions.Configurer = options.Configurer; + rebusOptions.Publish = options.Publish; + rebusOptions.InputQueueName = options.InputQueueName; + }); + + context.Services.AddRebus(configurer => + { + options.Configurer?.Invoke(configurer); + return configurer; + }); + + } + + public override void OnApplicationInitialization(ApplicationInitializationContext context) + { + context.ServiceProvider.UseRebus(); + + context + .ServiceProvider + .GetRequiredService() + .Initialize(); + } + } +} diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusOptions.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusOptions.cs new file mode 100644 index 0000000000..d6e8a2c68a --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusOptions.cs @@ -0,0 +1,50 @@ +using System; +using System.Threading.Tasks; +using JetBrains.Annotations; +using Rebus.Bus; +using Rebus.Config; +using Rebus.Persistence.InMem; +using Rebus.Transport.InMem; + +namespace Volo.Abp.EventBus.Rebus +{ + public class AbpEventBusRebusOptions + { + [NotNull] + public string InputQueueName { get; set; } + + [NotNull] + public Action Configurer + { + get => _configurer; + set => _configurer = Check.NotNull(value, nameof(value)); + } + private Action _configurer; + + + [NotNull] + public Func Publish + { + get => _publish; + set => _publish = Check.NotNull(value, nameof(value)); + } + private Func _publish; + + public AbpEventBusRebusOptions() + { + _publish = DefaultPublish; + _configurer = DefaultConfigurer; + } + + private async Task DefaultPublish(IBus bus, Type eventType, object eventData) + { + await bus.Advanced.Routing.Send(InputQueueName, eventData); + } + + private void DefaultConfigurer(RebusConfigurer configurer) + { + configurer.Subscriptions(s => s.StoreInMemory()); + configurer.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), InputQueueName)); + } + } +} diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs new file mode 100644 index 0000000000..9a30281617 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -0,0 +1,176 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Rebus.Bus; +using Volo.Abp.DependencyInjection; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.MultiTenancy; +using Volo.Abp.Threading; + +namespace Volo.Abp.EventBus.Rebus +{ + [Dependency(ReplaceServices = true)] + [ExposeServices(typeof(IDistributedEventBus), typeof(RebusDistributedEventBus))] + public class RebusDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency + { + protected IBus Rebus { get; } + + //TODO: Accessing to the List may not be thread-safe! + protected ConcurrentDictionary> HandlerFactories { get; } + protected ConcurrentDictionary EventTypes { get; } + protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } + protected AbpEventBusRebusOptions AbpEventBusRebusOptions { get; } + + public RebusDistributedEventBus( + IServiceScopeFactory serviceScopeFactory, + ICurrentTenant currentTenant, + IBus rebus, + IOptions abpDistributedEventBusOptions, + IOptions abpEventBusRebusOptions) : + base(serviceScopeFactory, currentTenant) + { + Rebus = rebus; + AbpEventBusRebusOptions = abpEventBusRebusOptions.Value; + AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value; + + HandlerFactories = new ConcurrentDictionary>(); + EventTypes = new ConcurrentDictionary(); + } + + public void Initialize() + { + SubscribeHandlers(AbpDistributedEventBusOptions.Handlers); + } + + public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) + { + var handlerFactories = GetOrCreateHandlerFactories(eventType); + + if (factory.IsInFactories(handlerFactories)) + { + return NullDisposable.Instance; + } + + handlerFactories.Add(factory); + + if (handlerFactories.Count == 1) //TODO: Multi-threading! + { + Rebus.Subscribe(eventType); + } + + return new EventHandlerFactoryUnregistrar(this, eventType, factory); + } + + public override void Unsubscribe(Func action) + { + Check.NotNull(action, nameof(action)); + + GetOrCreateHandlerFactories(typeof(TEvent)) + .Locking(factories => + { + factories.RemoveAll( + factory => + { + if (!(factory is SingleInstanceHandlerFactory singleInstanceFactory)) + { + return false; + } + + if (!(singleInstanceFactory.HandlerInstance is ActionEventHandler actionHandler)) + { + return false; + } + + return actionHandler.Action == action; + }); + }); + + Rebus.Unsubscribe(typeof(TEvent)); + } + + public override void Unsubscribe(Type eventType, IEventHandler handler) + { + GetOrCreateHandlerFactories(eventType) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory handlerFactory && + handlerFactory.HandlerInstance == handler + ); + }); + + Rebus.Unsubscribe(eventType); + } + + public override void Unsubscribe(Type eventType, IEventHandlerFactory factory) + { + GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory)); + Rebus.Unsubscribe(eventType); + } + + public override void UnsubscribeAll(Type eventType) + { + GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); + Rebus.Unsubscribe(eventType); + } + + public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class + { + return Subscribe(typeof(TEvent), handler); + } + + public override async Task PublishAsync(Type eventType, object eventData) + { + await AbpEventBusRebusOptions.Publish(Rebus, eventType, eventData); + } + + private List GetOrCreateHandlerFactories(Type eventType) + { + return HandlerFactories.GetOrAdd( + eventType, + type => + { + var eventName = EventNameAttribute.GetNameOrDefault(type); + EventTypes[eventName] = type; + return new List(); + } + ); + } + + protected override IEnumerable GetHandlerFactories(Type eventType) + { + var handlerFactoryList = new List(); + + foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)) + ) + { + handlerFactoryList.Add( + new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + } + + return handlerFactoryList.ToArray(); + } + + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) + { + //Should trigger same type + if (handlerEventType == targetEventType) + { + return true; + } + + //Should trigger for inherited types + if (handlerEventType.IsAssignableFrom(targetEventType)) + { + return true; + } + + return false; + } + } +} diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs new file mode 100644 index 0000000000..005226b885 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs @@ -0,0 +1,20 @@ +using System.Threading.Tasks; +using Rebus.Handlers; + +namespace Volo.Abp.EventBus.Rebus +{ + public class RebusDistributedEventHandlerAdapter : IHandleMessages + { + protected RebusDistributedEventBus RebusDistributedEventBus { get; } + + public RebusDistributedEventHandlerAdapter(RebusDistributedEventBus rebusDistributedEventBus) + { + RebusDistributedEventBus = rebusDistributedEventBus; + } + + public async Task Handle(TEventData message) + { + await RebusDistributedEventBus.TriggerHandlersAsync(typeof(TEventData), message); + } + } +} diff --git a/nupkg/common.ps1 b/nupkg/common.ps1 index e5847e8a7c..dcaead0715 100644 --- a/nupkg/common.ps1 +++ b/nupkg/common.ps1 @@ -92,6 +92,7 @@ $projects = ( "framework/src/Volo.Abp.EventBus", "framework/src/Volo.Abp.EventBus.RabbitMQ", "framework/src/Volo.Abp.EventBus.Kafka", + "framework/src/Volo.Abp.EventBus.Rebus", "framework/src/Volo.Abp.Features", "framework/src/Volo.Abp.FluentValidation", "framework/src/Volo.Abp.GlobalFeatures",