Browse Source

Disabled distributed event bus.

pull/603/head
Halil ibrahim Kalkan 7 years ago
parent
commit
23f4f1e65d
  1. 6
      framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBus.cs
  2. 9
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusModule.cs
  3. 34
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs
  4. 2
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs
  5. 16
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IocEventHandlerFactory.cs
  6. 4
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs

6
framework/src/Volo.Abp.EventBus.Distributed.RabbitMQ/Volo/Abp/EventBus/Distributed/RabbitMq/RabbitMqDistributedEventBus.cs

@ -37,12 +37,12 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq
IConnectionPool connectionPool,
IRabbitMqSerializer serializer,
IServiceProvider serviceProvider,
DistributedEventBusOptions distributedEventBusOptions)
IOptions<DistributedEventBusOptions> distributedEventBusOptions)
{
ConnectionPool = connectionPool;
Serializer = serializer;
ServiceProvider = serviceProvider;
DistributedEventBusOptions = distributedEventBusOptions;
DistributedEventBusOptions = distributedEventBusOptions.Value;
RabbitMqDistributedEventBusOptions = options.Value;
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
@ -52,7 +52,7 @@ namespace Volo.Abp.EventBus.Distributed.RabbitMq
Subscribe(DistributedEventBusOptions.Handlers);
}
public virtual void Subscribe(ITypeList<IEventHandler> handlers)
protected virtual void Subscribe(ITypeList<IEventHandler> handlers)
{
foreach (var handler in handlers)
{

9
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusModule.cs

@ -27,10 +27,11 @@ namespace Volo.Abp.EventBus
{
localHandlers.Add(context.ImplementationType);
}
else if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IDistributedEventHandler<>)))
{
distributedHandlers.Add(context.ImplementationType);
}
//TODO: Distrbiuted event bus is disabled since it's not properly working yet for v0.8 release
//else if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IDistributedEventHandler<>)))
//{
// distributedHandlers.Add(context.ImplementationType);
//}
});
services.Configure<LocalEventBusOptions>(options =>

34
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs

@ -1,5 +1,8 @@
using System;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Volo.Abp.Collections;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Local;
@ -10,10 +13,39 @@ namespace Volo.Abp.EventBus.Distributed
public class LocalDistributedEventBus : IDistributedEventBus, ITransientDependency
{
private readonly ILocalEventBus _localEventBus;
protected IServiceProvider ServiceProvider { get; }
protected DistributedEventBusOptions DistributedEventBusOptions { get; }
public LocalDistributedEventBus(ILocalEventBus localEventBus)
public LocalDistributedEventBus(
ILocalEventBus localEventBus,
IServiceProvider serviceProvider,
IOptions<DistributedEventBusOptions> distributedEventBusOptions)
{
_localEventBus = localEventBus;
ServiceProvider = serviceProvider;
DistributedEventBusOptions = distributedEventBusOptions.Value;
Subscribe(distributedEventBusOptions.Value.Handlers);
}
public virtual void Subscribe(ITypeList<IEventHandler> handlers)
{
foreach (var handler in handlers)
{
var interfaces = handler.GetInterfaces();
foreach (var @interface in interfaces)
{
if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
{
continue;
}
var genericArgs = @interface.GetGenericArguments();
if (genericArgs.Length == 1)
{
Subscribe(genericArgs[0], new IocEventHandlerFactory(ServiceProvider, handler));
}
}
}
}
public IDisposable Subscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class

2
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs

@ -105,7 +105,7 @@ namespace Volo.Abp.EventBus.Local
foreach (var handlerFactories in GetHandlerFactories(eventType))
{
foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
foreach (var handlerFactory in handlerFactories.EventHandlerFactories.ToArray()) //TODO: ToArray should not be needed!
{
await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, eventData, exceptions);
}

16
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IocEventHandlerFactory.cs

@ -7,16 +7,19 @@ namespace Volo.Abp.EventBus
/// This <see cref="IEventHandlerFactory"/> implementation is used to get/release
/// handlers using Ioc.
/// </summary>
public class IocEventHandlerFactory : IEventHandlerFactory
public class IocEventHandlerFactory : IEventHandlerFactory, IDisposable
{
public Type HandlerType { get; }
private readonly IServiceProvider _serviceProvider;
protected IServiceScope ServiceScope { get; }
//TODO: Consider to inject IServiceScopeFactory instead
public IocEventHandlerFactory(IServiceProvider serviceProvider, Type handlerType)
{
_serviceProvider = serviceProvider;
HandlerType = handlerType;
ServiceScope = serviceProvider
.GetRequiredService<IServiceScopeFactory>()
.CreateScope();
}
/// <summary>
@ -25,11 +28,16 @@ namespace Volo.Abp.EventBus
/// <returns>Resolved handler object</returns>
public IEventHandlerDisposeWrapper GetHandler()
{
var scope = _serviceProvider.CreateScope();
var scope = ServiceScope.ServiceProvider.CreateScope();
return new EventHandlerDisposeWrapper(
(IEventHandler) scope.ServiceProvider.GetRequiredService(HandlerType),
() => scope.Dispose()
);
}
public void Dispose()
{
ServiceScope.Dispose();
}
}
}

4
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs

@ -67,7 +67,9 @@ namespace Volo.Abp.EventBus.Local
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
GetOrCreateHandlerFactories(eventType)
.Locking(factories => factories.Add(factory));
.Locking(factories =>
factories.Add(factory)
);
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}

Loading…
Cancel
Save