Browse Source

Rebus Integration for Distributed Event Bus

pull/5637/head
liangshiwei 5 years ago
parent
commit
6a7a833c68
  1. 7
      framework/Volo.Abp.sln
  2. 3
      framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xml
  3. 30
      framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xsd
  4. 26
      framework/src/Volo.Abp.EventBus.Rebus/Volo.Abp.EventBus.Rebus.csproj
  5. 43
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs
  6. 50
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusOptions.cs
  7. 176
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs
  8. 20
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs
  9. 1
      nupkg/common.ps1

7
framework/Volo.Abp.sln

@ -347,6 +347,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Autofac.WebAssembl
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.AspNetCore.Authentication.OpenIdConnect", "src\Volo.Abp.AspNetCore.Authentication.OpenIdConnect\Volo.Abp.AspNetCore.Authentication.OpenIdConnect.csproj", "{DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.EventBus.Rebus", "src\Volo.Abp.EventBus.Rebus\Volo.Abp.EventBus.Rebus.csproj", "{F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -1033,6 +1035,10 @@ Global
{DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5}.Release|Any CPU.Build.0 = Release|Any CPU
{F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -1208,6 +1214,7 @@ Global
{29CA7471-4E3E-4E75-8B33-001DDF682F01} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{37F89B0B-1C6B-426F-A5EE-676D1956D9E9} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{F689967F-1EF1-4D75-8BA4-2F2F3506B1F3} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {BB97ECF4-9A84-433F-A80B-2A3285BDD1D5}

3
framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xml

@ -0,0 +1,3 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<ConfigureAwait ContinueOnCapturedContext="false" />
</Weavers>

30
framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xsd

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
<xs:complexType>
<xs:all>
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" />
</xs:complexType>
</xs:element>
</xs:all>
<xs:attribute name="VerifyAssembly" type="xs:boolean">
<xs:annotation>
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="VerifyIgnoreCodes" type="xs:string">
<xs:annotation>
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="GenerateXsd" type="xs:boolean">
<xs:annotation>
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>

26
framework/src/Volo.Abp.EventBus.Rebus/Volo.Abp.EventBus.Rebus.csproj

@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\..\configureawait.props" />
<Import Project="..\..\..\common.props" />
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>Volo.Abp.EventBus.Rebus</AssemblyName>
<PackageId>Volo.Abp.EventBus.Rebus</PackageId>
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<RootNamespace />
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Rebus" Version="6.4.1" />
<PackageReference Include="Rebus.ServiceProvider" Version="5.0.6" />
</ItemGroup>
</Project>

43
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusModule.cs

@ -0,0 +1,43 @@
using Microsoft.Extensions.DependencyInjection;
using Rebus.Handlers;
using Rebus.ServiceProvider;
using Volo.Abp.Modularity;
namespace Volo.Abp.EventBus.Rebus
{
[DependsOn(
typeof(AbpEventBusModule))]
public class AbpEventBusRebusModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var options = context.Services.ExecutePreConfiguredActions<AbpEventBusRebusOptions>();
context.Services.AddTransient(typeof(IHandleMessages<>), typeof(RebusDistributedEventHandlerAdapter<>));
Configure<AbpEventBusRebusOptions>(rebusOptions =>
{
rebusOptions.Configurer = options.Configurer;
rebusOptions.Publish = options.Publish;
rebusOptions.InputQueueName = options.InputQueueName;
});
context.Services.AddRebus(configurer =>
{
options.Configurer?.Invoke(configurer);
return configurer;
});
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
context.ServiceProvider.UseRebus();
context
.ServiceProvider
.GetRequiredService<RebusDistributedEventBus>()
.Initialize();
}
}
}

50
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/AbpEventBusRebusOptions.cs

@ -0,0 +1,50 @@
using System;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Persistence.InMem;
using Rebus.Transport.InMem;
namespace Volo.Abp.EventBus.Rebus
{
public class AbpEventBusRebusOptions
{
[NotNull]
public string InputQueueName { get; set; }
[NotNull]
public Action<RebusConfigurer> Configurer
{
get => _configurer;
set => _configurer = Check.NotNull(value, nameof(value));
}
private Action<RebusConfigurer> _configurer;
[NotNull]
public Func<IBus, Type, object, Task> Publish
{
get => _publish;
set => _publish = Check.NotNull(value, nameof(value));
}
private Func<IBus, Type, object, Task> _publish;
public AbpEventBusRebusOptions()
{
_publish = DefaultPublish;
_configurer = DefaultConfigurer;
}
private async Task DefaultPublish(IBus bus, Type eventType, object eventData)
{
await bus.Advanced.Routing.Send(InputQueueName, eventData);
}
private void DefaultConfigurer(RebusConfigurer configurer)
{
configurer.Subscriptions(s => s.StoreInMemory());
configurer.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), InputQueueName));
}
}
}

