mirror of https://github.com/Squidex/squidex.git
9 changed files with 198 additions and 2 deletions
@ -0,0 +1,95 @@ |
|||
// ==========================================================================
|
|||
// RabbitMqEventConsumer.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Text; |
|||
using System.Threading.Tasks; |
|||
using Newtonsoft.Json; |
|||
using RabbitMQ.Client; |
|||
using Squidex.Infrastructure.CQRS.Events; |
|||
using Squidex.Infrastructure.Tasks; |
|||
|
|||
// ReSharper disable InvertIf
|
|||
|
|||
namespace Squidex.Infrastructure.RabbitMq |
|||
{ |
|||
public sealed class RabbitMqEventConsumer : DisposableObjectBase, IExternalSystem, IEventConsumer |
|||
{ |
|||
private readonly string exchange; |
|||
private readonly string streamFilter; |
|||
private readonly ConnectionFactory connectionFactory; |
|||
private readonly Lazy<IConnection> connection; |
|||
private readonly Lazy<IModel> channel; |
|||
|
|||
public string Name |
|||
{ |
|||
get { return GetType().Name; } |
|||
} |
|||
|
|||
public string StreamFilter |
|||
{ |
|||
get { return streamFilter; } |
|||
} |
|||
|
|||
public RabbitMqEventConsumer(string uri, string exchange, string streamFilter) |
|||
{ |
|||
Guard.NotNullOrEmpty(uri, nameof(uri)); |
|||
Guard.NotNullOrEmpty(exchange, nameof(exchange)); |
|||
|
|||
connectionFactory = new ConnectionFactory { Uri = uri }; |
|||
|
|||
connection = new Lazy<IConnection>(connectionFactory.CreateConnection); |
|||
channel = new Lazy<IModel>(() => connection.Value.CreateModel()); |
|||
|
|||
this.exchange = exchange; |
|||
|
|||
this.streamFilter = streamFilter; |
|||
} |
|||
|
|||
protected override void DisposeObject(bool disposing) |
|||
{ |
|||
if (connection.IsValueCreated) |
|||
{ |
|||
connection.Value.Close(); |
|||
connection.Value.Dispose(); |
|||
} |
|||
} |
|||
|
|||
public void Connect() |
|||
{ |
|||
try |
|||
{ |
|||
var currentConnection = connection.Value; |
|||
|
|||
if (!currentConnection.IsOpen) |
|||
{ |
|||
throw new ConfigurationException($"RabbitMq event bus failed to connect to {connectionFactory.Endpoint}"); |
|||
} |
|||
} |
|||
catch (Exception e) |
|||
{ |
|||
throw new ConfigurationException($"RabbitMq event bus failed to connect to {connectionFactory.Endpoint}", e); |
|||
} |
|||
} |
|||
|
|||
public Task ClearAsync() |
|||
{ |
|||
return TaskHelper.Done; |
|||
} |
|||
|
|||
public Task On(Envelope<IEvent> @event) |
|||
{ |
|||
var jsonString = JsonConvert.SerializeObject(@event); |
|||
var jsonBytes = Encoding.UTF8.GetBytes(jsonString); |
|||
|
|||
channel.Value.BasicPublish(exchange, string.Empty, null, jsonBytes); |
|||
|
|||
return TaskHelper.Done; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,10 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Text; |
|||
|
|||
namespace Squidex.Infrastructure.RabbitMq |
|||
{ |
|||
class RabbitMqOptions |
|||
{ |
|||
} |
|||
} |
|||
@ -0,0 +1,15 @@ |
|||
<Project Sdk="Microsoft.NET.Sdk"> |
|||
<PropertyGroup> |
|||
<TargetFramework>netstandard1.6</TargetFramework> |
|||
</PropertyGroup> |
|||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> |
|||
<DebugType>full</DebugType> |
|||
<DebugSymbols>True</DebugSymbols> |
|||
</PropertyGroup> |
|||
<ItemGroup> |
|||
<PackageReference Include="RabbitMQ.Client" Version="4.1.3" /> |
|||
</ItemGroup> |
|||
<ItemGroup> |
|||
<ProjectReference Include="..\src\Squidex.Infrastructure\Squidex.Infrastructure.csproj" /> |
|||
</ItemGroup> |
|||
</Project> |
|||
@ -0,0 +1,47 @@ |
|||
// ==========================================================================
|
|||
// RabbitMqModule.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using Autofac; |
|||
using Microsoft.Extensions.Configuration; |
|||
using Squidex.Infrastructure; |
|||
using Squidex.Infrastructure.CQRS.Events; |
|||
using Squidex.Infrastructure.RabbitMq; |
|||
|
|||
// ReSharper disable InvertIf
|
|||
|
|||
namespace Squidex.Config.Domain |
|||
{ |
|||
public sealed class RabbitMqModule : Module |
|||
{ |
|||
private IConfiguration Configuration { get; } |
|||
|
|||
public RabbitMqModule(IConfiguration configuration) |
|||
{ |
|||
Configuration = configuration; |
|||
} |
|||
|
|||
protected override void Load(ContainerBuilder builder) |
|||
{ |
|||
var connectionString = Configuration.GetValue<string>("squidex:eventPublishers:rabbitMq:connectionString"); |
|||
var exchange = Configuration.GetValue<string>("squidex:eventPublishers:rabbitMq:exchange"); |
|||
var enabled = Configuration.GetValue<bool>("squidex:eventPublishers:rabbitMq:enabled"); |
|||
|
|||
if (!string.IsNullOrWhiteSpace(connectionString) && |
|||
!string.IsNullOrWhiteSpace(exchange) && |
|||
enabled) |
|||
{ |
|||
var streamFilter = Configuration.GetValue<string>("squidex:eventPublishers:rabbitMq:streamFilter"); |
|||
|
|||
builder.Register(c => new RabbitMqEventConsumer(connectionString, exchange, streamFilter)) |
|||
.As<IEventConsumer>() |
|||
.As<IExternalSystem>() |
|||
.SingleInstance(); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue