From 08d03e08d7609c9b4a5ebc3ab42be641889f653a Mon Sep 17 00:00:00 2001 From: Sebastian Date: Sat, 4 Feb 2017 21:03:44 +0100 Subject: [PATCH] Queue name fix --- .../RabbitMqEventBus.cs | 24 ++++++++++++++----- .../CQRS/Events/IEventStream.cs | 2 +- .../CQRS/Events/InMemoryEventBus.cs | 2 +- src/Squidex/Config/Domain/EventBusModule.cs | 4 +++- 4 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/Squidex.Infrastructure.RabbitMq/RabbitMqEventBus.cs b/src/Squidex.Infrastructure.RabbitMq/RabbitMqEventBus.cs index c7fce966c..faaec0ec1 100644 --- a/src/Squidex.Infrastructure.RabbitMq/RabbitMqEventBus.cs +++ b/src/Squidex.Infrastructure.RabbitMq/RabbitMqEventBus.cs @@ -19,16 +19,19 @@ namespace Squidex.Infrastructure.RabbitMq public sealed class RabbitMqEventBus : DisposableObject, IEventPublisher, IEventStream, IExternalSystem { private readonly bool isPersistent; + private readonly string queueName; private const string Exchange = "Squidex"; private readonly ConnectionFactory connectionFactory; private readonly Lazy connection; private readonly Lazy channel; private EventingBasicConsumer consumer; - public RabbitMqEventBus(ConnectionFactory connectionFactory, bool isPersistent) + public RabbitMqEventBus(ConnectionFactory connectionFactory, bool isPersistent, string queueName) { Guard.NotNull(connectionFactory, nameof(connectionFactory)); + this.queueName = queueName; + this.connectionFactory = connectionFactory; connection = new Lazy(connectionFactory.CreateConnection); @@ -70,7 +73,7 @@ namespace Squidex.Infrastructure.RabbitMq } } - public void Connect(string queueName, Action received) + public void Connect(string queuePrefix, Action received) { ThrowIfDisposed(); ThrowIfConnected(); @@ -81,10 +84,19 @@ namespace Squidex.Infrastructure.RabbitMq ThrowIfConnected(); - queueName = $"{queueName}_{Environment.MachineName}"; + var fullQueueName = $"{queuePrefix}_"; + + if (!string.IsNullOrWhiteSpace(queueName)) + { + fullQueueName += queueName; + } + else + { + fullQueueName += Environment.MachineName; + } - currentChannel.QueueDeclare(queueName, isPersistent, false, !isPersistent); - currentChannel.QueueBind(queueName, Exchange, string.Empty); + currentChannel.QueueDeclare(fullQueueName, isPersistent, false, !isPersistent); + currentChannel.QueueBind(fullQueueName, Exchange, string.Empty); consumer = new EventingBasicConsumer(currentChannel); @@ -95,7 +107,7 @@ namespace Squidex.Infrastructure.RabbitMq received(eventData); }; - currentChannel.BasicConsume(queueName, true, consumer); + currentChannel.BasicConsume(fullQueueName, true, consumer); } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventStream.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventStream.cs index 3323e0bbe..5b311bfea 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventStream.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventStream.cs @@ -12,6 +12,6 @@ namespace Squidex.Infrastructure.CQRS.Events { public interface IEventStream : IDisposable { - void Connect(string queueName, Action received); + void Connect(string queuePrefix, Action received); } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/InMemoryEventBus.cs b/src/Squidex.Infrastructure/CQRS/Events/InMemoryEventBus.cs index 3c047389e..0eb7063f1 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/InMemoryEventBus.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/InMemoryEventBus.cs @@ -24,7 +24,7 @@ namespace Squidex.Infrastructure.CQRS.Events subject.OnNext(eventData); } - public void Connect(string queueName, Action received) + public void Connect(string queuePrefix, Action received) { subject.Subscribe(received); } diff --git a/src/Squidex/Config/Domain/EventBusModule.cs b/src/Squidex/Config/Domain/EventBusModule.cs index 7afbfaf3d..9993c1e0c 100644 --- a/src/Squidex/Config/Domain/EventBusModule.cs +++ b/src/Squidex/Config/Domain/EventBusModule.cs @@ -57,13 +57,15 @@ namespace Squidex.Config.Domain throw new ConfigurationException("You must specify the RabbitMq connection string in the 'squidex:eventBus:rabbitMq:connectionString' configuration section."); } + var queueName = Configuration.GetValue("squidex:eventBus:rabbitMq:queueName"); + builder.Register(c => { var connectionFactory = new ConnectionFactory(); connectionFactory.SetUri(new Uri(connectionString)); - return new RabbitMqEventBus(connectionFactory, canCatch); + return new RabbitMqEventBus(connectionFactory, canCatch, queueName); }) .As() .As()