diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo.Abp.AspNetCore.Dapr.csproj b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo.Abp.AspNetCore.Dapr.csproj
index 43bfc66df8..747593eb3f 100644
--- a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo.Abp.AspNetCore.Dapr.csproj
+++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo.Abp.AspNetCore.Dapr.csproj
@@ -7,11 +7,13 @@
net6.0
enable
enable
-
+
+
+
@@ -19,4 +21,8 @@
+
+
+
+
diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprConsts.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprConsts.cs
new file mode 100644
index 0000000000..7011d25476
--- /dev/null
+++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprConsts.cs
@@ -0,0 +1,8 @@
+namespace Volo.Abp.AspNetCore.Dapr;
+
+public class AbpAspNetCoreDaprConsts
+{
+ public const string DaprSubscribeUrl = "dapr/subscribe";
+
+ public const string DaprEventCallbackUrl = "api/abp/dapr/event";
+}
diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprModule.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprModule.cs
index 3f84054b5e..52e5820c4c 100644
--- a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprModule.cs
+++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprModule.cs
@@ -1,13 +1,31 @@
-using Volo.Abp.Dapr;
+using Microsoft.AspNetCore.Http.Json;
+using Volo.Abp.AspNetCore.Dapr.SystemTextJson;
+using Volo.Abp.EventBus.Dapr;
+using Volo.Abp.Json;
+using Volo.Abp.Json.SystemTextJson;
using Volo.Abp.Modularity;
namespace Volo.Abp.AspNetCore.Dapr;
[DependsOn(
typeof(AbpAspNetCoreModule),
- typeof(AbpDaprModule)
+ typeof(AbpEventBusDaprModule),
+ typeof(AbpJsonModule)
)]
public class AbpAspNetCoreDaprModule : AbpModule
{
-
-}
\ No newline at end of file
+ public override void ConfigureServices(ServiceConfigurationContext context)
+ {
+ // TODO: Add NewtonsoftJson json converter.
+
+ Configure(options =>
+ {
+ options.SerializerOptions.Converters.Add(new DaprSubscriptionDefinitionConverter());
+ });
+
+ Configure(options =>
+ {
+ options.JsonSerializerOptions.Converters.Add(new DaprSubscriptionDefinitionConverter());
+ });
+ }
+}
diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprOptions.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprOptions.cs
new file mode 100644
index 0000000000..17667d24b9
--- /dev/null
+++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpAspNetCoreDaprOptions.cs
@@ -0,0 +1,11 @@
+namespace Volo.Abp.AspNetCore.Dapr;
+
+public class AbpAspNetCoreDaprOptions
+{
+ public List Contributors { get; }
+
+ public AbpAspNetCoreDaprOptions()
+ {
+ Contributors = new List();
+ }
+}
diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProvider.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProvider.cs
new file mode 100644
index 0000000000..27eaa294d5
--- /dev/null
+++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProvider.cs
@@ -0,0 +1,63 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using Volo.Abp.AspNetCore.Dapr.Models;
+using Volo.Abp.DependencyInjection;
+using Volo.Abp.EventBus;
+using Volo.Abp.EventBus.Dapr;
+using Volo.Abp.EventBus.Distributed;
+
+namespace Volo.Abp.AspNetCore.Dapr;
+
+public class AbpDaprPubSubProvider : ITransientDependency
+{
+ protected IServiceProvider ServiceProvider { get; }
+ protected AbpAspNetCoreDaprOptions Options { get; }
+ protected AbpDaprEventBusOptions EventBusOptions { get; }
+ protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; }
+
+ public AbpDaprPubSubProvider(
+ IServiceProvider serviceProvider,
+ IOptions options,
+ IOptions eventBusOptions,
+ IOptions distributedEventBusOptions)
+ {
+ ServiceProvider = serviceProvider;
+ EventBusOptions = eventBusOptions.Value;
+ Options = options.Value;
+ DistributedEventBusOptions = distributedEventBusOptions.Value;
+ }
+
+ public virtual async Task> GetSubscriptionsAsync()
+ {
+ var subscriptions = new List();
+ foreach (var handler in DistributedEventBusOptions.Handlers)
+ {
+ foreach (var @interface in handler.GetInterfaces().Where(x => x.IsGenericType && x.GetGenericTypeDefinition() == typeof(IDistributedEventHandler<>)))
+ {
+ var eventType = @interface.GetGenericArguments()[0];
+ var eventName = EventNameAttribute.GetNameOrDefault(eventType);
+
+ subscriptions.Add(new DaprSubscriptionDefinition()
+ {
+ PubSubName = EventBusOptions.PubSubName,
+ Topic = eventName,
+ Route = AbpAspNetCoreDaprConsts.DaprEventCallbackUrl
+ });
+ }
+ }
+
+ if (Options.Contributors.Any())
+ {
+ using (var scope = ServiceProvider.CreateScope())
+ {
+ var context = new AbpDaprPubSubProviderContributorContext(scope.ServiceProvider, subscriptions);
+ foreach (var contributor in Options.Contributors)
+ {
+ await contributor.ContributeAsync(context);
+ }
+ }
+ }
+
+ return subscriptions;
+ }
+}
diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProviderContributorContext.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProviderContributorContext.cs
new file mode 100644
index 0000000000..e2428e992a
--- /dev/null
+++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/AbpDaprPubSubProviderContributorContext.cs
@@ -0,0 +1,16 @@
+using Volo.Abp.AspNetCore.Dapr.Models;
+
+namespace Volo.Abp.AspNetCore.Dapr;
+
+public class AbpDaprPubSubProviderContributorContext
+{
+ public IServiceProvider ServiceProvider { get; }
+
+ public List Subscriptions { get; }
+
+ public AbpDaprPubSubProviderContributorContext(IServiceProvider serviceProvider, List daprSubscriptionModels)
+ {
+ ServiceProvider = serviceProvider;
+ Subscriptions = daprSubscriptionModels;
+ }
+}
diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Controllers/DaprController.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Controllers/DaprController.cs
new file mode 100644
index 0000000000..b28fbf6e7a
--- /dev/null
+++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Controllers/DaprController.cs
@@ -0,0 +1,44 @@
+using System.Text.Json;
+using Microsoft.AspNetCore.Mvc;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using Volo.Abp.AspNetCore.Dapr.Models;
+using Volo.Abp.AspNetCore.Mvc;
+using Volo.Abp.Dapr;
+using Volo.Abp.EventBus.Dapr;
+
+namespace Volo.Abp.AspNetCore.Dapr.Controllers;
+
+[Area("abp")]
+[RemoteService(Name = "abp")]
+public class DaprController : AbpController
+{
+ protected AbpDaprPubSubProvider DaprPubSubProvider { get; }
+
+ public DaprController(AbpDaprPubSubProvider daprPubSubProvider)
+ {
+ DaprPubSubProvider = daprPubSubProvider;
+ }
+
+ [HttpGet(AbpAspNetCoreDaprConsts.DaprSubscribeUrl)]
+ public virtual async Task> SubscribeAsync()
+ {
+ return await DaprPubSubProvider.GetSubscriptionsAsync();
+ }
+
+ [HttpPost(AbpAspNetCoreDaprConsts.DaprEventCallbackUrl)]
+ public virtual async Task EventsAsync()
+ {
+ var bodyJsonDocument = await JsonDocument.ParseAsync(HttpContext.Request.Body);
+ var request = JsonSerializer.Deserialize(bodyJsonDocument.RootElement.GetRawText(),
+ HttpContext.RequestServices.GetRequiredService>().Value.JsonSerializerOptions);
+
+ var distributedEventBus = HttpContext.RequestServices.GetRequiredService();
+ var daprSerializer = HttpContext.RequestServices.GetRequiredService();
+
+ var eventData = daprSerializer.Deserialize(bodyJsonDocument.RootElement.GetProperty("data").GetRawText(), distributedEventBus.GetEventType(request.Topic));
+ await distributedEventBus.TriggerHandlersAsync(distributedEventBus.GetEventType(request.Topic), eventData);
+
+ return Ok();
+ }
+}
diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/IAbpDaprPubSubProviderContributor.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/IAbpDaprPubSubProviderContributor.cs
new file mode 100644
index 0000000000..1a23483f9d
--- /dev/null
+++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/IAbpDaprPubSubProviderContributor.cs
@@ -0,0 +1,6 @@
+namespace Volo.Abp.AspNetCore.Dapr;
+
+public interface IAbpDaprPubSubProviderContributor
+{
+ Task ContributeAsync(AbpDaprPubSubProviderContributorContext context);
+}
diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionDefinition.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionDefinition.cs
new file mode 100644
index 0000000000..6fd8077250
--- /dev/null
+++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionDefinition.cs
@@ -0,0 +1,10 @@
+namespace Volo.Abp.AspNetCore.Dapr.Models;
+
+public class DaprSubscriptionDefinition
+{
+ public string PubSubName { get; set; }
+
+ public string Topic { get; set; }
+
+ public string Route { get; set; }
+}
diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionRequest.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionRequest.cs
new file mode 100644
index 0000000000..9f7f4dfd4f
--- /dev/null
+++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/Models/DaprSubscriptionRequest.cs
@@ -0,0 +1,8 @@
+namespace Volo.Abp.AspNetCore.Dapr.Models;
+
+public class DaprSubscriptionRequest
+{
+ public string PubSubName { get; set; }
+
+ public string Topic { get; set; }
+}
diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/AbpAspNetCoreDaprJsonNamingPolicy.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/AbpAspNetCoreDaprJsonNamingPolicy.cs
new file mode 100644
index 0000000000..e682d283e9
--- /dev/null
+++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/AbpAspNetCoreDaprJsonNamingPolicy.cs
@@ -0,0 +1,11 @@
+using System.Text.Json;
+
+namespace Volo.Abp.AspNetCore.Dapr.SystemTextJson;
+
+public class AbpAspNetCoreDaprJsonNamingPolicy : JsonNamingPolicy
+{
+ public override string ConvertName(string name)
+ {
+ return name.ToLower();
+ }
+}
diff --git a/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/DaprSubscriptionDefinitionConverter.cs b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/DaprSubscriptionDefinitionConverter.cs
new file mode 100644
index 0000000000..e2168da993
--- /dev/null
+++ b/framework/src/Volo.Abp.AspNetCore.Dapr/Volo/Abp/AspNetCore/Dapr/SystemTextJson/DaprSubscriptionDefinitionConverter.cs
@@ -0,0 +1,25 @@
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using Volo.Abp.AspNetCore.Dapr.Models;
+
+namespace Volo.Abp.AspNetCore.Dapr.SystemTextJson;
+
+public class DaprSubscriptionDefinitionConverter : JsonConverter
+{
+ private JsonSerializerOptions _writeJsonSerializerOptions;
+
+ public override DaprSubscriptionDefinition Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void Write(Utf8JsonWriter writer, DaprSubscriptionDefinition value, JsonSerializerOptions options)
+ {
+ _writeJsonSerializerOptions ??= JsonSerializerOptionsHelper.Create(new JsonSerializerOptions(options)
+ {
+ PropertyNamingPolicy = new AbpAspNetCoreDaprJsonNamingPolicy()
+ }, x => x == this);
+
+ JsonSerializer.Serialize(writer, value, _writeJsonSerializerOptions);
+ }
+}
diff --git a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/AbpDaprClientFactory.cs b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/AbpDaprClientFactory.cs
index 1cd37ceb94..faaf450ddc 100644
--- a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/AbpDaprClientFactory.cs
+++ b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/AbpDaprClientFactory.cs
@@ -23,9 +23,13 @@ public class AbpDaprClientFactory : ITransientDependency
public virtual async Task CreateAsync()
{
var builder = new DaprClientBuilder()
- .UseHttpEndpoint(Options.HttpEndpoint)
.UseJsonSerializationOptions(await CreateJsonSerializerOptions());
+ if (!Options.HttpEndpoint.IsNullOrWhiteSpace())
+ {
+ builder.UseHttpEndpoint(Options.HttpEndpoint);
+ }
+
if (!Options.GrpcEndpoint.IsNullOrWhiteSpace())
{
builder.UseGrpcEndpoint(Options.GrpcEndpoint);
diff --git a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs
new file mode 100644
index 0000000000..7eec2c5c1c
--- /dev/null
+++ b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/IDaprSerializer.cs
@@ -0,0 +1,16 @@
+namespace Volo.Abp.Dapr;
+
+public interface IDaprSerializer
+{
+ byte[] Serialize(object obj);
+
+ object Deserialize(byte[] value, Type type);
+
+ T Deserialize(byte[] value);
+
+ string SerializeToString(object obj);
+
+ object Deserialize(string value, Type type);
+
+ T Deserialize(string value);
+}
diff --git a/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs
new file mode 100644
index 0000000000..ce9b4a8523
--- /dev/null
+++ b/framework/src/Volo.Abp.Dapr/Volo/Abp/Dapr/Utf8JsonDaprSerializer.cs
@@ -0,0 +1,45 @@
+using System.Text;
+using Volo.Abp.DependencyInjection;
+using Volo.Abp.Json;
+
+namespace Volo.Abp.Dapr;
+
+public class Utf8JsonDaprSerializer : IDaprSerializer, ITransientDependency
+{
+ private readonly IJsonSerializer _jsonSerializer;
+
+ public Utf8JsonDaprSerializer(IJsonSerializer jsonSerializer)
+ {
+ _jsonSerializer = jsonSerializer;
+ }
+
+ public byte[] Serialize(object obj)
+ {
+ return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj));
+ }
+
+ public object Deserialize(byte[] value, Type type)
+ {
+ return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value));
+ }
+
+ public T Deserialize(byte[] value)
+ {
+ return _jsonSerializer.Deserialize(Encoding.UTF8.GetString(value));
+ }
+
+ public string SerializeToString(object obj)
+ {
+ return _jsonSerializer.Serialize(obj);
+ }
+
+ public object Deserialize(string value, Type type)
+ {
+ return _jsonSerializer.Deserialize(type, value);
+ }
+
+ public T Deserialize(string value)
+ {
+ return _jsonSerializer.Deserialize(value);
+ }
+}
diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventBusOptions.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventBusOptions.cs
new file mode 100644
index 0000000000..d3ced52ba2
--- /dev/null
+++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpDaprEventBusOptions.cs
@@ -0,0 +1,11 @@
+namespace Volo.Abp.EventBus.Dapr;
+
+public class AbpDaprEventBusOptions
+{
+ public string PubSubName { get; set; }
+
+ public AbpDaprEventBusOptions()
+ {
+ PubSubName = "pubsub";
+ }
+}
diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpEventBusDaprModule.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpEventBusDaprModule.cs
index 8d054c6bd7..5f6329a4bc 100644
--- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpEventBusDaprModule.cs
+++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/AbpEventBusDaprModule.cs
@@ -1,4 +1,5 @@
-using Volo.Abp.Dapr;
+using Microsoft.Extensions.DependencyInjection;
+using Volo.Abp.Dapr;
using Volo.Abp.Modularity;
namespace Volo.Abp.EventBus.Dapr;
@@ -9,4 +10,11 @@ namespace Volo.Abp.EventBus.Dapr;
)]
public class AbpEventBusDaprModule : AbpModule
{
-}
\ No newline at end of file
+ public override void OnApplicationInitialization(ApplicationInitializationContext context)
+ {
+ context
+ .ServiceProvider
+ .GetRequiredService()
+ .Initialize();
+ }
+}
diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs
new file mode 100644
index 0000000000..1f241b9c6d
--- /dev/null
+++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs
@@ -0,0 +1,221 @@
+using System.Collections.Concurrent;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using Volo.Abp.Dapr;
+using Volo.Abp.DependencyInjection;
+using Volo.Abp.EventBus.Distributed;
+using Volo.Abp.Guids;
+using Volo.Abp.MultiTenancy;
+using Volo.Abp.Threading;
+using Volo.Abp.Timing;
+using Volo.Abp.Uow;
+
+namespace Volo.Abp.EventBus.Dapr;
+
+[Dependency(ReplaceServices = true)]
+[ExposeServices(typeof(IDistributedEventBus), typeof(DaprDistributedEventBus))]
+public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDependency
+{
+ protected IDaprSerializer Serializer { get; }
+ protected AbpDaprEventBusOptions DaprEventBusOptions { get; }
+ protected AbpDaprClientFactory DaprClientFactory { get; }
+
+ protected ConcurrentDictionary> HandlerFactories { get; }
+ protected ConcurrentDictionary EventTypes { get; }
+
+ public DaprDistributedEventBus(
+ IServiceScopeFactory serviceScopeFactory,
+ ICurrentTenant currentTenant,
+ IUnitOfWorkManager unitOfWorkManager,
+ IOptions abpDistributedEventBusOptions,
+ IGuidGenerator guidGenerator,
+ IClock clock,
+ IEventHandlerInvoker eventHandlerInvoker,
+ IDaprSerializer serializer,
+ IOptions daprEventBusOptions,
+ AbpDaprClientFactory daprClientFactory)
+ : base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker)
+ {
+ Serializer = serializer;
+ DaprEventBusOptions = daprEventBusOptions.Value;
+ DaprClientFactory = daprClientFactory;
+
+ HandlerFactories = new ConcurrentDictionary>();
+ EventTypes = new ConcurrentDictionary();
+ }
+
+ public void Initialize()
+ {
+ SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
+ }
+
+ public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
+ {
+ var handlerFactories = GetOrCreateHandlerFactories(eventType);
+
+ if (factory.IsInFactories(handlerFactories))
+ {
+ return NullDisposable.Instance;
+ }
+
+ handlerFactories.Add(factory);
+
+ return new EventHandlerFactoryUnregistrar(this, eventType, factory);
+ }
+
+ public override void Unsubscribe(Func action)
+ {
+ Check.NotNull(action, nameof(action));
+
+ GetOrCreateHandlerFactories(typeof(TEvent))
+ .Locking(factories =>
+ {
+ factories.RemoveAll(
+ factory =>
+ {
+ var singleInstanceFactory = factory as SingleInstanceHandlerFactory;
+ if (singleInstanceFactory == null)
+ {
+ return false;
+ }
+
+ var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler;
+ if (actionHandler == null)
+ {
+ return false;
+ }
+
+ return actionHandler.Action == action;
+ });
+ });
+ }
+
+ public override void Unsubscribe(Type eventType, IEventHandler handler)
+ {
+ GetOrCreateHandlerFactories(eventType)
+ .Locking(factories =>
+ {
+ factories.RemoveAll(
+ factory =>
+ factory is SingleInstanceHandlerFactory &&
+ (factory as SingleInstanceHandlerFactory).HandlerInstance == handler
+ );
+ });
+ }
+
+ public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
+ {
+ GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
+ }
+
+ public override void UnsubscribeAll(Type eventType)
+ {
+ GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
+ }
+
+ protected async override Task PublishToEventBusAsync(Type eventType, object eventData)
+ {
+ await PublishToDaprAsync(eventType, eventData);
+ }
+
+ protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
+ {
+ unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
+ }
+
+ protected override IEnumerable GetHandlerFactories(Type eventType)
+ {
+ var handlerFactoryList = new List();
+
+ foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
+ {
+ handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
+ }
+
+ return handlerFactoryList.ToArray();
+ }
+
+ public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
+ {
+ await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)));
+ }
+
+ public async override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig)
+ {
+ var outgoingEventArray = outgoingEvents.ToArray();
+
+ foreach (var outgoingEvent in outgoingEventArray)
+ {
+ await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)));
+ }
+ }
+
+ public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
+ {
+ var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
+ if (eventType == null)
+ {
+ return;
+ }
+
+ var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
+ var exceptions = new List();
+ await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig);
+ if (exceptions.Any())
+ {
+ ThrowOriginalExceptions(eventType, exceptions);
+ }
+ }
+
+ protected override byte[] Serialize(object eventData)
+ {
+ return Serializer.Serialize(eventData);
+ }
+
+ private List GetOrCreateHandlerFactories(Type eventType)
+ {
+ return HandlerFactories.GetOrAdd(
+ eventType,
+ type =>
+ {
+ var eventName = EventNameAttribute.GetNameOrDefault(type);
+ EventTypes[eventName] = type;
+ return new List();
+ }
+ );
+ }
+
+ public Type GetEventType(string eventName)
+ {
+ return EventTypes.GetOrDefault(eventName);
+ }
+
+ protected virtual async Task PublishToDaprAsync(Type eventType, object eventData)
+ {
+ await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData);
+ }
+
+ protected virtual async Task PublishToDaprAsync(string eventName, object eventData)
+ {
+ var client = await DaprClientFactory.CreateAsync();
+ await client.PublishEventAsync(pubsubName: DaprEventBusOptions.PubSubName, topicName: eventName, data: eventData);
+ }
+
+ private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
+ {
+ //Should trigger same type
+ if (handlerEventType == targetEventType)
+ {
+ return true;
+ }
+
+ //TODO: Support inheritance? But it does not support on subscription to RabbitMq!
+ //Should trigger for inherited types
+ if (handlerEventType.IsAssignableFrom(targetEventType))
+ {
+ return true;
+ }
+
+ return false;
+ }
+}