Browse Source

Fixed #1056: Consume twice when published a message by rabbitMq distributed event bus.

pull/1296/head
Halil İbrahim Kalkan 7 years ago
parent
commit
70a371ca7b
  1. 9
      framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs
  2. 1
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs
  3. 4
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IEventHandlerFactory.cs
  4. 9
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IocEventHandlerFactory.cs
  5. 7
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs
  6. 10
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/SingleInstanceHandlerFactory.cs
  7. 50
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/TransientEventHandlerFactory.cs

9
framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs

@ -49,8 +49,6 @@ namespace Volo.Abp.EventBus.RabbitMq
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
Initialize();
}
public void Initialize()
@ -97,7 +95,12 @@ namespace Volo.Abp.EventBus.RabbitMq
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
if (handlerFactories.Count == 1) //TODO: Multi-threading!

1
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.ComponentModel.Design;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;

4
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IEventHandlerFactory.cs

@ -1,3 +1,5 @@
using System.Collections.Generic;
namespace Volo.Abp.EventBus
{
/// <summary>
@ -10,5 +12,7 @@ namespace Volo.Abp.EventBus
/// </summary>
/// <returns>The event handler</returns>
IEventHandlerDisposeWrapper GetHandler();
bool IsInFactories(List<IEventHandlerFactory> handlerFactories);
}
}

9
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/IocEventHandlerFactory.cs

@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.DependencyInjection;
@ -33,6 +35,13 @@ namespace Volo.Abp.EventBus
);
}
public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
{
return handlerFactories
.OfType<IocEventHandlerFactory>()
.Any(f => f.HandlerType == HandlerType);
}
public void Dispose()
{

7
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs

@ -49,7 +49,12 @@ namespace Volo.Abp.EventBus.Local
{
GetOrCreateHandlerFactories(eventType)
.Locking(factories =>
factories.Add(factory)
{
if (!factory.IsInFactories(factories))
{
factories.Add(factory);
}
}
);
return new EventHandlerFactoryUnregistrar(this, eventType, factory);

10
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/SingleInstanceHandlerFactory.cs

@ -1,3 +1,6 @@
using System.Collections.Generic;
using System.Linq;
namespace Volo.Abp.EventBus
{
/// <summary>
@ -27,5 +30,12 @@ namespace Volo.Abp.EventBus
{
return new EventHandlerDisposeWrapper(HandlerInstance);
}
public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
{
return handlerFactories
.OfType<SingleInstanceHandlerFactory>()
.Any(f => f.HandlerInstance == HandlerInstance);
}
}
}

50
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/TransientEventHandlerFactory.cs

@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
namespace Volo.Abp.EventBus
{
@ -7,22 +9,62 @@ namespace Volo.Abp.EventBus
/// by a transient instance object.
/// </summary>
/// <remarks>
/// This class always creates a new transient instance of handler.
/// This class always creates a new transient instance of the handler type.
/// </remarks>
public class TransientEventHandlerFactory<THandler> : IEventHandlerFactory
public class TransientEventHandlerFactory<THandler> : TransientEventHandlerFactory, IEventHandlerFactory
where THandler : IEventHandler, new()
{
public TransientEventHandlerFactory()
: base(typeof(THandler))
{
}
protected override IEventHandler CreateHandler()
{
return new THandler();
}
}
/// <summary>
/// This <see cref="IEventHandlerFactory"/> implementation is used to handle events
/// by a transient instance object.
/// </summary>
/// <remarks>
/// This class always creates a new transient instance of the handler type.
/// </remarks>
public class TransientEventHandlerFactory : IEventHandlerFactory
{
public Type HandlerType { get; }
public TransientEventHandlerFactory(Type handlerType)
{
HandlerType = handlerType;
}
/// <summary>
/// Creates a new instance of the handler object.
/// </summary>
/// <returns>The handler object</returns>
public IEventHandlerDisposeWrapper GetHandler()
public virtual IEventHandlerDisposeWrapper GetHandler()
{
var handler = new THandler();
var handler = CreateHandler();
return new EventHandlerDisposeWrapper(
handler,
() => (handler as IDisposable)?.Dispose()
);
}
public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
{
return handlerFactories
.OfType<TransientEventHandlerFactory>()
.Any(f => f.HandlerType == HandlerType);
}
protected virtual IEventHandler CreateHandler()
{
return (IEventHandler) Activator.CreateInstance(HandlerType);
}
}
}
Loading…
Cancel
Save