mirror of https://github.com/abpframework/abp.git
18 changed files with 539 additions and 8 deletions
@ -0,0 +1,8 @@ |
|||
namespace Volo.Abp.AspNetCore.Dapr; |
|||
|
|||
public class AbpAspNetCoreDaprConsts |
|||
{ |
|||
public const string DaprSubscribeUrl = "dapr/subscribe"; |
|||
|
|||
public const string DaprEventCallbackUrl = "api/abp/dapr/event"; |
|||
} |
|||
@ -1,13 +1,31 @@ |
|||
using Volo.Abp.Dapr; |
|||
using Microsoft.AspNetCore.Http.Json; |
|||
using Volo.Abp.AspNetCore.Dapr.SystemTextJson; |
|||
using Volo.Abp.EventBus.Dapr; |
|||
using Volo.Abp.Json; |
|||
using Volo.Abp.Json.SystemTextJson; |
|||
using Volo.Abp.Modularity; |
|||
|
|||
namespace Volo.Abp.AspNetCore.Dapr; |
|||
|
|||
[DependsOn( |
|||
typeof(AbpAspNetCoreModule), |
|||
typeof(AbpDaprModule) |
|||
typeof(AbpEventBusDaprModule), |
|||
typeof(AbpJsonModule) |
|||
)] |
|||
public class AbpAspNetCoreDaprModule : AbpModule |
|||
{ |
|||
|
|||
} |
|||
public override void ConfigureServices(ServiceConfigurationContext context) |
|||
{ |
|||
// TODO: Add NewtonsoftJson json converter.
|
|||
|
|||
Configure<JsonOptions>(options => |
|||
{ |
|||
options.SerializerOptions.Converters.Add(new DaprSubscriptionDefinitionConverter()); |
|||
}); |
|||
|
|||
Configure<AbpSystemTextJsonSerializerOptions>(options => |
|||
{ |
|||
options.JsonSerializerOptions.Converters.Add(new DaprSubscriptionDefinitionConverter()); |
|||
}); |
|||
} |
|||
} |
|||
|
|||
@ -0,0 +1,11 @@ |
|||
namespace Volo.Abp.AspNetCore.Dapr; |
|||
|
|||
public class AbpAspNetCoreDaprOptions |
|||
{ |
|||
public List<IAbpDaprPubSubProviderContributor> Contributors { get; } |
|||
|
|||
public AbpAspNetCoreDaprOptions() |
|||
{ |
|||
Contributors = new List<IAbpDaprPubSubProviderContributor>(); |
|||
} |
|||
} |
|||
@ -0,0 +1,63 @@ |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.AspNetCore.Dapr.Models; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.EventBus; |
|||
using Volo.Abp.EventBus.Dapr; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.AspNetCore.Dapr; |
|||
|
|||
public class AbpDaprPubSubProvider : ITransientDependency |
|||
{ |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
protected AbpAspNetCoreDaprOptions Options { get; } |
|||
protected AbpDaprEventBusOptions EventBusOptions { get; } |
|||
protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; } |
|||
|
|||
public AbpDaprPubSubProvider( |
|||
IServiceProvider serviceProvider, |
|||
IOptions<AbpAspNetCoreDaprOptions> options, |
|||
IOptions<AbpDaprEventBusOptions> eventBusOptions, |
|||
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions) |
|||
{ |
|||
ServiceProvider = serviceProvider; |
|||
EventBusOptions = eventBusOptions.Value; |
|||
Options = options.Value; |
|||
DistributedEventBusOptions = distributedEventBusOptions.Value; |
|||
} |
|||
|
|||
public virtual async Task<List<DaprSubscriptionDefinition>> GetSubscriptionsAsync() |
|||
{ |
|||
var subscriptions = new List<DaprSubscriptionDefinition>(); |
|||
foreach (var handler in DistributedEventBusOptions.Handlers) |
|||
{ |
|||
foreach (var @interface in handler.GetInterfaces().Where(x => x.IsGenericType && x.GetGenericTypeDefinition() == typeof(IDistributedEventHandler<>))) |
|||
{ |
|||
var eventType = @interface.GetGenericArguments()[0]; |
|||
var eventName = EventNameAttribute.GetNameOrDefault(eventType); |
|||
|
|||
subscriptions.Add(new DaprSubscriptionDefinition() |
|||
{ |
|||
PubSubName = EventBusOptions.PubSubName, |
|||
Topic = eventName, |
|||
Route = AbpAspNetCoreDaprConsts.DaprEventCallbackUrl |
|||
}); |
|||
} |
|||
} |
|||
|
|||
if (Options.Contributors.Any()) |
|||
{ |
|||
using (var scope = ServiceProvider.CreateScope()) |
|||
{ |
|||
var context = new AbpDaprPubSubProviderContributorContext(scope.ServiceProvider, subscriptions); |
|||
foreach (var contributor in Options.Contributors) |
|||
{ |
|||
await contributor.ContributeAsync(context); |
|||
} |
|||
} |
|||
} |
|||
|
|||
return subscriptions; |
|||
} |
|||
} |
|||
@ -0,0 +1,16 @@ |
|||
using Volo.Abp.AspNetCore.Dapr.Models; |
|||
|
|||
namespace Volo.Abp.AspNetCore.Dapr; |
|||
|
|||
public class AbpDaprPubSubProviderContributorContext |
|||
{ |
|||
public IServiceProvider ServiceProvider { get; } |
|||
|
|||
public List<DaprSubscriptionDefinition> Subscriptions { get; } |
|||
|
|||
public AbpDaprPubSubProviderContributorContext(IServiceProvider serviceProvider, List<DaprSubscriptionDefinition> daprSubscriptionModels) |
|||
{ |
|||
ServiceProvider = serviceProvider; |
|||
Subscriptions = daprSubscriptionModels; |
|||
} |
|||
} |
|||
@ -0,0 +1,44 @@ |
|||
using System.Text.Json; |
|||
using Microsoft.AspNetCore.Mvc; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.AspNetCore.Dapr.Models; |
|||
using Volo.Abp.AspNetCore.Mvc; |
|||
using Volo.Abp.Dapr; |
|||
using Volo.Abp.EventBus.Dapr; |
|||
|
|||
namespace Volo.Abp.AspNetCore.Dapr.Controllers; |
|||
|
|||
[Area("abp")] |
|||
[RemoteService(Name = "abp")] |
|||
public class DaprController : AbpController |
|||
{ |
|||
protected AbpDaprPubSubProvider DaprPubSubProvider { get; } |
|||
|
|||
public DaprController(AbpDaprPubSubProvider daprPubSubProvider) |
|||
{ |
|||
DaprPubSubProvider = daprPubSubProvider; |
|||
} |
|||
|
|||
[HttpGet(AbpAspNetCoreDaprConsts.DaprSubscribeUrl)] |
|||
public virtual async Task<List<DaprSubscriptionDefinition>> SubscribeAsync() |
|||
{ |
|||
return await DaprPubSubProvider.GetSubscriptionsAsync(); |
|||
} |
|||
|
|||
[HttpPost(AbpAspNetCoreDaprConsts.DaprEventCallbackUrl)] |
|||
public virtual async Task<IActionResult> EventsAsync() |
|||
{ |
|||
var bodyJsonDocument = await JsonDocument.ParseAsync(HttpContext.Request.Body); |
|||
var request = JsonSerializer.Deserialize<DaprSubscriptionRequest>(bodyJsonDocument.RootElement.GetRawText(), |
|||
HttpContext.RequestServices.GetRequiredService<IOptions<JsonOptions>>().Value.JsonSerializerOptions); |
|||
|
|||
var distributedEventBus = HttpContext.RequestServices.GetRequiredService<DaprDistributedEventBus>(); |
|||
var daprSerializer = HttpContext.RequestServices.GetRequiredService<IDaprSerializer>(); |
|||
|
|||
var eventData = daprSerializer.Deserialize(bodyJsonDocument.RootElement.GetProperty("data").GetRawText(), distributedEventBus.GetEventType(request.Topic)); |
|||
await distributedEventBus.TriggerHandlersAsync(distributedEventBus.GetEventType(request.Topic), eventData); |
|||
|
|||
return Ok(); |
|||
} |
|||
} |
|||
@ -0,0 +1,6 @@ |
|||
namespace Volo.Abp.AspNetCore.Dapr; |
|||
|
|||
public interface IAbpDaprPubSubProviderContributor |
|||
{ |
|||
Task ContributeAsync(AbpDaprPubSubProviderContributorContext context); |
|||
} |
|||
@ -0,0 +1,10 @@ |
|||
namespace Volo.Abp.AspNetCore.Dapr.Models; |
|||
|
|||
public class DaprSubscriptionDefinition |
|||
{ |
|||
public string PubSubName { get; set; } |
|||
|
|||
public string Topic { get; set; } |
|||
|
|||
public string Route { get; set; } |
|||
} |
|||
@ -0,0 +1,8 @@ |
|||
namespace Volo.Abp.AspNetCore.Dapr.Models; |
|||
|
|||
public class DaprSubscriptionRequest |
|||
{ |
|||
public string PubSubName { get; set; } |
|||
|
|||
public string Topic { get; set; } |
|||
} |
|||
@ -0,0 +1,11 @@ |
|||
using System.Text.Json; |
|||
|
|||
namespace Volo.Abp.AspNetCore.Dapr.SystemTextJson; |
|||
|
|||
public class AbpAspNetCoreDaprJsonNamingPolicy : JsonNamingPolicy |
|||
{ |
|||
public override string ConvertName(string name) |
|||
{ |
|||
return name.ToLower(); |
|||
} |
|||
} |
|||
@ -0,0 +1,25 @@ |
|||
using System.Text.Json; |
|||
using System.Text.Json.Serialization; |
|||
using Volo.Abp.AspNetCore.Dapr.Models; |
|||
|
|||
namespace Volo.Abp.AspNetCore.Dapr.SystemTextJson; |
|||
|
|||
public class DaprSubscriptionDefinitionConverter : JsonConverter<DaprSubscriptionDefinition> |
|||
{ |
|||
private JsonSerializerOptions _writeJsonSerializerOptions; |
|||
|
|||
public override DaprSubscriptionDefinition Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) |
|||
{ |
|||
throw new NotSupportedException(); |
|||
} |
|||
|
|||
public override void Write(Utf8JsonWriter writer, DaprSubscriptionDefinition value, JsonSerializerOptions options) |
|||
{ |
|||
_writeJsonSerializerOptions ??= JsonSerializerOptionsHelper.Create(new JsonSerializerOptions(options) |
|||
{ |
|||
PropertyNamingPolicy = new AbpAspNetCoreDaprJsonNamingPolicy() |
|||
}, x => x == this); |
|||
|
|||
JsonSerializer.Serialize(writer, value, _writeJsonSerializerOptions); |
|||
} |
|||
} |
|||
@ -0,0 +1,16 @@ |
|||
namespace Volo.Abp.Dapr; |
|||
|
|||
public interface IDaprSerializer |
|||
{ |
|||
byte[] Serialize(object obj); |
|||
|
|||
object Deserialize(byte[] value, Type type); |
|||
|
|||
T Deserialize<T>(byte[] value); |
|||
|
|||
string SerializeToString(object obj); |
|||
|
|||
object Deserialize(string value, Type type); |
|||
|
|||
T Deserialize<T>(string value); |
|||
} |
|||
@ -0,0 +1,45 @@ |
|||
using System.Text; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.Json; |
|||
|
|||
namespace Volo.Abp.Dapr; |
|||
|
|||
public class Utf8JsonDaprSerializer : IDaprSerializer, ITransientDependency |
|||
{ |
|||
private readonly IJsonSerializer _jsonSerializer; |
|||
|
|||
public Utf8JsonDaprSerializer(IJsonSerializer jsonSerializer) |
|||
{ |
|||
_jsonSerializer = jsonSerializer; |
|||
} |
|||
|
|||
public byte[] Serialize(object obj) |
|||
{ |
|||
return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj)); |
|||
} |
|||
|
|||
public object Deserialize(byte[] value, Type type) |
|||
{ |
|||
return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value)); |
|||
} |
|||
|
|||
public T Deserialize<T>(byte[] value) |
|||
{ |
|||
return _jsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(value)); |
|||
} |
|||
|
|||
public string SerializeToString(object obj) |
|||
{ |
|||
return _jsonSerializer.Serialize(obj); |
|||
} |
|||
|
|||
public object Deserialize(string value, Type type) |
|||
{ |
|||
return _jsonSerializer.Deserialize(type, value); |
|||
} |
|||
|
|||
public T Deserialize<T>(string value) |
|||
{ |
|||
return _jsonSerializer.Deserialize<T>(value); |
|||
} |
|||
} |
|||
@ -0,0 +1,11 @@ |
|||
namespace Volo.Abp.EventBus.Dapr; |
|||
|
|||
public class AbpDaprEventBusOptions |
|||
{ |
|||
public string PubSubName { get; set; } |
|||
|
|||
public AbpDaprEventBusOptions() |
|||
{ |
|||
PubSubName = "pubsub"; |
|||
} |
|||
} |
|||
@ -0,0 +1,221 @@ |
|||
using System.Collections.Concurrent; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.Dapr; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
using Volo.Abp.Guids; |
|||
using Volo.Abp.MultiTenancy; |
|||
using Volo.Abp.Threading; |
|||
using Volo.Abp.Timing; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EventBus.Dapr; |
|||
|
|||
[Dependency(ReplaceServices = true)] |
|||
[ExposeServices(typeof(IDistributedEventBus), typeof(DaprDistributedEventBus))] |
|||
public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDependency |
|||
{ |
|||
protected IDaprSerializer Serializer { get; } |
|||
protected AbpDaprEventBusOptions DaprEventBusOptions { get; } |
|||
protected AbpDaprClientFactory DaprClientFactory { get; } |
|||
|
|||
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; } |
|||
protected ConcurrentDictionary<string, Type> EventTypes { get; } |
|||
|
|||
public DaprDistributedEventBus( |
|||
IServiceScopeFactory serviceScopeFactory, |
|||
ICurrentTenant currentTenant, |
|||
IUnitOfWorkManager unitOfWorkManager, |
|||
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions, |
|||
IGuidGenerator guidGenerator, |
|||
IClock clock, |
|||
IEventHandlerInvoker eventHandlerInvoker, |
|||
IDaprSerializer serializer, |
|||
IOptions<AbpDaprEventBusOptions> daprEventBusOptions, |
|||
AbpDaprClientFactory daprClientFactory) |
|||
: base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker) |
|||
{ |
|||
Serializer = serializer; |
|||
DaprEventBusOptions = daprEventBusOptions.Value; |
|||
DaprClientFactory = daprClientFactory; |
|||
|
|||
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>(); |
|||
EventTypes = new ConcurrentDictionary<string, Type>(); |
|||
} |
|||
|
|||
public void Initialize() |
|||
{ |
|||
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers); |
|||
} |
|||
|
|||
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) |
|||
{ |
|||
var handlerFactories = GetOrCreateHandlerFactories(eventType); |
|||
|
|||
if (factory.IsInFactories(handlerFactories)) |
|||
{ |
|||
return NullDisposable.Instance; |
|||
} |
|||
|
|||
handlerFactories.Add(factory); |
|||
|
|||
return new EventHandlerFactoryUnregistrar(this, eventType, factory); |
|||
} |
|||
|
|||
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action) |
|||
{ |
|||
Check.NotNull(action, nameof(action)); |
|||
|
|||
GetOrCreateHandlerFactories(typeof(TEvent)) |
|||
.Locking(factories => |
|||
{ |
|||
factories.RemoveAll( |
|||
factory => |
|||
{ |
|||
var singleInstanceFactory = factory as SingleInstanceHandlerFactory; |
|||
if (singleInstanceFactory == null) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler<TEvent>; |
|||
if (actionHandler == null) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
return actionHandler.Action == action; |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
public override void Unsubscribe(Type eventType, IEventHandler handler) |
|||
{ |
|||
GetOrCreateHandlerFactories(eventType) |
|||
.Locking(factories => |
|||
{ |
|||
factories.RemoveAll( |
|||
factory => |
|||
factory is SingleInstanceHandlerFactory && |
|||
(factory as SingleInstanceHandlerFactory).HandlerInstance == handler |
|||
); |
|||
}); |
|||
} |
|||
|
|||
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory) |
|||
{ |
|||
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory)); |
|||
} |
|||
|
|||
public override void UnsubscribeAll(Type eventType) |
|||
{ |
|||
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); |
|||
} |
|||
|
|||
protected async override Task PublishToEventBusAsync(Type eventType, object eventData) |
|||
{ |
|||
await PublishToDaprAsync(eventType, eventData); |
|||
} |
|||
|
|||
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) |
|||
{ |
|||
unitOfWork.AddOrReplaceDistributedEvent(eventRecord); |
|||
} |
|||
|
|||
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType) |
|||
{ |
|||
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>(); |
|||
|
|||
foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) |
|||
{ |
|||
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); |
|||
} |
|||
|
|||
return handlerFactoryList.ToArray(); |
|||
} |
|||
|
|||
public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) |
|||
{ |
|||
await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName))); |
|||
} |
|||
|
|||
public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig) |
|||
{ |
|||
var outgoingEventArray = outgoingEvents.ToArray(); |
|||
|
|||
foreach (var outgoingEvent in outgoingEventArray) |
|||
{ |
|||
await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName))); |
|||
} |
|||
} |
|||
|
|||
public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) |
|||
{ |
|||
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); |
|||
if (eventType == null) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); |
|||
var exceptions = new List<Exception>(); |
|||
await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig); |
|||
if (exceptions.Any()) |
|||
{ |
|||
ThrowOriginalExceptions(eventType, exceptions); |
|||
} |
|||
} |
|||
|
|||
protected override byte[] Serialize(object eventData) |
|||
{ |
|||
return Serializer.Serialize(eventData); |
|||
} |
|||
|
|||
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType) |
|||
{ |
|||
return HandlerFactories.GetOrAdd( |
|||
eventType, |
|||
type => |
|||
{ |
|||
var eventName = EventNameAttribute.GetNameOrDefault(type); |
|||
EventTypes[eventName] = type; |
|||
return new List<IEventHandlerFactory>(); |
|||
} |
|||
); |
|||
} |
|||
|
|||
public Type GetEventType(string eventName) |
|||
{ |
|||
return EventTypes.GetOrDefault(eventName); |
|||
} |
|||
|
|||
protected virtual async Task PublishToDaprAsync(Type eventType, object eventData) |
|||
{ |
|||
await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData); |
|||
} |
|||
|
|||
protected virtual async Task PublishToDaprAsync(string eventName, object eventData) |
|||
{ |
|||
var client = await DaprClientFactory.CreateAsync(); |
|||
await client.PublishEventAsync(pubsubName: DaprEventBusOptions.PubSubName, topicName: eventName, data: eventData); |
|||
} |
|||
|
|||
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) |
|||
{ |
|||
//Should trigger same type
|
|||
if (handlerEventType == targetEventType) |
|||
{ |
|||
return true; |
|||
} |
|||
|
|||
//TODO: Support inheritance? But it does not support on subscription to RabbitMq!
|
|||
//Should trigger for inherited types
|
|||
if (handlerEventType.IsAssignableFrom(targetEventType)) |
|||
{ |
|||
return true; |
|||
} |
|||
|
|||
return false; |
|||
} |
|||
} |
|||
Loading…
Reference in new issue