176
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs

@ -0,0 +1,176 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Rebus.Bus;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;
namespace Volo.Abp.EventBus.Rebus
{
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(RebusDistributedEventBus))]
public class RebusDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency
{
protected IBus Rebus { get; }
//TODO: Accessing to the List<IEventHandlerFactory> may not be thread-safe!
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
protected AbpEventBusRebusOptions AbpEventBusRebusOptions { get; }
public RebusDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IBus rebus,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IOptions<AbpEventBusRebusOptions> abpEventBusRebusOptions) :
base(serviceScopeFactory, currentTenant)
{
Rebus = rebus;
AbpEventBusRebusOptions = abpEventBusRebusOptions.Value;
AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value;
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
}
public void Initialize()
{
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
}
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
if (handlerFactories.Count == 1) //TODO: Multi-threading!
{
Rebus.Subscribe(eventType);
}
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
{
Check.NotNull(action, nameof(action));
GetOrCreateHandlerFactories(typeof(TEvent))
.Locking(factories =>
{
factories.RemoveAll(
factory =>
{
if (!(factory is SingleInstanceHandlerFactory singleInstanceFactory))
{
return false;
}
if (!(singleInstanceFactory.HandlerInstance is ActionEventHandler<TEvent> actionHandler))
{
return false;
}
return actionHandler.Action == action;
});
});
Rebus.Unsubscribe(typeof(TEvent));
}
public override void Unsubscribe(Type eventType, IEventHandler handler)
{
GetOrCreateHandlerFactories(eventType)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory handlerFactory &&
handlerFactory.HandlerInstance == handler
);
});
Rebus.Unsubscribe(eventType);
}
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
{
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
Rebus.Unsubscribe(eventType);
}
public override void UnsubscribeAll(Type eventType)
{
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
Rebus.Unsubscribe(eventType);
}
public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}
public override async Task PublishAsync(Type eventType, object eventData)
{
await AbpEventBusRebusOptions.Publish(Rebus, eventType, eventData);
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(
eventType,
type =>
{
var eventName = EventNameAttribute.GetNameOrDefault(type);
EventTypes[eventName] = type;
return new List<IEventHandlerFactory>();
}
);
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))
)
{
handlerFactoryList.Add(
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
return handlerFactoryList.ToArray();
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
//Should trigger same type
if (handlerEventType == targetEventType)
{
return true;
}
//Should trigger for inherited types
if (handlerEventType.IsAssignableFrom(targetEventType))
{
return true;
}
return false;
}
}
}

20
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs

@ -0,0 +1,20 @@
using System.Threading.Tasks;
using Rebus.Handlers;
namespace Volo.Abp.EventBus.Rebus
{
public class RebusDistributedEventHandlerAdapter<TEventData> : IHandleMessages<TEventData>
{
protected RebusDistributedEventBus RebusDistributedEventBus { get; }
public RebusDistributedEventHandlerAdapter(RebusDistributedEventBus rebusDistributedEventBus)
{
RebusDistributedEventBus = rebusDistributedEventBus;
}
public async Task Handle(TEventData message)
{
await RebusDistributedEventBus.TriggerHandlersAsync(typeof(TEventData), message);
}
}
}

1
nupkg/common.ps1

@ -92,6 +92,7 @@ $projects = (
"framework/src/Volo.Abp.EventBus",
"framework/src/Volo.Abp.EventBus.RabbitMQ",
"framework/src/Volo.Abp.EventBus.Kafka",
"framework/src/Volo.Abp.EventBus.Rebus",
"framework/src/Volo.Abp.Features",
"framework/src/Volo.Abp.FluentValidation",
"framework/src/Volo.Abp.GlobalFeatures",

Loading…
Cancel
Save