From 4a724493b99ce515178a5fa192b35cf09301c22f Mon Sep 17 00:00:00 2001 From: maliming Date: Tue, 9 Aug 2022 15:28:43 +0800 Subject: [PATCH] Implement the Dapr event bus. --- .../Volo.Abp.AspNetCore.Dapr.csproj | 8 +- .../Dapr/AbpAspNetCoreDaprConsts.cs | 8 + .../Dapr/AbpAspNetCoreDaprModule.cs | 26 ++- .../Dapr/AbpAspNetCoreDaprOptions.cs | 11 + .../AspNetCore/Dapr/AbpDaprPubSubProvider.cs | 63 +++++ ...AbpDaprPubSubProviderContributorContext.cs | 16 ++ .../Dapr/Controllers/DaprController.cs | 44 ++++ .../Dapr/IAbpDaprPubSubProviderContributor.cs | 6 + .../Dapr/Models/DaprSubscriptionDefinition.cs | 10 + .../Dapr/Models/DaprSubscriptionRequest.cs | 8 + .../AbpAspNetCoreDaprJsonNamingPolicy.cs | 11 + .../DaprSubscriptionDefinitionConverter.cs | 25 ++ .../Volo/Abp/Dapr/AbpDaprClientFactory.cs | 6 +- .../Volo/Abp/Dapr/IDaprSerializer.cs | 16 ++ .../Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs | 45 ++++ .../EventBus/Dapr/AbpDaprEventBusOptions.cs | 11 + .../EventBus/Dapr/AbpEventBusDaprModule.cs | 12 +- .../EventBus/Dapr/DaprDistributedEventBus.cs | 221 ++++++++++++++++++ 18 files changed, 539 insertions(+), 8 deletions(-) create mode 100644 framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprConsts.cs create mode 100644 framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprOptions.cs create mode 100644 framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProvider.cs create mode 100644 framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProviderContributorContext.cs create mode 100644 framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Controllers/DaprController.cs create mode 100644 framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/IAbpDaprPubSubProviderContributor.cs create mode 100644 framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionDefinition.cs create mode 100644 framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionRequest.cs create mode 100644 framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/AbpAspNetCoreDaprJsonNamingPolicy.cs create mode 100644 framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/DaprSubscriptionDefinitionConverter.cs create mode 100644 framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs create mode 100644 framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs create mode 100644 framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventBusOptions.cs create mode 100644 framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo.Abp.AspNetCore.Dapr.csproj b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo.Abp.AspNetCore.Dapr.csproj index 43bfc66df8..747593eb3f 100644 --- a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo.Abp.AspNetCore.Dapr.csproj +++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo.Abp.AspNetCore.Dapr.csproj @@ -7,11 +7,13 @@ net6.0 enable enable - + + + @@ -19,4 +21,8 @@ + + + + diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprConsts.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprConsts.cs new file mode 100644 index 0000000000..7011d25476 --- /dev/null +++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprConsts.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.AspNetCore.Dapr; + +public class AbpAspNetCoreDaprConsts +{ + public const string DaprSubscribeUrl = "dapr/subscribe"; + + public const string DaprEventCallbackUrl = "api/abp/dapr/event"; +} diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprModule.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprModule.cs index 3f84054b5e..52e5820c4c 100644 --- a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprModule.cs +++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprModule.cs @@ -1,13 +1,31 @@ -using Volo.Abp.Dapr; +using Microsoft.AspNetCore.Http.Json; +using Volo.Abp.AspNetCore.Dapr.SystemTextJson; +using Volo.Abp.EventBus.Dapr; +using Volo.Abp.Json; +using Volo.Abp.Json.SystemTextJson; using Volo.Abp.Modularity; namespace Volo.Abp.AspNetCore.Dapr; [DependsOn( typeof(AbpAspNetCoreModule), - typeof(AbpDaprModule) + typeof(AbpEventBusDaprModule), + typeof(AbpJsonModule) )] public class AbpAspNetCoreDaprModule : AbpModule { - -} \ No newline at end of file + public override void ConfigureServices(ServiceConfigurationContext context) + { + // TODO: Add NewtonsoftJson json converter. + + Configure(options => + { + options.SerializerOptions.Converters.Add(new DaprSubscriptionDefinitionConverter()); + }); + + Configure(options => + { + options.JsonSerializerOptions.Converters.Add(new DaprSubscriptionDefinitionConverter()); + }); + } +} diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprOptions.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprOptions.cs new file mode 100644 index 0000000000..17667d24b9 --- /dev/null +++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprOptions.cs @@ -0,0 +1,11 @@ +namespace Volo.Abp.AspNetCore.Dapr; + +public class AbpAspNetCoreDaprOptions +{ + public List Contributors { get; } + + public AbpAspNetCoreDaprOptions() + { + Contributors = new List(); + } +} diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProvider.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProvider.cs new file mode 100644 index 0000000000..27eaa294d5 --- /dev/null +++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProvider.cs @@ -0,0 +1,63 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Volo.Abp.AspNetCore.Dapr.Models; +using Volo.Abp.DependencyInjection; +using Volo.Abp.EventBus; +using Volo.Abp.EventBus.Dapr; +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.AspNetCore.Dapr; + +public class AbpDaprPubSubProvider : ITransientDependency +{ + protected IServiceProvider ServiceProvider { get; } + protected AbpAspNetCoreDaprOptions Options { get; } + protected AbpDaprEventBusOptions EventBusOptions { get; } + protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; } + + public AbpDaprPubSubProvider( + IServiceProvider serviceProvider, + IOptions options, + IOptions eventBusOptions, + IOptions distributedEventBusOptions) + { + ServiceProvider = serviceProvider; + EventBusOptions = eventBusOptions.Value; + Options = options.Value; + DistributedEventBusOptions = distributedEventBusOptions.Value; + } + + public virtual async Task> GetSubscriptionsAsync() + { + var subscriptions = new List(); + foreach (var handler in DistributedEventBusOptions.Handlers) + { + foreach (var @interface in handler.GetInterfaces().Where(x => x.IsGenericType && x.GetGenericTypeDefinition() == typeof(IDistributedEventHandler<>))) + { + var eventType = @interface.GetGenericArguments()[0]; + var eventName = EventNameAttribute.GetNameOrDefault(eventType); + + subscriptions.Add(new DaprSubscriptionDefinition() + { + PubSubName = EventBusOptions.PubSubName, + Topic = eventName, + Route = AbpAspNetCoreDaprConsts.DaprEventCallbackUrl + }); + } + } + + if (Options.Contributors.Any()) + { + using (var scope = ServiceProvider.CreateScope()) + { + var context = new AbpDaprPubSubProviderContributorContext(scope.ServiceProvider, subscriptions); + foreach (var contributor in Options.Contributors) + { + await contributor.ContributeAsync(context); + } + } + } + + return subscriptions; + } +} diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProviderContributorContext.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProviderContributorContext.cs new file mode 100644 index 0000000000..e2428e992a --- /dev/null +++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProviderContributorContext.cs @@ -0,0 +1,16 @@ +using Volo.Abp.AspNetCore.Dapr.Models; + +namespace Volo.Abp.AspNetCore.Dapr; + +public class AbpDaprPubSubProviderContributorContext +{ + public IServiceProvider ServiceProvider { get; } + + public List Subscriptions { get; } + + public AbpDaprPubSubProviderContributorContext(IServiceProvider serviceProvider, List daprSubscriptionModels) + { + ServiceProvider = serviceProvider; + Subscriptions = daprSubscriptionModels; + } +} diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Controllers/DaprController.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Controllers/DaprController.cs new file mode 100644 index 0000000000..b28fbf6e7a --- /dev/null +++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Controllers/DaprController.cs @@ -0,0 +1,44 @@ +using System.Text.Json; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Volo.Abp.AspNetCore.Dapr.Models; +using Volo.Abp.AspNetCore.Mvc; +using Volo.Abp.Dapr; +using Volo.Abp.EventBus.Dapr; + +namespace Volo.Abp.AspNetCore.Dapr.Controllers; + +[Area("abp")] +[RemoteService(Name = "abp")] +public class DaprController : AbpController +{ + protected AbpDaprPubSubProvider DaprPubSubProvider { get; } + + public DaprController(AbpDaprPubSubProvider daprPubSubProvider) + { + DaprPubSubProvider = daprPubSubProvider; + } + + [HttpGet(AbpAspNetCoreDaprConsts.DaprSubscribeUrl)] + public virtual async Task> SubscribeAsync() + { + return await DaprPubSubProvider.GetSubscriptionsAsync(); + } + + [HttpPost(AbpAspNetCoreDaprConsts.DaprEventCallbackUrl)] + public virtual async Task EventsAsync() + { + var bodyJsonDocument = await JsonDocument.ParseAsync(HttpContext.Request.Body); + var request = JsonSerializer.Deserialize(bodyJsonDocument.RootElement.GetRawText(), + HttpContext.RequestServices.GetRequiredService>().Value.JsonSerializerOptions); + + var distributedEventBus = HttpContext.RequestServices.GetRequiredService(); + var daprSerializer = HttpContext.RequestServices.GetRequiredService(); + + var eventData = daprSerializer.Deserialize(bodyJsonDocument.RootElement.GetProperty("data").GetRawText(), distributedEventBus.GetEventType(request.Topic)); + await distributedEventBus.TriggerHandlersAsync(distributedEventBus.GetEventType(request.Topic), eventData); + + return Ok(); + } +} diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/IAbpDaprPubSubProviderContributor.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/IAbpDaprPubSubProviderContributor.cs new file mode 100644 index 0000000000..1a23483f9d --- /dev/null +++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/IAbpDaprPubSubProviderContributor.cs @@ -0,0 +1,6 @@ +namespace Volo.Abp.AspNetCore.Dapr; + +public interface IAbpDaprPubSubProviderContributor +{ + Task ContributeAsync(AbpDaprPubSubProviderContributorContext context); +} diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionDefinition.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionDefinition.cs new file mode 100644 index 0000000000..6fd8077250 --- /dev/null +++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionDefinition.cs @@ -0,0 +1,10 @@ +namespace Volo.Abp.AspNetCore.Dapr.Models; + +public class DaprSubscriptionDefinition +{ + public string PubSubName { get; set; } + + public string Topic { get; set; } + + public string Route { get; set; } +} diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionRequest.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionRequest.cs new file mode 100644 index 0000000000..9f7f4dfd4f --- /dev/null +++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionRequest.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.AspNetCore.Dapr.Models; + +public class DaprSubscriptionRequest +{ + public string PubSubName { get; set; } + + public string Topic { get; set; } +} diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/AbpAspNetCoreDaprJsonNamingPolicy.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/AbpAspNetCoreDaprJsonNamingPolicy.cs new file mode 100644 index 0000000000..e682d283e9 --- /dev/null +++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/AbpAspNetCoreDaprJsonNamingPolicy.cs @@ -0,0 +1,11 @@ +using System.Text.Json; + +namespace Volo.Abp.AspNetCore.Dapr.SystemTextJson; + +public class AbpAspNetCoreDaprJsonNamingPolicy : JsonNamingPolicy +{ + public override string ConvertName(string name) + { + return name.ToLower(); + } +} diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/DaprSubscriptionDefinitionConverter.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/DaprSubscriptionDefinitionConverter.cs new file mode 100644 index 0000000000..e2168da993 --- /dev/null +++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/DaprSubscriptionDefinitionConverter.cs @@ -0,0 +1,25 @@ +using System.Text.Json; +using System.Text.Json.Serialization; +using Volo.Abp.AspNetCore.Dapr.Models; + +namespace Volo.Abp.AspNetCore.Dapr.SystemTextJson; + +public class DaprSubscriptionDefinitionConverter : JsonConverter +{ + private JsonSerializerOptions _writeJsonSerializerOptions; + + public override DaprSubscriptionDefinition Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + throw new NotSupportedException(); + } + + public override void Write(Utf8JsonWriter writer, DaprSubscriptionDefinition value, JsonSerializerOptions options) + { + _writeJsonSerializerOptions ??= JsonSerializerOptionsHelper.Create(new JsonSerializerOptions(options) + { + PropertyNamingPolicy = new AbpAspNetCoreDaprJsonNamingPolicy() + }, x => x == this); + + JsonSerializer.Serialize(writer, value, _writeJsonSerializerOptions); + } +} diff --git a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/AbpDaprClientFactory.cs b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/AbpDaprClientFactory.cs index 1cd37ceb94..faaf450ddc 100644 --- a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/AbpDaprClientFactory.cs +++ b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/AbpDaprClientFactory.cs @@ -23,9 +23,13 @@ public class AbpDaprClientFactory : ITransientDependency public virtual async Task CreateAsync() { var builder = new DaprClientBuilder() - .UseHttpEndpoint(Options.HttpEndpoint) .UseJsonSerializationOptions(await CreateJsonSerializerOptions()); + if (!Options.HttpEndpoint.IsNullOrWhiteSpace()) + { + builder.UseHttpEndpoint(Options.HttpEndpoint); + } + if (!Options.GrpcEndpoint.IsNullOrWhiteSpace()) { builder.UseGrpcEndpoint(Options.GrpcEndpoint); diff --git a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs new file mode 100644 index 0000000000..7eec2c5c1c --- /dev/null +++ b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs @@ -0,0 +1,16 @@ +namespace Volo.Abp.Dapr; + +public interface IDaprSerializer +{ + byte[] Serialize(object obj); + + object Deserialize(byte[] value, Type type); + + T Deserialize(byte[] value); + + string SerializeToString(object obj); + + object Deserialize(string value, Type type); + + T Deserialize(string value); +} diff --git a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs new file mode 100644 index 0000000000..ce9b4a8523 --- /dev/null +++ b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs @@ -0,0 +1,45 @@ +using System.Text; +using Volo.Abp.DependencyInjection; +using Volo.Abp.Json; + +namespace Volo.Abp.Dapr; + +public class Utf8JsonDaprSerializer : IDaprSerializer, ITransientDependency +{ + private readonly IJsonSerializer _jsonSerializer; + + public Utf8JsonDaprSerializer(IJsonSerializer jsonSerializer) + { + _jsonSerializer = jsonSerializer; + } + + public byte[] Serialize(object obj) + { + return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj)); + } + + public object Deserialize(byte[] value, Type type) + { + return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value)); + } + + public T Deserialize(byte[] value) + { + return _jsonSerializer.Deserialize(Encoding.UTF8.GetString(value)); + } + + public string SerializeToString(object obj) + { + return _jsonSerializer.Serialize(obj); + } + + public object Deserialize(string value, Type type) + { + return _jsonSerializer.Deserialize(type, value); + } + + public T Deserialize(string value) + { + return _jsonSerializer.Deserialize(value); + } +} diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventBusOptions.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventBusOptions.cs new file mode 100644 index 0000000000..d3ced52ba2 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventBusOptions.cs @@ -0,0 +1,11 @@ +namespace Volo.Abp.EventBus.Dapr; + +public class AbpDaprEventBusOptions +{ + public string PubSubName { get; set; } + + public AbpDaprEventBusOptions() + { + PubSubName = "pubsub"; + } +} diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpEventBusDaprModule.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpEventBusDaprModule.cs index 8d054c6bd7..5f6329a4bc 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpEventBusDaprModule.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpEventBusDaprModule.cs @@ -1,4 +1,5 @@ -using Volo.Abp.Dapr; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.Dapr; using Volo.Abp.Modularity; namespace Volo.Abp.EventBus.Dapr; @@ -9,4 +10,11 @@ namespace Volo.Abp.EventBus.Dapr; )] public class AbpEventBusDaprModule : AbpModule { -} \ No newline at end of file + public override void OnApplicationInitialization(ApplicationInitializationContext context) + { + context + .ServiceProvider + .GetRequiredService() + .Initialize(); + } +} diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs new file mode 100644 index 0000000000..1f241b9c6d --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs @@ -0,0 +1,221 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Volo.Abp.Dapr; +using Volo.Abp.DependencyInjection; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Guids; +using Volo.Abp.MultiTenancy; +using Volo.Abp.Threading; +using Volo.Abp.Timing; +using Volo.Abp.Uow; + +namespace Volo.Abp.EventBus.Dapr; + +[Dependency(ReplaceServices = true)] +[ExposeServices(typeof(IDistributedEventBus), typeof(DaprDistributedEventBus))] +public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDependency +{ + protected IDaprSerializer Serializer { get; } + protected AbpDaprEventBusOptions DaprEventBusOptions { get; } + protected AbpDaprClientFactory DaprClientFactory { get; } + + protected ConcurrentDictionary> HandlerFactories { get; } + protected ConcurrentDictionary EventTypes { get; } + + public DaprDistributedEventBus( + IServiceScopeFactory serviceScopeFactory, + ICurrentTenant currentTenant, + IUnitOfWorkManager unitOfWorkManager, + IOptions abpDistributedEventBusOptions, + IGuidGenerator guidGenerator, + IClock clock, + IEventHandlerInvoker eventHandlerInvoker, + IDaprSerializer serializer, + IOptions daprEventBusOptions, + AbpDaprClientFactory daprClientFactory) + : base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker) + { + Serializer = serializer; + DaprEventBusOptions = daprEventBusOptions.Value; + DaprClientFactory = daprClientFactory; + + HandlerFactories = new ConcurrentDictionary>(); + EventTypes = new ConcurrentDictionary(); + } + + public void Initialize() + { + SubscribeHandlers(AbpDistributedEventBusOptions.Handlers); + } + + public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) + { + var handlerFactories = GetOrCreateHandlerFactories(eventType); + + if (factory.IsInFactories(handlerFactories)) + { + return NullDisposable.Instance; + } + + handlerFactories.Add(factory); + + return new EventHandlerFactoryUnregistrar(this, eventType, factory); + } + + public override void Unsubscribe(Func action) + { + Check.NotNull(action, nameof(action)); + + GetOrCreateHandlerFactories(typeof(TEvent)) + .Locking(factories => + { + factories.RemoveAll( + factory => + { + var singleInstanceFactory = factory as SingleInstanceHandlerFactory; + if (singleInstanceFactory == null) + { + return false; + } + + var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler; + if (actionHandler == null) + { + return false; + } + + return actionHandler.Action == action; + }); + }); + } + + public override void Unsubscribe(Type eventType, IEventHandler handler) + { + GetOrCreateHandlerFactories(eventType) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory && + (factory as SingleInstanceHandlerFactory).HandlerInstance == handler + ); + }); + } + + public override void Unsubscribe(Type eventType, IEventHandlerFactory factory) + { + GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory)); + } + + public override void UnsubscribeAll(Type eventType) + { + GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); + } + + protected async override Task PublishToEventBusAsync(Type eventType, object eventData) + { + await PublishToDaprAsync(eventType, eventData); + } + + protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) + { + unitOfWork.AddOrReplaceDistributedEvent(eventRecord); + } + + protected override IEnumerable GetHandlerFactories(Type eventType) + { + var handlerFactoryList = new List(); + + foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) + { + handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + } + + return handlerFactoryList.ToArray(); + } + + public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) + { + await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName))); + } + + public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) + { + var outgoingEventArray = outgoingEvents.ToArray(); + + foreach (var outgoingEvent in outgoingEventArray) + { + await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName))); + } + } + + public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) + { + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + if (eventType == null) + { + return; + } + + var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + var exceptions = new List(); + await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig); + if (exceptions.Any()) + { + ThrowOriginalExceptions(eventType, exceptions); + } + } + + protected override byte[] Serialize(object eventData) + { + return Serializer.Serialize(eventData); + } + + private List GetOrCreateHandlerFactories(Type eventType) + { + return HandlerFactories.GetOrAdd( + eventType, + type => + { + var eventName = EventNameAttribute.GetNameOrDefault(type); + EventTypes[eventName] = type; + return new List(); + } + ); + } + + public Type GetEventType(string eventName) + { + return EventTypes.GetOrDefault(eventName); + } + + protected virtual async Task PublishToDaprAsync(Type eventType, object eventData) + { + await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData); + } + + protected virtual async Task PublishToDaprAsync(string eventName, object eventData) + { + var client = await DaprClientFactory.CreateAsync(); + await client.PublishEventAsync(pubsubName: DaprEventBusOptions.PubSubName, topicName: eventName, data: eventData); + } + + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) + { + //Should trigger same type + if (handlerEventType == targetEventType) + { + return true; + } + + //TODO: Support inheritance? But it does not support on subscription to RabbitMq! + //Should trigger for inherited types + if (handlerEventType.IsAssignableFrom(targetEventType)) + { + return true; + } + + return false; + } +}