mirror of https://github.com/Squidex/squidex.git
14 changed files with 179 additions and 424 deletions
@ -1,96 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Text; |
|||
using RabbitMQ.Client; |
|||
using Squidex.Hosting; |
|||
using Squidex.Hosting.Configuration; |
|||
using Squidex.Infrastructure.EventSourcing; |
|||
using Squidex.Infrastructure.Json; |
|||
|
|||
namespace Squidex.Infrastructure.CQRS.Events; |
|||
|
|||
public sealed class RabbitMqEventConsumer : DisposableObjectBase, IInitializable, IEventConsumer |
|||
{ |
|||
private readonly IJsonSerializer serializer; |
|||
private readonly string eventPublisherName; |
|||
private readonly string exchange; |
|||
private readonly string eventsFilter; |
|||
private readonly ConnectionFactory connectionFactory; |
|||
private readonly Lazy<IConnection> connection; |
|||
private readonly Lazy<IModel> channel; |
|||
|
|||
public string Name |
|||
{ |
|||
get => eventPublisherName; |
|||
} |
|||
|
|||
public string EventsFilter |
|||
{ |
|||
get => eventsFilter; |
|||
} |
|||
|
|||
public RabbitMqEventConsumer(IJsonSerializer serializer, string eventPublisherName, string uri, string exchange, string eventsFilter) |
|||
{ |
|||
connectionFactory = new ConnectionFactory { Uri = new Uri(uri, UriKind.Absolute) }; |
|||
connection = new Lazy<IConnection>(connectionFactory.CreateConnection); |
|||
channel = new Lazy<IModel>(connection.Value.CreateModel); |
|||
|
|||
this.exchange = exchange; |
|||
this.eventsFilter = eventsFilter; |
|||
this.eventPublisherName = eventPublisherName; |
|||
this.serializer = serializer; |
|||
} |
|||
|
|||
protected override void DisposeObject(bool disposing) |
|||
{ |
|||
if (connection.IsValueCreated) |
|||
{ |
|||
connection.Value.Close(); |
|||
connection.Value.Dispose(); |
|||
} |
|||
} |
|||
|
|||
public Task InitializeAsync( |
|||
CancellationToken ct) |
|||
{ |
|||
try |
|||
{ |
|||
var currentConnection = connection.Value; |
|||
|
|||
if (!currentConnection.IsOpen) |
|||
{ |
|||
var error = new ConfigurationError($"RabbitMq event bus failed to connect to {connectionFactory.Endpoint}."); |
|||
|
|||
throw new ConfigurationException(error); |
|||
} |
|||
|
|||
return Task.CompletedTask; |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
var error = new ConfigurationError($"RabbitMq event bus failed to connect to {connectionFactory.Endpoint}."); |
|||
|
|||
throw new ConfigurationException(error, ex); |
|||
} |
|||
} |
|||
|
|||
public Task On(Envelope<IEvent> @event) |
|||
{ |
|||
if (@event.Headers.Restored()) |
|||
{ |
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
var jsonString = serializer.Serialize(@event); |
|||
var jsonBytes = Encoding.UTF8.GetBytes(jsonString); |
|||
|
|||
channel.Value.BasicPublish(exchange, string.Empty, null, jsonBytes); |
|||
|
|||
return Task.CompletedTask; |
|||
} |
|||
} |
|||
@ -1,29 +0,0 @@ |
|||
<Project Sdk="Microsoft.NET.Sdk"> |
|||
<PropertyGroup> |
|||
<TargetFramework>net7.0</TargetFramework> |
|||
<RootNamespace>Squidex.Infrastructure</RootNamespace> |
|||
<LangVersion>10.0</LangVersion> |
|||
<ImplicitUsings>enable</ImplicitUsings> |
|||
<Nullable>enable</Nullable> |
|||
</PropertyGroup> |
|||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> |
|||
<DebugType>full</DebugType> |
|||
<DebugSymbols>True</DebugSymbols> |
|||
</PropertyGroup> |
|||
<ItemGroup> |
|||
<PackageReference Include="Meziantou.Analyzer" Version="1.0.756"> |
|||
<PrivateAssets>all</PrivateAssets> |
|||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> |
|||
</PackageReference> |
|||
<PackageReference Include="RabbitMQ.Client" Version="6.4.0" /> |
|||
<PackageReference Include="RefactoringEssentials" Version="5.6.0" PrivateAssets="all" /> |
|||
<PackageReference Include="StyleCop.Analyzers" Version="1.1.118" PrivateAssets="all" /> |
|||
<PackageReference Include="System.ValueTuple" Version="4.5.0" /> |
|||
</ItemGroup> |
|||
<ItemGroup> |
|||
<ProjectReference Include="..\Squidex.Infrastructure\Squidex.Infrastructure.csproj" /> |
|||
</ItemGroup> |
|||
<ItemGroup> |
|||
<AdditionalFiles Include="..\..\stylecop.json" Link="stylecop.json" /> |
|||
</ItemGroup> |
|||
</Project> |
|||
@ -1,68 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschränkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Squidex.Infrastructure.Json; |
|||
using Squidex.Infrastructure.Log; |
|||
using Squidex.Infrastructure.Tasks; |
|||
using StackExchange.Redis; |
|||
|
|||
namespace Squidex.Infrastructure |
|||
{ |
|||
public sealed class RedisPubSub : IPubSub, IInitializable |
|||
{ |
|||
private readonly ConcurrentDictionary<string, object> subscriptions = new ConcurrentDictionary<string, object>(); |
|||
private readonly IConnectionMultiplexer redis; |
|||
private readonly IJsonSerializer serializer; |
|||
private readonly ISemanticLog log; |
|||
private ISubscriber redisSubscriber; |
|||
|
|||
public RedisPubSub(IConnectionMultiplexer redis, IJsonSerializer serializer, ISemanticLog log) |
|||
{ |
|||
this.log = log; |
|||
this.redis = redis; |
|||
this.serializer = serializer; |
|||
} |
|||
|
|||
public Task InitializeAsync( |
|||
CancellationToken ct = default) |
|||
{ |
|||
try |
|||
{ |
|||
redisSubscriber = redis.GetSubscriber(); |
|||
|
|||
redis.GetStatus(); |
|||
|
|||
return TaskHelper.Done; |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
throw new ConfigurationException($"Redis connection failed to connect to database {redis.Configuration}", ex); |
|||
} |
|||
} |
|||
|
|||
public void Publish<T>(T value, bool notifySelf) |
|||
{ |
|||
GetSubscriber<T>().Publish(value, notifySelf); |
|||
} |
|||
|
|||
public IDisposable Subscribe<T>(Action<T> handler) |
|||
{ |
|||
return GetSubscriber<T>().Subscribe(handler); |
|||
} |
|||
|
|||
private RedisSubscription<T> GetSubscriber<T>() |
|||
{ |
|||
var typeName = typeof(T).FullName; |
|||
|
|||
return (RedisSubscription<T>)subscriptions.GetOrAdd(typeName, this, (k, c) => new RedisSubscription<T>(c.redisSubscriber, serializer, k, c.log)); |
|||
} |
|||
} |
|||
} |
|||
@ -1,99 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschränkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Reactive.Subjects; |
|||
using Squidex.Infrastructure.Json; |
|||
using Squidex.Infrastructure.Log; |
|||
using StackExchange.Redis; |
|||
|
|||
#pragma warning disable SA1401 // Fields must be private
|
|||
|
|||
namespace Squidex.Infrastructure |
|||
{ |
|||
internal sealed class RedisSubscription<T> |
|||
{ |
|||
private readonly Guid selfId = Guid.NewGuid(); |
|||
private readonly Subject<T> subject = new Subject<T>(); |
|||
private readonly ISubscriber subscriber; |
|||
private readonly IJsonSerializer serializer; |
|||
private readonly ISemanticLog log; |
|||
private readonly string channelName; |
|||
|
|||
private sealed class Envelope |
|||
{ |
|||
public T Payload; |
|||
|
|||
public Guid Sender; |
|||
} |
|||
|
|||
public RedisSubscription(ISubscriber subscriber, IJsonSerializer serializer, string channelName, ISemanticLog log) |
|||
{ |
|||
this.log = log; |
|||
|
|||
this.serializer = serializer; |
|||
this.subscriber = subscriber; |
|||
this.subscriber.Subscribe(channelName, (channel, value) => HandleMessage(value)); |
|||
|
|||
this.channelName = channelName; |
|||
} |
|||
|
|||
public void Publish(object value, bool notifySelf) |
|||
{ |
|||
try |
|||
{ |
|||
var senderId = notifySelf ? Guid.Empty : selfId; |
|||
|
|||
var envelope = serializer.Serialize(new Envelope { Sender = senderId, Payload = (T)value }); |
|||
|
|||
subscriber.Publish(channelName, envelope); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
log.LogError(ex, channelName, (logChannel, w) => w |
|||
.WriteProperty("action", "PublishRedisMessage") |
|||
.WriteProperty("status", "Failed") |
|||
.WriteProperty("channel", logChannel)); |
|||
} |
|||
} |
|||
|
|||
private void HandleMessage(string value) |
|||
{ |
|||
try |
|||
{ |
|||
if (string.IsNullOrWhiteSpace(value)) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
var envelope = serializer.Deserialize<Envelope>(value); |
|||
|
|||
if (envelope.Sender != selfId) |
|||
{ |
|||
subject.OnNext(envelope.Payload); |
|||
|
|||
log.LogDebug(channelName, (logChannel, w) => w |
|||
.WriteProperty("action", "ReceiveRedisMessage") |
|||
.WriteProperty("channel", logChannel) |
|||
.WriteProperty("status", "Received")); |
|||
} |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
log.LogError(ex, channelName, (logChannel, w) => w |
|||
.WriteProperty("action", "ReceiveRedisMessage") |
|||
.WriteProperty("channel", logChannel) |
|||
.WriteProperty("status", "Failed")); |
|||
} |
|||
} |
|||
|
|||
public IDisposable Subscribe(Action<T> handler) |
|||
{ |
|||
return subject.Subscribe(handler); |
|||
} |
|||
} |
|||
} |
|||
@ -1,24 +0,0 @@ |
|||
<Project Sdk="Microsoft.NET.Sdk"> |
|||
<PropertyGroup> |
|||
<TargetFramework>net7.0</TargetFramework> |
|||
<RootNamespace>Squidex.Infrastructure</RootNamespace> |
|||
<LangVersion>10.0</LangVersion> |
|||
<ImplicitUsings>enable</ImplicitUsings> |
|||
</PropertyGroup> |
|||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> |
|||
<DebugType>full</DebugType> |
|||
<DebugSymbols>True</DebugSymbols> |
|||
</PropertyGroup> |
|||
<ItemGroup> |
|||
<ProjectReference Include="..\Squidex.Infrastructure\Squidex.Infrastructure.csproj" /> |
|||
</ItemGroup> |
|||
<ItemGroup> |
|||
<PackageReference Include="RefactoringEssentials" Version="5.6.0" PrivateAssets="all" /> |
|||
<PackageReference Include="StackExchange.Redis.StrongName" Version="1.2.6" /> |
|||
<PackageReference Include="StyleCop.Analyzers" Version="1.1.118" PrivateAssets="all" /> |
|||
<PackageReference Include="System.ValueTuple" Version="4.5.0" /> |
|||
</ItemGroup> |
|||
<ItemGroup> |
|||
<AdditionalFiles Include="..\..\stylecop.json" Link="stylecop.json" /> |
|||
</ItemGroup> |
|||
</Project> |
|||
@ -0,0 +1,64 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Microsoft.Extensions.Diagnostics.HealthChecks; |
|||
|
|||
namespace Squidex.Infrastructure.EventSourcing.Consume; |
|||
|
|||
public sealed class EventConsumersHealthCheck : IHealthCheck |
|||
{ |
|||
private readonly IEventConsumerManager eventConsumerManager; |
|||
|
|||
public EventConsumersHealthCheck(IEventConsumerManager eventConsumerManager) |
|||
{ |
|||
this.eventConsumerManager = eventConsumerManager; |
|||
} |
|||
|
|||
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, |
|||
CancellationToken cancellationToken = default) |
|||
{ |
|||
var eventConsumers = await eventConsumerManager.GetConsumersAsync(cancellationToken); |
|||
|
|||
var data = new Dictionary<string, object>(); |
|||
|
|||
var numTotal = 0; |
|||
var numFailed = 0; |
|||
|
|||
foreach (var eventConsumer in eventConsumers) |
|||
{ |
|||
var status = "Running"; |
|||
|
|||
if (eventConsumer.Error != null) |
|||
{ |
|||
status = "Failed"; |
|||
|
|||
numFailed++; |
|||
} |
|||
else if (eventConsumer.IsStopped) |
|||
{ |
|||
status = "Stopped"; |
|||
} |
|||
|
|||
data[eventConsumer.Name] = status; |
|||
|
|||
numTotal++; |
|||
} |
|||
|
|||
if (numTotal > 0 && numFailed == numTotal) |
|||
{ |
|||
return HealthCheckResult.Unhealthy("All event consumers failed", null, data); |
|||
} |
|||
else if (numFailed > 0) |
|||
{ |
|||
return HealthCheckResult.Degraded("One or more event consumers failed", null, data); |
|||
} |
|||
else |
|||
{ |
|||
return HealthCheckResult.Healthy(data: data); |
|||
} |
|||
} |
|||
} |
|||
@ -1,72 +0,0 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Squidex.Hosting.Configuration; |
|||
using Squidex.Infrastructure.CQRS.Events; |
|||
using Squidex.Infrastructure.EventSourcing; |
|||
using Squidex.Infrastructure.Json; |
|||
|
|||
namespace Squidex.Config.Domain; |
|||
|
|||
public static class EventPublishersServices |
|||
{ |
|||
public static void AddSquidexEventPublisher(this IServiceCollection services, IConfiguration config) |
|||
{ |
|||
var eventPublishers = config.GetSection("eventPublishers"); |
|||
|
|||
foreach (var child in eventPublishers.GetChildren()) |
|||
{ |
|||
var eventPublisherType = child.GetValue<string>("type"); |
|||
|
|||
if (string.IsNullOrWhiteSpace(eventPublisherType)) |
|||
{ |
|||
var error = new ConfigurationError("Value is required.", "eventPublishers:{child.Key}:type"); |
|||
|
|||
throw new ConfigurationException(error); |
|||
} |
|||
|
|||
var eventsFilter = child.GetValue<string>("eventsFilter") ?? string.Empty; |
|||
|
|||
var enabled = child.GetValue<bool>("enabled"); |
|||
|
|||
if (string.Equals(eventPublisherType, "RabbitMq", StringComparison.OrdinalIgnoreCase)) |
|||
{ |
|||
var publisherConfig = child.GetValue<string>("configuration"); |
|||
|
|||
if (string.IsNullOrWhiteSpace(publisherConfig)) |
|||
{ |
|||
var error = new ConfigurationError("Value is required.", "eventPublishers:{child.Key}:configuration"); |
|||
|
|||
throw new ConfigurationException(error); |
|||
} |
|||
|
|||
var exchange = child.GetValue<string>("exchange"); |
|||
|
|||
if (string.IsNullOrWhiteSpace(exchange)) |
|||
{ |
|||
var error = new ConfigurationError("Value is required.", "eventPublishers:{child.Key}:exchange"); |
|||
|
|||
throw new ConfigurationException(error); |
|||
} |
|||
|
|||
var name = $"EventPublishers_{child.Key}"; |
|||
|
|||
if (enabled) |
|||
{ |
|||
services.AddSingletonAs(c => new RabbitMqEventConsumer(c.GetRequiredService<IJsonSerializer>(), name, publisherConfig, exchange, eventsFilter)) |
|||
.As<IEventConsumer>(); |
|||
} |
|||
} |
|||
else |
|||
{ |
|||
var error = new ConfigurationError($"Unsupported value '{child.Key}", "eventPublishers:{child.Key}:type."); |
|||
|
|||
throw new ConfigurationException(error); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,112 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using Microsoft.Extensions.Diagnostics.HealthChecks; |
|||
|
|||
namespace Squidex.Infrastructure.EventSourcing.Consume; |
|||
|
|||
public class EventConsumersHealthCheckTests |
|||
{ |
|||
private readonly IEventConsumerManager eventConsumerManager = A.Fake<IEventConsumerManager>(); |
|||
private readonly List<EventConsumerInfo> consumers = new List<EventConsumerInfo>(); |
|||
private readonly CancellationTokenSource cts = new CancellationTokenSource(); |
|||
private readonly CancellationToken ct; |
|||
private readonly EventConsumersHealthCheck sut; |
|||
|
|||
public EventConsumersHealthCheckTests() |
|||
{ |
|||
ct = cts.Token; |
|||
|
|||
A.CallTo(() => eventConsumerManager.GetConsumersAsync(ct)) |
|||
.Returns(consumers); |
|||
|
|||
sut = new EventConsumersHealthCheck(eventConsumerManager); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_return_healthy_if_no_consumer_found() |
|||
{ |
|||
var status = await sut.CheckHealthAsync(null!, ct); |
|||
|
|||
Assert.Equal(HealthStatus.Healthy, status.Status); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_return_healthy_if_no_consumer_failed() |
|||
{ |
|||
consumers.Add(new EventConsumerInfo |
|||
{ |
|||
Name = "Consumer1" |
|||
}); |
|||
|
|||
consumers.Add(new EventConsumerInfo |
|||
{ |
|||
Name = "Consumer2" |
|||
}); |
|||
|
|||
consumers.Add(new EventConsumerInfo |
|||
{ |
|||
Name = "Consumer2" |
|||
}); |
|||
|
|||
var status = await sut.CheckHealthAsync(null!, ct); |
|||
|
|||
Assert.Equal(HealthStatus.Healthy, status.Status); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_return_unhealthy_if_all_consumers_failed() |
|||
{ |
|||
consumers.Add(new EventConsumerInfo |
|||
{ |
|||
Name = "Consumer1", |
|||
Error = "Failed1" |
|||
}); |
|||
|
|||
consumers.Add(new EventConsumerInfo |
|||
{ |
|||
Name = "Consumer2", |
|||
Error = "Failed2" |
|||
}); |
|||
|
|||
consumers.Add(new EventConsumerInfo |
|||
{ |
|||
Name = "Consumer3", |
|||
Error = "Failed3" |
|||
}); |
|||
|
|||
var status = await sut.CheckHealthAsync(null!, ct); |
|||
|
|||
Assert.Equal(HealthStatus.Unhealthy, status.Status); |
|||
} |
|||
|
|||
[Fact] |
|||
public async Task Should_return_degrated_if_at_least_one_consumers_failed() |
|||
{ |
|||
consumers.Add(new EventConsumerInfo |
|||
{ |
|||
Name = "Consumer1", |
|||
Error = "Failed1" |
|||
}); |
|||
|
|||
consumers.Add(new EventConsumerInfo |
|||
{ |
|||
Name = "Consumer2", |
|||
IsStopped = true |
|||
}); |
|||
|
|||
consumers.Add(new EventConsumerInfo |
|||
{ |
|||
Name = "Consumer3", |
|||
IsStopped = false |
|||
}); |
|||
|
|||
var status = await sut.CheckHealthAsync(null!, ct); |
|||
|
|||
Assert.Equal(HealthStatus.Degraded, status.Status); |
|||
} |
|||
} |
|||
Loading…
Reference in new issue