Browse Source

Queue name fix

pull/1/head
Sebastian 9 years ago
parent
commit
08d03e08d7
  1. 24
      src/Squidex.Infrastructure.RabbitMq/RabbitMqEventBus.cs
  2. 2
      src/Squidex.Infrastructure/CQRS/Events/IEventStream.cs
  3. 2
      src/Squidex.Infrastructure/CQRS/Events/InMemoryEventBus.cs
  4. 4
      src/Squidex/Config/Domain/EventBusModule.cs

24
src/Squidex.Infrastructure.RabbitMq/RabbitMqEventBus.cs

@ -19,16 +19,19 @@ namespace Squidex.Infrastructure.RabbitMq
public sealed class RabbitMqEventBus : DisposableObject, IEventPublisher, IEventStream, IExternalSystem
{
private readonly bool isPersistent;
private readonly string queueName;
private const string Exchange = "Squidex";
private readonly ConnectionFactory connectionFactory;
private readonly Lazy<IConnection> connection;
private readonly Lazy<IModel> channel;
private EventingBasicConsumer consumer;
public RabbitMqEventBus(ConnectionFactory connectionFactory, bool isPersistent)
public RabbitMqEventBus(ConnectionFactory connectionFactory, bool isPersistent, string queueName)
{
Guard.NotNull(connectionFactory, nameof(connectionFactory));
this.queueName = queueName;
this.connectionFactory = connectionFactory;
connection = new Lazy<IConnection>(connectionFactory.CreateConnection);
@ -70,7 +73,7 @@ namespace Squidex.Infrastructure.RabbitMq
}
}
public void Connect(string queueName, Action<EventData> received)
public void Connect(string queuePrefix, Action<EventData> received)
{
ThrowIfDisposed();
ThrowIfConnected();
@ -81,10 +84,19 @@ namespace Squidex.Infrastructure.RabbitMq
ThrowIfConnected();
queueName = $"{queueName}_{Environment.MachineName}";
var fullQueueName = $"{queuePrefix}_";
if (!string.IsNullOrWhiteSpace(queueName))
{
fullQueueName += queueName;
}
else
{
fullQueueName += Environment.MachineName;
}
currentChannel.QueueDeclare(queueName, isPersistent, false, !isPersistent);
currentChannel.QueueBind(queueName, Exchange, string.Empty);
currentChannel.QueueDeclare(fullQueueName, isPersistent, false, !isPersistent);
currentChannel.QueueBind(fullQueueName, Exchange, string.Empty);
consumer = new EventingBasicConsumer(currentChannel);
@ -95,7 +107,7 @@ namespace Squidex.Infrastructure.RabbitMq
received(eventData);
};
currentChannel.BasicConsume(queueName, true, consumer);
currentChannel.BasicConsume(fullQueueName, true, consumer);
}
}

2
src/Squidex.Infrastructure/CQRS/Events/IEventStream.cs

@ -12,6 +12,6 @@ namespace Squidex.Infrastructure.CQRS.Events
{
public interface IEventStream : IDisposable
{
void Connect(string queueName, Action<EventData> received);
void Connect(string queuePrefix, Action<EventData> received);
}
}

2
src/Squidex.Infrastructure/CQRS/Events/InMemoryEventBus.cs

@ -24,7 +24,7 @@ namespace Squidex.Infrastructure.CQRS.Events
subject.OnNext(eventData);
}
public void Connect(string queueName, Action<EventData> received)
public void Connect(string queuePrefix, Action<EventData> received)
{
subject.Subscribe(received);
}

4
src/Squidex/Config/Domain/EventBusModule.cs

@ -57,13 +57,15 @@ namespace Squidex.Config.Domain
throw new ConfigurationException("You must specify the RabbitMq connection string in the 'squidex:eventBus:rabbitMq:connectionString' configuration section.");
}
var queueName = Configuration.GetValue<string>("squidex:eventBus:rabbitMq:queueName");
builder.Register(c =>
{
var connectionFactory = new ConnectionFactory();
connectionFactory.SetUri(new Uri(connectionString));
return new RabbitMqEventBus(connectionFactory, canCatch);
return new RabbitMqEventBus(connectionFactory, canCatch, queueName);
})
.As<IEventStream>()
.As<IEventPublisher>()

Loading…
Cancel
Save