diff --git a/backend/Squidex.sln b/backend/Squidex.sln index 4b7126ab6..7f3458932 100644 --- a/backend/Squidex.sln +++ b/backend/Squidex.sln @@ -18,8 +18,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Squidex.Domain.Apps.Core.Te EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Squidex.Infrastructure.MongoDb", "src\Squidex.Infrastructure.MongoDb\Squidex.Infrastructure.MongoDb.csproj", "{6A811927-3C37-430A-90F4-503E37123956}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Squidex.Infrastructure.RabbitMq", "src\Squidex.Infrastructure.RabbitMq\Squidex.Infrastructure.RabbitMq.csproj", "{C1E5BBB6-6B6A-4DE5-B19D-0538304DE343}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tools", "tools", "{94207AA6-4923-4183-A558-E0F8196B8CA3}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Squidex.Shared", "src\Squidex.Shared\Squidex.Shared.csproj", "{5E75AB7D-6F01-4313-AFF1-7F7128FFD71F}" @@ -118,18 +116,6 @@ Global {6A811927-3C37-430A-90F4-503E37123956}.Release|Any CPU.Build.0 = Release|Any CPU {6A811927-3C37-430A-90F4-503E37123956}.Release|x64.ActiveCfg = Release|Any CPU {6A811927-3C37-430A-90F4-503E37123956}.Release|x86.ActiveCfg = Release|Any CPU - {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343}.Debug|Any CPU.Build.0 = Debug|Any CPU - {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343}.Debug|x64.ActiveCfg = Debug|Any CPU - {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343}.Debug|x64.Build.0 = Debug|Any CPU - {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343}.Debug|x86.ActiveCfg = Debug|Any CPU - {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343}.Debug|x86.Build.0 = Debug|Any CPU - {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343}.Release|Any CPU.ActiveCfg = Release|Any CPU - {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343}.Release|Any CPU.Build.0 = Release|Any CPU - {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343}.Release|x64.ActiveCfg = Release|Any CPU - {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343}.Release|x64.Build.0 = Release|Any CPU - {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343}.Release|x86.ActiveCfg = Release|Any CPU - {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343}.Release|x86.Build.0 = Release|Any CPU {5E75AB7D-6F01-4313-AFF1-7F7128FFD71F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {5E75AB7D-6F01-4313-AFF1-7F7128FFD71F}.Debug|Any CPU.Build.0 = Debug|Any CPU {5E75AB7D-6F01-4313-AFF1-7F7128FFD71F}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -308,7 +294,6 @@ Global {7FD0A92B-7862-4BB1-932B-B52A9CACB56B} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF} {FD0AFD44-7A93-4F9E-B5ED-72582392E435} = {4C6B06C2-6D77-4E0E-AE32-D7050236433A} {6A811927-3C37-430A-90F4-503E37123956} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF} - {C1E5BBB6-6B6A-4DE5-B19D-0538304DE343} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF} {5E75AB7D-6F01-4313-AFF1-7F7128FFD71F} = {7EDE8CF1-B1E4-4005-B154-834B944E0D7A} {F7771E22-47BD-45C4-A133-FD7F1DE27CA0} = {7EDE8CF1-B1E4-4005-B154-834B944E0D7A} {27CF800D-890F-4882-BF05-44EC3233537D} = {7EDE8CF1-B1E4-4005-B154-834B944E0D7A} diff --git a/backend/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs b/backend/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs deleted file mode 100644 index f5e0234b1..000000000 --- a/backend/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs +++ /dev/null @@ -1,96 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschraenkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -using System.Text; -using RabbitMQ.Client; -using Squidex.Hosting; -using Squidex.Hosting.Configuration; -using Squidex.Infrastructure.EventSourcing; -using Squidex.Infrastructure.Json; - -namespace Squidex.Infrastructure.CQRS.Events; - -public sealed class RabbitMqEventConsumer : DisposableObjectBase, IInitializable, IEventConsumer -{ - private readonly IJsonSerializer serializer; - private readonly string eventPublisherName; - private readonly string exchange; - private readonly string eventsFilter; - private readonly ConnectionFactory connectionFactory; - private readonly Lazy connection; - private readonly Lazy channel; - - public string Name - { - get => eventPublisherName; - } - - public string EventsFilter - { - get => eventsFilter; - } - - public RabbitMqEventConsumer(IJsonSerializer serializer, string eventPublisherName, string uri, string exchange, string eventsFilter) - { - connectionFactory = new ConnectionFactory { Uri = new Uri(uri, UriKind.Absolute) }; - connection = new Lazy(connectionFactory.CreateConnection); - channel = new Lazy(connection.Value.CreateModel); - - this.exchange = exchange; - this.eventsFilter = eventsFilter; - this.eventPublisherName = eventPublisherName; - this.serializer = serializer; - } - - protected override void DisposeObject(bool disposing) - { - if (connection.IsValueCreated) - { - connection.Value.Close(); - connection.Value.Dispose(); - } - } - - public Task InitializeAsync( - CancellationToken ct) - { - try - { - var currentConnection = connection.Value; - - if (!currentConnection.IsOpen) - { - var error = new ConfigurationError($"RabbitMq event bus failed to connect to {connectionFactory.Endpoint}."); - - throw new ConfigurationException(error); - } - - return Task.CompletedTask; - } - catch (Exception ex) - { - var error = new ConfigurationError($"RabbitMq event bus failed to connect to {connectionFactory.Endpoint}."); - - throw new ConfigurationException(error, ex); - } - } - - public Task On(Envelope @event) - { - if (@event.Headers.Restored()) - { - return Task.CompletedTask; - } - - var jsonString = serializer.Serialize(@event); - var jsonBytes = Encoding.UTF8.GetBytes(jsonString); - - channel.Value.BasicPublish(exchange, string.Empty, null, jsonBytes); - - return Task.CompletedTask; - } -} diff --git a/backend/src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.csproj b/backend/src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.csproj deleted file mode 100644 index e227c1cba..000000000 --- a/backend/src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.csproj +++ /dev/null @@ -1,29 +0,0 @@ - - - net7.0 - Squidex.Infrastructure - 10.0 - enable - enable - - - full - True - - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - - - - - - - - \ No newline at end of file diff --git a/backend/src/Squidex.Infrastructure.Redis/RedisPubSub.cs b/backend/src/Squidex.Infrastructure.Redis/RedisPubSub.cs deleted file mode 100644 index 33286c134..000000000 --- a/backend/src/Squidex.Infrastructure.Redis/RedisPubSub.cs +++ /dev/null @@ -1,68 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschränkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -using System; -using System.Collections.Concurrent; -using System.Threading; -using System.Threading.Tasks; -using Squidex.Infrastructure.Json; -using Squidex.Infrastructure.Log; -using Squidex.Infrastructure.Tasks; -using StackExchange.Redis; - -namespace Squidex.Infrastructure -{ - public sealed class RedisPubSub : IPubSub, IInitializable - { - private readonly ConcurrentDictionary subscriptions = new ConcurrentDictionary(); - private readonly IConnectionMultiplexer redis; - private readonly IJsonSerializer serializer; - private readonly ISemanticLog log; - private ISubscriber redisSubscriber; - - public RedisPubSub(IConnectionMultiplexer redis, IJsonSerializer serializer, ISemanticLog log) - { - this.log = log; - this.redis = redis; - this.serializer = serializer; - } - - public Task InitializeAsync( - CancellationToken ct = default) - { - try - { - redisSubscriber = redis.GetSubscriber(); - - redis.GetStatus(); - - return TaskHelper.Done; - } - catch (Exception ex) - { - throw new ConfigurationException($"Redis connection failed to connect to database {redis.Configuration}", ex); - } - } - - public void Publish(T value, bool notifySelf) - { - GetSubscriber().Publish(value, notifySelf); - } - - public IDisposable Subscribe(Action handler) - { - return GetSubscriber().Subscribe(handler); - } - - private RedisSubscription GetSubscriber() - { - var typeName = typeof(T).FullName; - - return (RedisSubscription)subscriptions.GetOrAdd(typeName, this, (k, c) => new RedisSubscription(c.redisSubscriber, serializer, k, c.log)); - } - } -} diff --git a/backend/src/Squidex.Infrastructure.Redis/RedisSubscription.cs b/backend/src/Squidex.Infrastructure.Redis/RedisSubscription.cs deleted file mode 100644 index 98a15f343..000000000 --- a/backend/src/Squidex.Infrastructure.Redis/RedisSubscription.cs +++ /dev/null @@ -1,99 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschränkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -using System; -using System.Reactive.Subjects; -using Squidex.Infrastructure.Json; -using Squidex.Infrastructure.Log; -using StackExchange.Redis; - -#pragma warning disable SA1401 // Fields must be private - -namespace Squidex.Infrastructure -{ - internal sealed class RedisSubscription - { - private readonly Guid selfId = Guid.NewGuid(); - private readonly Subject subject = new Subject(); - private readonly ISubscriber subscriber; - private readonly IJsonSerializer serializer; - private readonly ISemanticLog log; - private readonly string channelName; - - private sealed class Envelope - { - public T Payload; - - public Guid Sender; - } - - public RedisSubscription(ISubscriber subscriber, IJsonSerializer serializer, string channelName, ISemanticLog log) - { - this.log = log; - - this.serializer = serializer; - this.subscriber = subscriber; - this.subscriber.Subscribe(channelName, (channel, value) => HandleMessage(value)); - - this.channelName = channelName; - } - - public void Publish(object value, bool notifySelf) - { - try - { - var senderId = notifySelf ? Guid.Empty : selfId; - - var envelope = serializer.Serialize(new Envelope { Sender = senderId, Payload = (T)value }); - - subscriber.Publish(channelName, envelope); - } - catch (Exception ex) - { - log.LogError(ex, channelName, (logChannel, w) => w - .WriteProperty("action", "PublishRedisMessage") - .WriteProperty("status", "Failed") - .WriteProperty("channel", logChannel)); - } - } - - private void HandleMessage(string value) - { - try - { - if (string.IsNullOrWhiteSpace(value)) - { - return; - } - - var envelope = serializer.Deserialize(value); - - if (envelope.Sender != selfId) - { - subject.OnNext(envelope.Payload); - - log.LogDebug(channelName, (logChannel, w) => w - .WriteProperty("action", "ReceiveRedisMessage") - .WriteProperty("channel", logChannel) - .WriteProperty("status", "Received")); - } - } - catch (Exception ex) - { - log.LogError(ex, channelName, (logChannel, w) => w - .WriteProperty("action", "ReceiveRedisMessage") - .WriteProperty("channel", logChannel) - .WriteProperty("status", "Failed")); - } - } - - public IDisposable Subscribe(Action handler) - { - return subject.Subscribe(handler); - } - } -} diff --git a/backend/src/Squidex.Infrastructure.Redis/Squidex.Infrastructure.Redis.csproj b/backend/src/Squidex.Infrastructure.Redis/Squidex.Infrastructure.Redis.csproj deleted file mode 100644 index ec30c5e2b..000000000 --- a/backend/src/Squidex.Infrastructure.Redis/Squidex.Infrastructure.Redis.csproj +++ /dev/null @@ -1,24 +0,0 @@ - - - net7.0 - Squidex.Infrastructure - 10.0 - enable - - - full - True - - - - - - - - - - - - - - diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumersHealthCheck.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumersHealthCheck.cs new file mode 100644 index 000000000..08018f7c5 --- /dev/null +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumersHealthCheck.cs @@ -0,0 +1,64 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Microsoft.Extensions.Diagnostics.HealthChecks; + +namespace Squidex.Infrastructure.EventSourcing.Consume; + +public sealed class EventConsumersHealthCheck : IHealthCheck +{ + private readonly IEventConsumerManager eventConsumerManager; + + public EventConsumersHealthCheck(IEventConsumerManager eventConsumerManager) + { + this.eventConsumerManager = eventConsumerManager; + } + + public async Task CheckHealthAsync(HealthCheckContext context, + CancellationToken cancellationToken = default) + { + var eventConsumers = await eventConsumerManager.GetConsumersAsync(cancellationToken); + + var data = new Dictionary(); + + var numTotal = 0; + var numFailed = 0; + + foreach (var eventConsumer in eventConsumers) + { + var status = "Running"; + + if (eventConsumer.Error != null) + { + status = "Failed"; + + numFailed++; + } + else if (eventConsumer.IsStopped) + { + status = "Stopped"; + } + + data[eventConsumer.Name] = status; + + numTotal++; + } + + if (numTotal > 0 && numFailed == numTotal) + { + return HealthCheckResult.Unhealthy("All event consumers failed", null, data); + } + else if (numFailed > 0) + { + return HealthCheckResult.Degraded("One or more event consumers failed", null, data); + } + else + { + return HealthCheckResult.Healthy(data: data); + } + } +} diff --git a/backend/src/Squidex/Config/Domain/EventPublishersServices.cs b/backend/src/Squidex/Config/Domain/EventPublishersServices.cs deleted file mode 100644 index 95a85aa1b..000000000 --- a/backend/src/Squidex/Config/Domain/EventPublishersServices.cs +++ /dev/null @@ -1,72 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschraenkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -using Squidex.Hosting.Configuration; -using Squidex.Infrastructure.CQRS.Events; -using Squidex.Infrastructure.EventSourcing; -using Squidex.Infrastructure.Json; - -namespace Squidex.Config.Domain; - -public static class EventPublishersServices -{ - public static void AddSquidexEventPublisher(this IServiceCollection services, IConfiguration config) - { - var eventPublishers = config.GetSection("eventPublishers"); - - foreach (var child in eventPublishers.GetChildren()) - { - var eventPublisherType = child.GetValue("type"); - - if (string.IsNullOrWhiteSpace(eventPublisherType)) - { - var error = new ConfigurationError("Value is required.", "eventPublishers:{child.Key}:type"); - - throw new ConfigurationException(error); - } - - var eventsFilter = child.GetValue("eventsFilter") ?? string.Empty; - - var enabled = child.GetValue("enabled"); - - if (string.Equals(eventPublisherType, "RabbitMq", StringComparison.OrdinalIgnoreCase)) - { - var publisherConfig = child.GetValue("configuration"); - - if (string.IsNullOrWhiteSpace(publisherConfig)) - { - var error = new ConfigurationError("Value is required.", "eventPublishers:{child.Key}:configuration"); - - throw new ConfigurationException(error); - } - - var exchange = child.GetValue("exchange"); - - if (string.IsNullOrWhiteSpace(exchange)) - { - var error = new ConfigurationError("Value is required.", "eventPublishers:{child.Key}:exchange"); - - throw new ConfigurationException(error); - } - - var name = $"EventPublishers_{child.Key}"; - - if (enabled) - { - services.AddSingletonAs(c => new RabbitMqEventConsumer(c.GetRequiredService(), name, publisherConfig, exchange, eventsFilter)) - .As(); - } - } - else - { - var error = new ConfigurationError($"Unsupported value '{child.Key}", "eventPublishers:{child.Key}:type."); - - throw new ConfigurationException(error); - } - } - } -} diff --git a/backend/src/Squidex/Config/Domain/HealthCheckServices.cs b/backend/src/Squidex/Config/Domain/HealthCheckServices.cs index df7b9765b..9e498d5ab 100644 --- a/backend/src/Squidex/Config/Domain/HealthCheckServices.cs +++ b/backend/src/Squidex/Config/Domain/HealthCheckServices.cs @@ -6,6 +6,7 @@ // ========================================================================== using Squidex.Infrastructure.Diagnostics; +using Squidex.Infrastructure.EventSourcing.Consume; namespace Squidex.Config.Domain; @@ -17,6 +18,7 @@ public static class HealthCheckServices "diagnostics:gc"); services.AddHealthChecks() - .AddCheck("GC", tags: new[] { "node" }); + .AddCheck("GC", tags: new[] { "node" }) + .AddCheck("EventConsumers", tags: new[] { "background" }); } } diff --git a/backend/src/Squidex/Config/Web/WebExtensions.cs b/backend/src/Squidex/Config/Web/WebExtensions.cs index fa768dd02..a2e85f28b 100644 --- a/backend/src/Squidex/Config/Web/WebExtensions.cs +++ b/backend/src/Squidex/Config/Web/WebExtensions.cs @@ -108,12 +108,6 @@ public static class WebExtensions ResponseWriter = writer }); - app.UseHealthChecks("/cluster-healthz", new HealthCheckOptions - { - Predicate = check => check.Tags.Contains("cluster"), - ResponseWriter = writer - }); - app.UseHealthChecks("/background-healthz", new HealthCheckOptions { Predicate = check => check.Tags.Contains("background"), diff --git a/backend/src/Squidex/Squidex.csproj b/backend/src/Squidex/Squidex.csproj index 5790ab8e4..f7c6b6104 100644 --- a/backend/src/Squidex/Squidex.csproj +++ b/backend/src/Squidex/Squidex.csproj @@ -27,7 +27,6 @@ - diff --git a/backend/src/Squidex/Startup.cs b/backend/src/Squidex/Startup.cs index 0caaaa53d..49aec6494 100644 --- a/backend/src/Squidex/Startup.cs +++ b/backend/src/Squidex/Startup.cs @@ -49,7 +49,6 @@ public sealed class Startup services.AddSquidexComments(); services.AddSquidexContents(config); services.AddSquidexControllerServices(config); - services.AddSquidexEventPublisher(config); services.AddSquidexEventSourcing(config); services.AddSquidexGraphQL(); services.AddSquidexHealthChecks(config); diff --git a/backend/src/Squidex/appsettings.json b/backend/src/Squidex/appsettings.json index ee879b644..63056b7c8 100644 --- a/backend/src/Squidex/appsettings.json +++ b/backend/src/Squidex/appsettings.json @@ -499,18 +499,6 @@ } }, - "eventPublishers": { - // Additional event publishers (advanced usage only): (Name => Config) - "allToRabbitMq": { - // Example:: Push all events to RabbitMq. - "type": "RabbitMq", - "configuration": "amqp://guest:guest@localhost/", - "exchange": "squidex", - "enabled": false, - "eventsFilter": ".*" - } - }, - "store": { // Define the type of the read store. // diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumersHealthCheckTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumersHealthCheckTests.cs new file mode 100644 index 000000000..f534702d1 --- /dev/null +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumersHealthCheckTests.cs @@ -0,0 +1,112 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Microsoft.Extensions.Diagnostics.HealthChecks; + +namespace Squidex.Infrastructure.EventSourcing.Consume; + +public class EventConsumersHealthCheckTests +{ + private readonly IEventConsumerManager eventConsumerManager = A.Fake(); + private readonly List consumers = new List(); + private readonly CancellationTokenSource cts = new CancellationTokenSource(); + private readonly CancellationToken ct; + private readonly EventConsumersHealthCheck sut; + + public EventConsumersHealthCheckTests() + { + ct = cts.Token; + + A.CallTo(() => eventConsumerManager.GetConsumersAsync(ct)) + .Returns(consumers); + + sut = new EventConsumersHealthCheck(eventConsumerManager); + } + + [Fact] + public async Task Should_return_healthy_if_no_consumer_found() + { + var status = await sut.CheckHealthAsync(null!, ct); + + Assert.Equal(HealthStatus.Healthy, status.Status); + } + + [Fact] + public async Task Should_return_healthy_if_no_consumer_failed() + { + consumers.Add(new EventConsumerInfo + { + Name = "Consumer1" + }); + + consumers.Add(new EventConsumerInfo + { + Name = "Consumer2" + }); + + consumers.Add(new EventConsumerInfo + { + Name = "Consumer2" + }); + + var status = await sut.CheckHealthAsync(null!, ct); + + Assert.Equal(HealthStatus.Healthy, status.Status); + } + + [Fact] + public async Task Should_return_unhealthy_if_all_consumers_failed() + { + consumers.Add(new EventConsumerInfo + { + Name = "Consumer1", + Error = "Failed1" + }); + + consumers.Add(new EventConsumerInfo + { + Name = "Consumer2", + Error = "Failed2" + }); + + consumers.Add(new EventConsumerInfo + { + Name = "Consumer3", + Error = "Failed3" + }); + + var status = await sut.CheckHealthAsync(null!, ct); + + Assert.Equal(HealthStatus.Unhealthy, status.Status); + } + + [Fact] + public async Task Should_return_degrated_if_at_least_one_consumers_failed() + { + consumers.Add(new EventConsumerInfo + { + Name = "Consumer1", + Error = "Failed1" + }); + + consumers.Add(new EventConsumerInfo + { + Name = "Consumer2", + IsStopped = true + }); + + consumers.Add(new EventConsumerInfo + { + Name = "Consumer3", + IsStopped = false + }); + + var status = await sut.CheckHealthAsync(null!, ct); + + Assert.Equal(HealthStatus.Degraded, status.Status); + } +}