diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 1eff521897..4b5b456a93 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -2,7 +2,8 @@ "permissions": { "allow": [ "Bash(yarn nx g:*)", - "Bash(npx vitest:*)" + "Bash(npx vitest:*)", + "Bash(git show:*)" ] } } diff --git a/docs/en/Community-Articles/2026-03-23-Dynamic-Events-in-ABP/POST.md b/docs/en/Community-Articles/2026-03-23-Dynamic-Events-in-ABP/POST.md new file mode 100644 index 0000000000..1195a6953c --- /dev/null +++ b/docs/en/Community-Articles/2026-03-23-Dynamic-Events-in-ABP/POST.md @@ -0,0 +1,203 @@ +# Dynamic Events in ABP + +ABP's Event Bus is a core infrastructure piece. The **Local Event Bus** handles in-process communication between services. The **Distributed Event Bus** handles cross-service communication over message brokers like RabbitMQ, Kafka, Azure Service Bus, and Rebus. + +Both are fully type-safe — you define event types at compile time, register handlers via DI, and everything is wired up automatically. This works great, but it has one assumption: **you know all your event types at compile time**. + +In practice, that assumption breaks down in several scenarios: + +- You're building a **plugin system** where third-party modules register their own event types at runtime — you can't pre-define an `IDistributedEventHandler` for every possible plugin event +- Your system receives events from **external systems** (webhooks, IoT devices, partner APIs) where the event schema is defined by the external party, not by your codebase +- You're building a **low-code platform** where end users define event-driven workflows through a visual designer — the event names and payloads are entirely determined at runtime + +ABP's **Dynamic Events** extend the existing `IEventBus` and `IDistributedEventBus` interfaces with string-based publishing and subscription. You can publish events by name, subscribe to events by name, and handle payloads without any compile-time type binding — all while coexisting seamlessly with the existing typed event system. + +## Publishing Events by Name + +The most straightforward use case: publish an event using a string name and an arbitrary payload. + +```csharp +public class OrderAppService : ApplicationService +{ + private readonly IDistributedEventBus _eventBus; + + public OrderAppService(IDistributedEventBus eventBus) + { + _eventBus = eventBus; + } + + public async Task PlaceOrderAsync(PlaceOrderInput input) + { + // Business logic... + + // Publish a dynamic event — no event class needed + await _eventBus.PublishAsync( + "OrderPlaced", + new { OrderId = input.Id, CustomerEmail = input.Email } + ); + } +} +``` + +The payload can be any serializable object — an anonymous type, a `Dictionary`, or even an existing typed class. The event bus serializes the payload and sends it to the broker with the string name as the routing key. + +### What If a Typed Event Already Exists? + +If the string name matches an existing typed event (via `EventNameAttribute`), the framework automatically converts the payload to the typed class and routes it through the **typed pipeline**. Both typed handlers and dynamic handlers are triggered. + +```csharp +[EventName("OrderPlaced")] +public class OrderPlacedEto +{ + public Guid OrderId { get; set; } + public string CustomerEmail { get; set; } +} + +// This handler will still receive the event, with auto-converted data +public class OrderEmailHandler : IDistributedEventHandler +{ + public Task HandleEventAsync(OrderPlacedEto eventData) + { + // eventData.OrderId and eventData.CustomerEmail are populated + return Task.CompletedTask; + } +} +``` + +Publishing by name with `new { OrderId = ..., CustomerEmail = ... }` triggers this typed handler — the framework handles the serialization round-trip. This is especially useful for scenarios where a service needs to emit events without taking a dependency on the project that defines the event type. + +## Subscribing to Dynamic Events + +Dynamic subscription lets you register event handlers at runtime, using a string event name. + +```csharp +public override async Task OnApplicationInitializationAsync( + ApplicationInitializationContext context) +{ + var eventBus = context.ServiceProvider + .GetRequiredService(); + + // Subscribe to a dynamic event — no event class needed + eventBus.Subscribe("PartnerOrderReceived", + new PartnerOrderHandler(context.ServiceProvider)); +} +``` + +The handler implements `IDistributedEventHandler`: + +```csharp +public class PartnerOrderHandler : IDistributedEventHandler +{ + private readonly IServiceProvider _serviceProvider; + + public PartnerOrderHandler(IServiceProvider serviceProvider) + { + _serviceProvider = serviceProvider; + } + + public async Task HandleEventAsync(DynamicEventData eventData) + { + // eventData.EventName = "PartnerOrderReceived" + // eventData.Data = the raw payload from the broker + + var orderProcessor = _serviceProvider + .GetRequiredService(); + + await orderProcessor.ProcessAsync(eventData.EventName, eventData.Data); + } +} +``` + +`DynamicEventData` is a simple POCO with two properties: + +- **`EventName`** — the string name that identifies the event +- **`Data`** — the raw event data payload (the deserialized `object` from the broker) + +> `Subscribe` returns an `IDisposable`. Call `Dispose()` to unsubscribe the handler at runtime. + +## Mixed Typed and Dynamic Handlers + +Typed and dynamic handlers coexist naturally. When both are registered for the same event name, **both are triggered** — the framework automatically converts the data to the appropriate format for each handler. + +```csharp +// Typed handler — receives OrderPlacedEto +eventBus.Subscribe(); + +// Dynamic handler — receives DynamicEventData for the same event +eventBus.Subscribe("OrderPlaced", new AuditLogHandler()); +``` + +When `OrderPlacedEto` is published (by type or by name), both handlers fire. The typed handler receives a fully deserialized `OrderPlacedEto` object. The dynamic handler receives a `DynamicEventData` wrapping the raw payload. + +This enables a powerful pattern: the core business logic uses typed handlers for safety, while infrastructure concerns (auditing, logging, plugin hooks) use dynamic handlers for flexibility. + +## Outbox Support + +Dynamic events go through the same **outbox/inbox pipeline** as typed events. If you have outbox configured, dynamic events benefit from the same reliability guarantees — they are stored in the outbox table within the same database transaction as your business data, then reliably delivered to the broker by the background worker. + +No additional configuration is needed. The outbox works transparently for both typed and dynamic events: + +```csharp +// This dynamic event goes through the outbox if configured +using var uow = _unitOfWorkManager.Begin(); +await _eventBus.PublishAsync( + "OrderPlaced", + new { OrderId = orderId }, + onUnitOfWorkComplete: true, + useOutbox: true +); +await uow.CompleteAsync(); +``` + +## Local Event Bus + +Dynamic events work on the local event bus too, not just the distributed bus. The API is the same: + +```csharp +var localEventBus = context.ServiceProvider + .GetRequiredService(); + +// Subscribe dynamically +localEventBus.Subscribe("UserActivityTracked", + new SingleInstanceHandlerFactory( + new ActionEventHandler(eventData => + { + // Handle the event + return Task.CompletedTask; + }))); + +// Publish dynamically +await localEventBus.PublishAsync("UserActivityTracked", new +{ + UserId = currentUser.Id, + Action = "PageView", + Url = "/products/42" +}); +``` + +## Provider Support + +Dynamic events work with all distributed event bus providers: + +| Provider | Dynamic Subscribe | Dynamic Publish | +|---|---|---| +| LocalDistributedEventBus (default) | ✅ | ✅ | +| RabbitMQ | ✅ | ✅ | +| Kafka | ✅ | ✅ | +| Rebus | ✅ | ✅ | +| Azure Service Bus | ✅ | ✅ | +| Dapr | ❌ | ❌ | + +Dapr requires topic subscriptions to be declared at application startup and cannot add subscriptions at runtime. Calling `Subscribe(string, ...)` on the Dapr provider throws an `AbpException`. + +## Summary + +`IEventBus.PublishAsync(string, object)` and `IEventBus.Subscribe(string, handler)` let you publish and subscribe to events by name at runtime — no compile-time types required. If the event name matches a typed event, the framework auto-converts the payload and triggers both typed and dynamic handlers. Dynamic events go through the same outbox/inbox pipeline as typed events, so reliability guarantees are preserved. This works across all providers except Dapr, and coexists seamlessly with the existing typed event system. + +## References + +- [Local Event Bus](https://abp.io/docs/latest/framework/infrastructure/event-bus/local) +- [Distributed Event Bus](https://abp.io/docs/latest/framework/infrastructure/event-bus/distributed) +- [RabbitMQ Integration](https://abp.io/docs/latest/framework/infrastructure/event-bus/distributed/rabbitmq) +- [Kafka Integration](https://abp.io/docs/latest/framework/infrastructure/event-bus/distributed/kafka) +- [Dynamic Distributed Events Sample](https://github.com/abpframework/abp-samples/tree/master/DynamicDistributedEvents) diff --git a/docs/en/Community-Articles/2026-03-23-Dynamic-Events-in-ABP/cover.png b/docs/en/Community-Articles/2026-03-23-Dynamic-Events-in-ABP/cover.png new file mode 100644 index 0000000000..4776c8485b Binary files /dev/null and b/docs/en/Community-Articles/2026-03-23-Dynamic-Events-in-ABP/cover.png differ diff --git a/docs/en/framework/infrastructure/event-bus/distributed/index.md b/docs/en/framework/infrastructure/event-bus/distributed/index.md index e5f96bba10..5f2c7b11ec 100644 --- a/docs/en/framework/infrastructure/event-bus/distributed/index.md +++ b/docs/en/framework/infrastructure/event-bus/distributed/index.md @@ -721,6 +721,82 @@ Configure(options => }); ```` +## Dynamic (String-Based) Events + +In addition to the type-safe event system described above, ABP also supports **dynamic events** that are identified by a string name rather than a CLR type. This is useful for scenarios where event types are not known at compile time, such as integrating with external systems or building plugin architectures. + +> **Note:** Dynamic event subscriptions are supported by RabbitMQ, Kafka, Azure Service Bus, and Rebus providers. The **Dapr provider does not support dynamic events** because Dapr requires topic subscriptions to be declared at application startup and cannot add subscriptions at runtime. Attempting to call `Subscribe(string, ...)` on the Dapr provider will throw an `AbpException`. + +### Publishing Dynamic Events + +Use the `PublishAsync` overload that accepts a string event name: + +````csharp +await distributedEventBus.PublishAsync( + "MyDynamicEvent", + new Dictionary + { + ["UserId"] = 42, + ["Name"] = "John" + } +); +```` + +If a typed event exists with the given name (via `EventNameAttribute` or convention), the data is automatically deserialized and routed to the typed handler. Otherwise, it is delivered as a `DynamicEventData` to dynamic handlers. + +You can also control `onUnitOfWorkComplete` and `useOutbox` parameters: + +````csharp +await distributedEventBus.PublishAsync( + "MyDynamicEvent", + new { UserId = 42, Name = "John" }, + onUnitOfWorkComplete: true, + useOutbox: true +); +```` + +### Subscribing to Dynamic Events + +Use the `Subscribe` overload that accepts a string event name: + +````csharp +var subscription = distributedEventBus.Subscribe( + "MyDynamicEvent", + new SingleInstanceHandlerFactory( + new ActionEventHandler(eventData => + { + // Access the event name and raw data + var name = eventData.EventName; + var data = eventData.Data; + + return Task.CompletedTask; + }))); + +// Unsubscribe when done +subscription.Dispose(); +```` + +You can also subscribe using a typed distributed event handler: + +````csharp +distributedEventBus.Subscribe("MyDynamicEvent", myDistributedEventHandler); +```` + +Where `myDistributedEventHandler` implements `IDistributedEventHandler`. + +### Mixed Typed and Dynamic Handlers + +When both a typed handler and a dynamic handler are registered for the same event name, **both** handlers are triggered. The typed handler receives the converted typed data, while the dynamic handler receives a `DynamicEventData` wrapper. + +### DynamicEventData Class + +The `DynamicEventData` class is a simple data object that wraps the event payload: + +- **`EventName`**: The string name that identifies the event. +- **`Data`**: The raw event data payload. + +> If a typed handler exists for the same event name, the framework automatically converts the data to the expected type using the event bus serialization pipeline. Dynamic handlers receive the raw `Data` as-is. + ## See Also * [Local Event Bus](../local) diff --git a/docs/en/framework/infrastructure/event-bus/local/index.md b/docs/en/framework/infrastructure/event-bus/local/index.md index b20718c51f..bc8af41a06 100644 --- a/docs/en/framework/infrastructure/event-bus/local/index.md +++ b/docs/en/framework/infrastructure/event-bus/local/index.md @@ -249,6 +249,59 @@ If you set it to `false`, the `EntityUpdatedEventData` will not be published > This option is only used for the EF Core. +## Dynamic (String-Based) Events + +In addition to the type-safe event system described above, ABP also supports **dynamic events** that are identified by a string name rather than a CLR type. This is useful for scenarios where event types are not known at compile time. + +### Publishing Dynamic Events + +Use the `PublishAsync` overload that accepts a string event name: + +````csharp +await localEventBus.PublishAsync( + "MyDynamicEvent", + new Dictionary + { + ["UserId"] = 42, + ["Name"] = "John" + } +); +```` + +If a typed event exists with the given name (via `EventNameAttribute` or convention), the data is automatically converted and routed to the typed handler. Otherwise, it is delivered as a `DynamicEventData` to dynamic handlers. + +### Subscribing to Dynamic Events + +Use the `Subscribe` overload that accepts a string event name: + +````csharp +var subscription = localEventBus.Subscribe( + "MyDynamicEvent", + new SingleInstanceHandlerFactory( + new ActionEventHandler(eventData => + { + // Access the event name and raw data + var name = eventData.EventName; + var data = eventData.Data; + + return Task.CompletedTask; + }))); + +// Unsubscribe when done +subscription.Dispose(); +```` + +The `DynamicEventData` class is a simple data object with two properties: + +- **`EventName`**: The string name that identifies the event. +- **`Data`**: The raw event data payload. + +> If a typed handler exists for the same event name, the framework automatically converts the data to the expected type. Dynamic handlers receive the raw `Data` as-is. + +### Mixed Typed and Dynamic Handlers + +When both a typed handler and a dynamic handler are registered for the same event name, **both** handlers are triggered. The typed handler receives the converted typed data, while the dynamic handler receives a `DynamicEventData` wrapper. + ## See Also * [Distributed Event Bus](../distributed) diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IDistributedEventBus.cs index c84855e0ea..9fdab66aee 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Distributed/IDistributedEventBus.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Threading.Tasks; namespace Volo.Abp.EventBus.Distributed; @@ -14,15 +14,55 @@ public interface IDistributedEventBus : IEventBus IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class; + /// + /// Triggers an event. + /// + /// Event type + /// Related data for the event + /// True, to publish the event at the end of the current unit of work, if available + /// True, to use the outbox pattern for reliable event publishing + /// The task to handle async operation Task PublishAsync( TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class; + /// + /// Triggers an event. + /// + /// Event type + /// Related data for the event + /// True, to publish the event at the end of the current unit of work, if available + /// True, to use the outbox pattern for reliable event publishing + /// The task to handle async operation Task PublishAsync( Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true); + + /// + /// Registers to an event by its string-based event name. + /// Same (given) instance of the handler is used for all event occurrences. + /// Wraps the handler as . + /// + /// Name of the event + /// Object to handle the event + IDisposable Subscribe(string eventName, IDistributedEventHandler handler); + + /// + /// Triggers an event by its string-based event name. + /// Used for dynamic (type-less) event publishing over distributed event bus. + /// + /// Name of the event + /// Related data for the event + /// True, to publish the event at the end of the current unit of work, if available + /// True, to use the outbox pattern for reliable event publishing + /// The task to handle async operation + Task PublishAsync( + string eventName, + object eventData, + bool onUnitOfWorkComplete = true, + bool useOutbox = true); } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/DynamicEventData.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/DynamicEventData.cs new file mode 100644 index 0000000000..1af2f600a3 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/DynamicEventData.cs @@ -0,0 +1,17 @@ +namespace Volo.Abp.EventBus; + +/// +/// Wraps arbitrary event data with a string-based event name for dynamic (type-less) event handling. +/// +public class DynamicEventData +{ + public string EventName { get; } + + public object Data { get; } + + public DynamicEventData(string eventName, object data) + { + EventName = Check.NotNullOrWhiteSpace(eventName, nameof(eventName)); + Data = Check.NotNull(data, nameof(data)); + } +} diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/IEventBus.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/IEventBus.cs index f1ceae8617..5430f20155 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/IEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/IEventBus.cs @@ -23,6 +23,16 @@ public interface IEventBus /// True, to publish the event at the end of the current unit of work, if available /// The task to handle async operation Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true); + + /// + /// Triggers an event by its string-based event name. + /// Used for dynamic (type-less) event publishing. + /// + /// Name of the event + /// Related data for the event + /// True, to publish the event at the end of the current unit of work, if available + /// The task to handle async operation + Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true); /// /// Registers to an event. @@ -50,6 +60,22 @@ public interface IEventBus /// Event type /// Object to handle the event IDisposable Subscribe(Type eventType, IEventHandler handler); + + /// + /// Registers to an event by its string-based event name. + /// Same (given) instance of the handler is used for all event occurrences. + /// + /// Name of the event + /// Object to handle the event + IDisposable Subscribe(string eventName, IEventHandler handler); + + /// + /// Registers to an event by its string-based event name. + /// Given factory is used to create/release handlers. + /// + /// Name of the event + /// A factory to create/release handlers + IDisposable Subscribe(string eventName, IEventHandlerFactory handler); /// /// Registers to an event. @@ -104,6 +130,20 @@ public interface IEventBus /// Event type /// Factory object that is registered before void Unsubscribe(Type eventType, IEventHandlerFactory factory); + + /// + /// Unregisters from an event by its string-based event name. + /// + /// Name of the event + /// Factory object that is registered before + void Unsubscribe(string eventName, IEventHandlerFactory factory); + + /// + /// Unregisters from an event by its string-based event name. + /// + /// Name of the event + /// Handler object that is registered before + void Unsubscribe(string eventName, IEventHandler handler); /// /// Unregisters all event handlers of given event type. @@ -117,4 +157,10 @@ public interface IEventBus /// /// Event type void UnsubscribeAll(Type eventType); + + /// + /// Unregisters all event handlers of given string-based event name. + /// + /// Name of the event + void UnsubscribeAll(string eventName); } diff --git a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Local/ILocalEventBus.cs b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Local/ILocalEventBus.cs index e691b6c58c..733b2dafca 100644 --- a/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Local/ILocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/Local/ILocalEventBus.cs @@ -23,4 +23,11 @@ public interface ILocalEventBus : IEventBus /// Event type /// List GetEventHandlerFactories(Type eventType); + + /// + /// Gets the list of event handler factories for the given string-based event name. + /// + /// Name of the event + /// List of event handler factories registered for the given event name + List GetDynamicEventHandlerFactories(string eventName); } diff --git a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs index 32b07f1b82..296f353681 100644 --- a/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs @@ -29,6 +29,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen protected IAzureServiceBusSerializer Serializer { get; } protected ConcurrentDictionary> HandlerFactories { get; } protected ConcurrentDictionary EventTypes { get; } + protected ConcurrentDictionary> DynamicHandlerFactories { get; } protected IAzureServiceBusMessageConsumer Consumer { get; private set; } = default!; public AzureDistributedEventBus( @@ -61,6 +62,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen PublisherPool = publisherPool; HandlerFactories = new ConcurrentDictionary>(); EventTypes = new ConcurrentDictionary(); + DynamicHandlerFactories = new ConcurrentDictionary>(); } public void Initialize() @@ -81,14 +83,25 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen { return; } + var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(message.Body.ToArray(), eventType); + } + else if (DynamicHandlerFactories.ContainsKey(eventName)) + { + var rawBytes = message.Body.ToArray(); + eventData = new DynamicEventData(eventName, Serializer.Deserialize(rawBytes)); + eventType = typeof(DynamicEventData); + } + else { return; } - var eventData = Serializer.Deserialize(message.Body.ToArray(), eventType); - if (await AddToInboxAsync(message.MessageId, eventName, eventType, eventData, message.CorrelationId)) { return; @@ -100,6 +113,113 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen } } + public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) + { + var handlerFactories = GetOrCreateHandlerFactories(eventType); + + if (factory.IsInFactories(handlerFactories)) + { + return NullDisposable.Instance; + } + + handlerFactories.Add(factory); + + return new EventHandlerFactoryUnregistrar(this, eventType, factory); + } + + /// + public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) + { + var handlerFactories = GetOrCreateDynamicHandlerFactories(eventName); + + if (handler.IsInFactories(handlerFactories)) + { + return NullDisposable.Instance; + } + + handlerFactories.Add(handler); + + return new DynamicEventHandlerFactoryUnregistrar(this, eventName, handler); + } + + public override void Unsubscribe(Func action) + { + Check.NotNull(action, nameof(action)); + + GetOrCreateHandlerFactories(typeof(TEvent)) + .Locking(factories => + { + factories.RemoveAll( + factory => + { + var singleInstanceFactory = factory as SingleInstanceHandlerFactory; + if (singleInstanceFactory == null) + { + return false; + } + + var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler; + if (actionHandler == null) + { + return false; + } + + return actionHandler.Action == action; + }); + }); + } + + public override void Unsubscribe(Type eventType, IEventHandler handler) + { + GetOrCreateHandlerFactories(eventType) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory handlerFactory && + handlerFactory.HandlerInstance == handler + ); + }); + } + + public override void Unsubscribe(Type eventType, IEventHandlerFactory factory) + { + GetOrCreateHandlerFactories(eventType) + .Locking(factories => factories.Remove(factory)); + } + + /// + public override void UnsubscribeAll(Type eventType) + { + GetOrCreateHandlerFactories(eventType) + .Locking(factories => factories.Clear()); + } + + /// + public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) + { + var eventType = EventTypes.GetOrDefault(eventName); + var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, ConvertDynamicEventData(dynamicEventData.Data, eventType), onUnitOfWorkComplete); + } + + return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete); + } + + protected async override Task PublishToEventBusAsync(Type eventType, object eventData) + { + var (eventName, resolvedData) = ResolveEventForPublishing(eventType, eventData); + await PublishAsync(eventName, resolvedData); + } + + protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) + { + unitOfWork.AddOrReplaceDistributedEvent(eventRecord); + } + public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.GetCorrelationId(), outgoingEvent.Id); @@ -162,12 +282,21 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); - if (eventType == null) + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (DynamicHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + eventData = new DynamicEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData)); + eventType = typeof(DynamicEventData); + } + else { return; } - - var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) { @@ -184,82 +313,6 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen return Serializer.Serialize(eventData); } - public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) - { - var handlerFactories = GetOrCreateHandlerFactories(eventType); - - if (factory.IsInFactories(handlerFactories)) - { - return NullDisposable.Instance; - } - - handlerFactories.Add(factory); - - return new EventHandlerFactoryUnregistrar(this, eventType, factory); - } - - public override void Unsubscribe(Func action) - { - Check.NotNull(action, nameof(action)); - - GetOrCreateHandlerFactories(typeof(TEvent)) - .Locking(factories => - { - factories.RemoveAll( - factory => - { - var singleInstanceFactory = factory as SingleInstanceHandlerFactory; - if (singleInstanceFactory == null) - { - return false; - } - - var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler; - if (actionHandler == null) - { - return false; - } - - return actionHandler.Action == action; - }); - }); - } - - public override void Unsubscribe(Type eventType, IEventHandler handler) - { - GetOrCreateHandlerFactories(eventType) - .Locking(factories => - { - factories.RemoveAll( - factory => - factory is SingleInstanceHandlerFactory handlerFactory && - handlerFactory.HandlerInstance == handler - ); - }); - } - - public override void Unsubscribe(Type eventType, IEventHandlerFactory factory) - { - GetOrCreateHandlerFactories(eventType) - .Locking(factories => factories.Remove(factory)); - } - - public override void UnsubscribeAll(Type eventType) - { - GetOrCreateHandlerFactories(eventType) - .Locking(factories => factories.Clear()); - } - - protected async override Task PublishToEventBusAsync(Type eventType, object eventData) - { - await PublishAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData); - } - - protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) - { - unitOfWork.AddOrReplaceDistributedEvent(eventRecord); - } - protected virtual Task PublishAsync(string eventName, object eventData) { var body = Serializer.Serialize(eventData); @@ -292,23 +345,12 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen await publisher.SendMessageAsync(message); } - protected override IEnumerable GetHandlerFactories(Type eventType) - { - return HandlerFactories - .Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)) - .Select(handlerFactory => - new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)) - .ToArray(); - } - - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) - { - return handlerEventType == targetEventType || handlerEventType.IsAssignableFrom(targetEventType); - } - protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) { - EventTypes.GetOrAdd(eventName, eventType); + if (typeof(DynamicEventData) != eventType) + { + EventTypes.GetOrAdd(eventName, eventType); + } return base.OnAddToOutboxAsync(eventName, eventType, eventData); } @@ -324,4 +366,83 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen } ); } + + protected override IEnumerable GetHandlerFactories(Type eventType) + { + var handlerFactoryList = new List(); + var eventNames = EventTypes.Where(x => ShouldTriggerEventForHandler(eventType, x.Value)).Select(x => x.Key).ToList(); + + foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) + { + handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + } + + foreach (var handlerFactory in DynamicHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key))) + { + handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value)); + } + + return handlerFactoryList.ToArray(); + } + + protected override Type? GetEventTypeByEventName(string eventName) + { + return EventTypes.GetOrDefault(eventName); + } + + /// + public override void Unsubscribe(string eventName, IEventHandlerFactory factory) + { + GetOrCreateDynamicHandlerFactories(eventName) + .Locking(factories => factories.Remove(factory)); + } + + /// + public override void Unsubscribe(string eventName, IEventHandler handler) + { + GetOrCreateDynamicHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); + } + + /// + public override void UnsubscribeAll(string eventName) + { + GetOrCreateDynamicHandlerFactories(eventName) + .Locking(factories => factories.Clear()); + } + + protected override IEnumerable GetDynamicHandlerFactories(string eventName) + { + var eventType = GetEventTypeByEventName(eventName); + if (eventType != null) + { + return GetHandlerFactories(eventType); + } + + var result = new List(); + + foreach (var handlerFactory in DynamicHandlerFactories.Where(hf => hf.Key == eventName)) + { + result.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value)); + } + + return result; + } + + private List GetOrCreateDynamicHandlerFactories(string eventName) + { + return DynamicHandlerFactories.GetOrAdd(eventName, _ => new List()); + } + + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) + { + return handlerEventType == targetEventType || handlerEventType.IsAssignableFrom(targetEventType); + } } diff --git a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs index 7c77340dda..5e89571554 100644 --- a/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Dapr/Volo/Abp/EventBus/Dapr/DaprDistributedEventBus.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -79,6 +79,15 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend return new EventHandlerFactoryUnregistrar(this, eventType, factory); } + /// + public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) + { + throw new AbpException( + "Dapr distributed event bus does not support dynamic event subscriptions. " + + "Dapr requires topic subscriptions to be declared at startup and cannot add subscriptions at runtime. " + + "Use a typed event handler (IDistributedEventHandler) instead."); + } + public override void Unsubscribe(Func action) { Check.NotNull(action, nameof(action)); @@ -129,37 +138,43 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } - protected async override Task PublishToEventBusAsync(Type eventType, object eventData) + /// + public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - await PublishToDaprAsync(eventType, eventData, null, CorrelationIdProvider.Get()); + var eventType = EventTypes.GetOrDefault(eventName); + if (eventType != null) + { + var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData); + return PublishAsync(eventType, ConvertDynamicEventData(dynamicEventData.Data, eventType), onUnitOfWorkComplete); + } + + throw new AbpException( + "Dapr distributed event bus does not support dynamic event publishing. " + + "Dapr requires topic subscriptions to be declared at startup. " + + "Use a typed event (PublishAsync) or ensure the event name matches a registered typed event."); } - protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) + protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - unitOfWork.AddOrReplaceDistributedEvent(eventRecord); + var (eventName, resolvedData) = ResolveEventForPublishing(eventType, eventData); + await PublishToDaprAsync(eventName, resolvedData, null, CorrelationIdProvider.Get()); } - protected override IEnumerable GetHandlerFactories(Type eventType) + protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) { - var handlerFactoryList = new List(); - - foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) - { - handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); - } - - return handlerFactoryList.ToArray(); + unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - var eventType = GetEventType(outgoingEvent.EventName); + var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); if (eventType == null) { return; } - await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, eventType), outgoingEvent.Id, outgoingEvent.GetCorrelationId()); + var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); + await PublishToDaprAsync(outgoingEvent.EventName, eventData, outgoingEvent.Id, outgoingEvent.GetCorrelationId()); using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId())) { @@ -182,7 +197,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public virtual async Task TriggerHandlersAsync(Type eventType, object eventData, string? messageId = null, string? correlationId = null) { - if (await AddToInboxAsync(messageId, EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData, correlationId)) + if (await AddToInboxAsync(messageId, GetEventName(eventType, eventData), eventType, eventData, correlationId)) { return; } @@ -195,13 +210,16 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - var eventType = GetEventType(incomingEvent.EventName); + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + object eventData; + if (eventType == null) { return; } - var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + var exceptions = new List(); using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) { @@ -218,6 +236,18 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend return Serializer.Serialize(eventData); } + protected virtual async Task PublishToDaprAsync(Type eventType, object eventData, Guid? messageId = null, string? correlationId = null) + { + await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData, messageId, correlationId); + } + + protected virtual async Task PublishToDaprAsync(string eventName, object eventData, Guid? messageId = null, string? correlationId = null) + { + var client = await DaprClientFactory.CreateAsync(); + var data = new AbpDaprEventData(DaprEventBusOptions.PubSubName, eventName, (messageId ?? GuidGenerator.Create()).ToString("N"), Serializer.SerializeToString(eventData), correlationId); + await client.PublishEventAsync(pubsubName: DaprEventBusOptions.PubSubName, topicName: eventName, data: data); + } + protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) { EventTypes.GetOrAdd(eventName, eventType); @@ -237,21 +267,55 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend ); } + protected override IEnumerable GetHandlerFactories(Type eventType) + { + var handlerFactoryList = new List(); + + foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) + { + handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + } + + return handlerFactoryList.ToArray(); + } + + protected override Type? GetEventTypeByEventName(string eventName) + { + return EventTypes.GetOrDefault(eventName); + } + public Type? GetEventType(string eventName) { return EventTypes.GetOrDefault(eventName); } - protected virtual async Task PublishToDaprAsync(Type eventType, object eventData, Guid? messageId = null, string? correlationId = null) + /// + public override void Unsubscribe(string eventName, IEventHandlerFactory factory) { - await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData, messageId, correlationId); + throw new AbpException("Dapr distributed event bus does not support dynamic event subscriptions."); } - protected virtual async Task PublishToDaprAsync(string eventName, object eventData, Guid? messageId = null, string? correlationId = null) + /// + public override void Unsubscribe(string eventName, IEventHandler handler) { - var client = await DaprClientFactory.CreateAsync(); - var data = new AbpDaprEventData(DaprEventBusOptions.PubSubName, eventName, (messageId ?? GuidGenerator.Create()).ToString("N"), Serializer.SerializeToString(eventData), correlationId); - await client.PublishEventAsync(pubsubName: DaprEventBusOptions.PubSubName, topicName: eventName, data: data); + throw new AbpException("Dapr distributed event bus does not support dynamic event subscriptions."); + } + + /// + public override void UnsubscribeAll(string eventName) + { + throw new AbpException("Dapr distributed event bus does not support dynamic event subscriptions."); + } + + protected override IEnumerable GetDynamicHandlerFactories(string eventName) + { + var eventType = GetEventTypeByEventName(eventName); + if (eventType != null) + { + return GetHandlerFactories(eventType); + } + + return Array.Empty(); } private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index a3376d9429..a5b2b2869a 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -29,6 +29,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen protected IProducerPool ProducerPool { get; } protected ConcurrentDictionary> HandlerFactories { get; } protected ConcurrentDictionary EventTypes { get; } + protected ConcurrentDictionary> DynamicHandlerFactories { get; } protected IKafkaMessageConsumer Consumer { get; private set; } = default!; public KafkaDistributedEventBus( @@ -63,6 +64,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen HandlerFactories = new ConcurrentDictionary>(); EventTypes = new ConcurrentDictionary(); + DynamicHandlerFactories = new ConcurrentDictionary>(); } public void Initialize() @@ -80,14 +82,24 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen { var eventName = message.Key; var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) - { - return; - } var messageId = message.GetMessageId(); - var eventData = Serializer.Deserialize(message.Value, eventType); var correlationId = message.GetCorrelationId(); + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(message.Value, eventType); + } + else if (DynamicHandlerFactories.ContainsKey(eventName)) + { + eventData = new DynamicEventData(eventName, Serializer.Deserialize(message.Value)); + eventType = typeof(DynamicEventData); + } + else + { + return; + } if (await AddToInboxAsync(messageId, eventName, eventType, eventData, correlationId)) { @@ -114,6 +126,21 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen return new EventHandlerFactoryUnregistrar(this, eventType, factory); } + /// + public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) + { + var handlerFactories = GetOrCreateDynamicHandlerFactories(eventName); + + if (handler.IsInFactories(handlerFactories)) + { + return NullDisposable.Instance; + } + + handlerFactories.Add(handler); + + return new DynamicEventHandlerFactoryUnregistrar(this, eventName, handler); + } + /// public override void Unsubscribe(Func action) { @@ -168,6 +195,20 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } + /// + public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) + { + var eventType = EventTypes.GetOrDefault(eventName); + var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, ConvertDynamicEventData(dynamicEventData.Data, eventType), onUnitOfWorkComplete); + } + + return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete); + } + protected override async Task PublishToEventBusAsync(Type eventType, object eventData) { var headers = new Headers @@ -278,12 +319,21 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen InboxConfig inboxConfig) { var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); - if (eventType == null) + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (DynamicHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + eventData = new DynamicEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData)); + eventType = typeof(DynamicEventData); + } + else { return; } - - var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) { @@ -302,8 +352,8 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen private async Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers) { - var eventName = EventNameAttribute.GetNameOrDefault(eventType); - var body = Serializer.Serialize(eventData); + var (eventName, resolvedData) = ResolveEventForPublishing(eventType, eventData); + var body = Serializer.Serialize(resolvedData); var result = await PublishAsync(topicName, eventName, body, headers); if (result.Status != PersistenceStatus.Persisted) @@ -332,7 +382,10 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) { - EventTypes.GetOrAdd(eventName, eventType); + if (typeof(DynamicEventData) != eventType) + { + EventTypes.GetOrAdd(eventName, eventType); + } return base.OnAddToOutboxAsync(eventName, eventType, eventData); } @@ -352,17 +405,75 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen protected override IEnumerable GetHandlerFactories(Type eventType) { var handlerFactoryList = new List(); + var eventNames = EventTypes.Where(x => ShouldTriggerEventForHandler(eventType, x.Value)).Select(x => x.Key).ToList(); - foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)) - ) + foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) { - handlerFactoryList.Add( - new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + } + + foreach (var handlerFactory in DynamicHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key))) + { + handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value)); } return handlerFactoryList.ToArray(); } + protected override Type? GetEventTypeByEventName(string eventName) + { + return EventTypes.GetOrDefault(eventName); + } + + /// + public override void Unsubscribe(string eventName, IEventHandlerFactory factory) + { + GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); + } + + /// + public override void Unsubscribe(string eventName, IEventHandler handler) + { + GetOrCreateDynamicHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); + } + + /// + public override void UnsubscribeAll(string eventName) + { + GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Clear()); + } + + protected override IEnumerable GetDynamicHandlerFactories(string eventName) + { + var eventType = GetEventTypeByEventName(eventName); + if (eventType != null) + { + return GetHandlerFactories(eventType); + } + + var result = new List(); + + foreach (var handlerFactory in DynamicHandlerFactories.Where(hf => hf.Key == eventName)) + { + result.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value)); + } + + return result; + } + + private List GetOrCreateDynamicHandlerFactories(string eventName) + { + return DynamicHandlerFactories.GetOrAdd(eventName, _ => new List()); + } + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index 3a647388b5..b18afd2ea8 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -33,6 +33,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis //TODO: Accessing to the List may not be thread-safe! protected ConcurrentDictionary> HandlerFactories { get; } protected ConcurrentDictionary EventTypes { get; } + protected ConcurrentDictionary> DynamicHandlerFactories { get; } protected IRabbitMqMessageConsumerFactory MessageConsumerFactory { get; } protected IRabbitMqMessageConsumer Consumer { get; private set; } = default!; @@ -70,6 +71,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis HandlerFactories = new ConcurrentDictionary>(); EventTypes = new ConcurrentDictionary(); + DynamicHandlerFactories = new ConcurrentDictionary>(); } public virtual void Initialize() @@ -101,13 +103,23 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis { var eventName = ea.RoutingKey; var eventType = EventTypes.GetOrDefault(eventName); - if (eventType == null) + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType); + } + else if (DynamicHandlerFactories.ContainsKey(eventName)) + { + var rawBytes = ea.Body.ToArray(); + eventType = typeof(DynamicEventData); + eventData = new DynamicEventData(eventName, Serializer.Deserialize(rawBytes)); + } + else { return; } - var eventData = Serializer.Deserialize(ea.Body.ToArray(), eventType); - var correlationId = ea.BasicProperties.CorrelationId; if (await AddToInboxAsync(ea.BasicProperties.MessageId, eventName, eventType, eventData, correlationId)) { @@ -139,6 +151,26 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis return new EventHandlerFactoryUnregistrar(this, eventType, factory); } + /// + public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) + { + var handlerFactories = GetOrCreateDynamicHandlerFactories(eventName); + + if (handler.IsInFactories(handlerFactories)) + { + return NullDisposable.Instance; + } + + handlerFactories.Add(handler); + + if (handlerFactories.Count == 1) //TODO: Multi-threading! + { + Consumer.BindAsync(eventName); + } + + return new DynamicEventHandlerFactoryUnregistrar(this, eventName, handler); + } + /// public override void Unsubscribe(Func action) { @@ -193,6 +225,20 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } + /// + public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) + { + var eventType = EventTypes.GetOrDefault(eventName); + var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, ConvertDynamicEventData(dynamicEventData.Data, eventType), onUnitOfWorkComplete); + } + + return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete); + } + protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { await PublishAsync(eventType, eventData, correlationId: CorrelationIdProvider.Get()); @@ -256,12 +302,21 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis InboxConfig inboxConfig) { var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); - if (eventType == null) + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (DynamicHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + eventData = new DynamicEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData)); + eventType = typeof(DynamicEventData); + } + else { return; } - - var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) { @@ -285,8 +340,8 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis Guid? eventId = null, string? correlationId = null) { - var eventName = EventNameAttribute.GetNameOrDefault(eventType); - var body = Serializer.Serialize(eventData); + var (eventName, resolvedData) = ResolveEventForPublishing(eventType, eventData); + var body = Serializer.Serialize(resolvedData); return PublishAsync(eventName, body, headersArguments, eventId, correlationId); } @@ -382,7 +437,10 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) { - EventTypes.GetOrAdd(eventName, eventType); + if (typeof(DynamicEventData) != eventType) + { + EventTypes.GetOrAdd(eventName, eventType); + } return base.OnAddToOutboxAsync(eventName, eventType, eventData); } @@ -398,21 +456,79 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, IRabbitMqDis } ); } - + protected override IEnumerable GetHandlerFactories(Type eventType) { var handlerFactoryList = new List(); + var eventNames = EventTypes.Where(x => ShouldTriggerEventForHandler(eventType, x.Value)).Select(x => x.Key).ToList(); + + foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) + { + handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + } - foreach (var handlerFactory in - HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) + foreach (var handlerFactory in DynamicHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key))) { - handlerFactoryList.Add( - new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value)); } return handlerFactoryList.ToArray(); } + protected override Type? GetEventTypeByEventName(string eventName) + { + return EventTypes.GetOrDefault(eventName); + } + + /// + public override void Unsubscribe(string eventName, IEventHandlerFactory factory) + { + GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); + } + + /// + public override void Unsubscribe(string eventName, IEventHandler handler) + { + GetOrCreateDynamicHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); + } + + /// + public override void UnsubscribeAll(string eventName) + { + GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Clear()); + } + + protected override IEnumerable GetDynamicHandlerFactories(string eventName) + { + var result = new List(); + + var eventType = GetEventTypeByEventName(eventName); + if (eventType != null) + { + return GetHandlerFactories(eventType); + } + + foreach (var handlerFactory in DynamicHandlerFactories.Where(hf => hf.Key == eventName)) + { + result.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value)); + } + + return result; + } + + private List GetOrCreateDynamicHandlerFactories(string eventName) + { + return DynamicHandlerFactories.GetOrAdd(eventName, _ => new List()); + } + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { //Should trigger same type diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index 7a3d79e7d8..42ba15a10d 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -31,6 +31,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen //TODO: Accessing to the List may not be thread-safe! protected ConcurrentDictionary> HandlerFactories { get; } protected ConcurrentDictionary EventTypes { get; } + protected ConcurrentDictionary> DynamicHandlerFactories { get; } protected AbpRebusEventBusOptions AbpRebusEventBusOptions { get; } public RebusDistributedEventBus( @@ -63,6 +64,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen HandlerFactories = new ConcurrentDictionary>(); EventTypes = new ConcurrentDictionary(); + DynamicHandlerFactories = new ConcurrentDictionary>(); } public void Initialize() @@ -70,6 +72,31 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen SubscribeHandlers(AbpDistributedEventBusOptions.Handlers); } + public async Task ProcessEventAsync(Type eventType, object eventData) + { + var messageId = MessageContext.Current.TransportMessage.GetMessageId(); + string eventName; + if (eventType == typeof(DynamicEventData) && eventData is DynamicEventData dynamicEventData) + { + eventName = dynamicEventData.EventName; + } + else + { + eventName = EventNameAttribute.GetNameOrDefault(eventType); + } + var correlationId = MessageContext.Current.Headers.GetOrDefault(EventBusConsts.CorrelationIdHeaderName); + + if (await AddToInboxAsync(messageId, eventName, eventType, eventData, correlationId)) + { + return; + } + + using (CorrelationIdProvider.Change(correlationId)) + { + await TriggerHandlersDirectAsync(eventType, eventData); + } + } + public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); @@ -89,6 +116,26 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen return new EventHandlerFactoryUnregistrar(this, eventType, factory); } + /// + public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) + { + var handlerFactories = GetOrCreateDynamicHandlerFactories(eventName); + + if (handler.IsInFactories(handlerFactories)) + { + return NullDisposable.Instance; + } + + handlerFactories.Add(handler); + + if (DynamicHandlerFactories.Count == 1) //TODO: Multi-threading! + { + Rebus.Subscribe(typeof(DynamicEventData)); + } + + return new DynamicEventHandlerFactoryUnregistrar(this, eventName, handler); + } + public override void Unsubscribe(Func action) { Check.NotNull(action, nameof(action)); @@ -143,21 +190,18 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen Rebus.Unsubscribe(eventType); } - public async Task ProcessEventAsync(Type eventType, object eventData) + /// + public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) { - var messageId = MessageContext.Current.TransportMessage.GetMessageId(); - var eventName = EventNameAttribute.GetNameOrDefault(eventType); - var correlationId = MessageContext.Current.Headers.GetOrDefault(EventBusConsts.CorrelationIdHeaderName); + var eventType = EventTypes.GetOrDefault(eventName); + var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData); - if (await AddToInboxAsync(messageId, eventName, eventType, eventData, correlationId)) + if (eventType != null) { - return; + return PublishAsync(eventType, ConvertDynamicEventData(dynamicEventData.Data, eventType), onUnitOfWorkComplete); } - using (CorrelationIdProvider.Change(correlationId)) - { - await TriggerHandlersDirectAsync(eventType, eventData); - } + return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete); } protected async override Task PublishToEventBusAsync(Type eventType, object eventData) @@ -170,94 +214,32 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen await PublishAsync(eventType, eventData, headersArguments: headers); } - protected virtual async Task PublishAsync( - Type eventType, - object eventData, - Guid? eventId = null, - Dictionary? headersArguments = null) - { - if (AbpRebusEventBusOptions.Publish != null) - { - await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData); - return; - } - - headersArguments ??= new Dictionary(); - if (!headersArguments.ContainsKey(Headers.MessageId)) - { - headersArguments[Headers.MessageId] = (eventId ?? GuidGenerator.Create()).ToString("N"); - } - - await Rebus.Publish(eventData, headersArguments); - } - protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) { unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } - protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) - { - EventTypes.GetOrAdd(eventName, eventType); - return base.OnAddToOutboxAsync(eventName, eventType, eventData); - } - - private List GetOrCreateHandlerFactories(Type eventType) - { - return HandlerFactories.GetOrAdd( - eventType, - type => - { - var eventName = EventNameAttribute.GetNameOrDefault(type); - EventTypes.GetOrAdd(eventName, eventType); - return new List(); - } - ); - } - - protected override IEnumerable GetHandlerFactories(Type eventType) + public async override Task PublishFromOutboxAsync( + OutgoingEventInfo outgoingEvent, + OutboxConfig outboxConfig) { - var handlerFactoryList = new List(); + var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); + object eventData; - foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)) - ) + if (eventType != null) { - handlerFactoryList.Add( - new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); } - - return handlerFactoryList.ToArray(); - } - - private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) - { - //Should trigger same type - if (handlerEventType == targetEventType) + else if (DynamicHandlerFactories.ContainsKey(outgoingEvent.EventName)) { - return true; + eventData = new DynamicEventData(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, typeof(object))); + eventType = typeof(DynamicEventData); } - - //Should trigger for inherited types - if (handlerEventType.IsAssignableFrom(targetEventType)) - { - return true; - } - - return false; - } - - public async override Task PublishFromOutboxAsync( - OutgoingEventInfo outgoingEvent, - OutboxConfig outboxConfig) - { - var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); - if (eventType == null) + else { return; } - var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); - var headers = new Dictionary(); if (outgoingEvent.GetCorrelationId() != null) { @@ -306,12 +288,21 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen InboxConfig inboxConfig) { var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); - if (eventType == null) + object eventData; + + if (eventType != null) + { + eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + } + else if (DynamicHandlerFactories.ContainsKey(incomingEvent.EventName)) + { + eventData = new DynamicEventData(incomingEvent.EventName, Serializer.Deserialize(incomingEvent.EventData, typeof(object))); + eventType = typeof(DynamicEventData); + } + else { return; } - - var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) { @@ -327,4 +318,136 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen { return Serializer.Serialize(eventData); } + + protected virtual async Task PublishAsync( + Type eventType, + object eventData, + Guid? eventId = null, + Dictionary? headersArguments = null) + { + if (AbpRebusEventBusOptions.Publish != null) + { + await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData); + return; + } + + headersArguments ??= new Dictionary(); + if (!headersArguments.ContainsKey(Headers.MessageId)) + { + headersArguments[Headers.MessageId] = (eventId ?? GuidGenerator.Create()).ToString("N"); + } + + await Rebus.Publish(eventData, headersArguments); + } + + protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) + { + if (typeof(DynamicEventData) != eventType) + { + EventTypes.GetOrAdd(eventName, eventType); + } + return base.OnAddToOutboxAsync(eventName, eventType, eventData); + } + + private List GetOrCreateHandlerFactories(Type eventType) + { + return HandlerFactories.GetOrAdd( + eventType, + type => + { + var eventName = EventNameAttribute.GetNameOrDefault(type); + EventTypes.GetOrAdd(eventName, eventType); + return new List(); + } + ); + } + + protected override IEnumerable GetHandlerFactories(Type eventType) + { + var handlerFactoryList = new List(); + var eventNames = EventTypes.Where(x => ShouldTriggerEventForHandler(eventType, x.Value)).Select(x => x.Key).ToList(); + + foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) + { + handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + } + + foreach (var handlerFactory in DynamicHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key))) + { + handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value)); + } + + return handlerFactoryList.ToArray(); + } + + protected override Type? GetEventTypeByEventName(string eventName) + { + return EventTypes.GetOrDefault(eventName); + } + + /// + public override void Unsubscribe(string eventName, IEventHandlerFactory factory) + { + GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); + } + + /// + public override void Unsubscribe(string eventName, IEventHandler handler) + { + GetOrCreateDynamicHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); + } + + /// + public override void UnsubscribeAll(string eventName) + { + GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Clear()); + } + + protected override IEnumerable GetDynamicHandlerFactories(string eventName) + { + var eventType = GetEventTypeByEventName(eventName); + if (eventType != null) + { + return GetHandlerFactories(eventType); + } + + var result = new List(); + + foreach (var handlerFactory in DynamicHandlerFactories.Where(hf => hf.Key == eventName)) + { + result.Add(new EventTypeWithEventHandlerFactories(typeof(DynamicEventData), handlerFactory.Value)); + } + + return result; + } + + private List GetOrCreateDynamicHandlerFactories(string eventName) + { + return DynamicHandlerFactories.GetOrAdd(eventName, _ => new List()); + } + + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) + { + //Should trigger same type + if (handlerEventType == targetEventType) + { + return true; + } + + //Should trigger for inherited types + if (handlerEventType.IsAssignableFrom(targetEventType)) + { + return true; + } + + return false; + } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs index 3668193c9b..c4d186e2b3 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -43,16 +43,25 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB CorrelationIdProvider = correlationIdProvider; } + /// public virtual IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class { return Subscribe(typeof(TEvent), handler); } + /// + public virtual IDisposable Subscribe(string eventName, IDistributedEventHandler handler) + { + return Subscribe(eventName, (IEventHandler)handler); + } + + /// public override Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true) { return PublishAsync(eventType, eventData, onUnitOfWorkComplete, useOutbox: true); } + /// public virtual Task PublishAsync( TEvent eventData, bool onUnitOfWorkComplete = true, @@ -62,6 +71,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete, useOutbox); } + /// public virtual async Task PublishAsync( Type eventType, object eventData, @@ -90,11 +100,29 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB await TriggerDistributedEventSentAsync(new DistributedEventSent() { Source = DistributedEventSource.Direct, - EventName = EventNameAttribute.GetNameOrDefault(eventType), - EventData = eventData + EventName = GetEventName(eventType, eventData), + EventData = GetEventData(eventData) }); } + /// + public virtual Task PublishAsync( + string eventName, + object eventData, + bool onUnitOfWorkComplete = true, + bool useOutbox = true) + { + var eventType = GetEventTypeByEventName(eventName); + var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, ConvertDynamicEventData(dynamicEventData.Data, eventType), onUnitOfWorkComplete, useOutbox); + } + + return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete, useOutbox); + } + public abstract Task PublishFromOutboxAsync( OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig @@ -124,7 +152,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB if (outboxConfig.Selector == null || outboxConfig.Selector(eventType)) { var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType); - var eventName = EventNameAttribute.GetNameOrDefault(eventType); + (var eventName, eventData) = ResolveEventForPublishing(eventType, eventData); await OnAddToOutboxAsync(eventName, eventType, eventData); @@ -181,12 +209,12 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB { if (await eventInbox.ExistsByMessageIdAsync(messageId!)) { - // Message already exists in the inbox, no need to add again. - // This can happen in case of retries from the sender side. addToInbox = true; continue; } } + + eventData = GetEventData(eventData); var incomingEventInfo = new IncomingEventInfo( GuidGenerator.Create(), @@ -212,8 +240,8 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB await TriggerDistributedEventReceivedAsync(new DistributedEventReceived { Source = DistributedEventSource.Direct, - EventName = EventNameAttribute.GetNameOrDefault(eventType), - EventData = eventData + EventName = GetEventName(eventType, eventData), + EventData = GetEventData(eventData) }); await TriggerHandlersAsync(eventType, eventData); @@ -224,8 +252,8 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB await TriggerDistributedEventReceivedAsync(new DistributedEventReceived { Source = DistributedEventSource.Inbox, - EventName = EventNameAttribute.GetNameOrDefault(eventType), - EventData = eventData + EventName = GetEventName(eventType, eventData), + EventData = GetEventData(eventData) }); await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig); @@ -254,4 +282,29 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB // ignored } } + + protected virtual string GetEventName(Type eventType, object eventData) + { + if (eventData is DynamicEventData dynamicEventData) + { + return dynamicEventData.EventName; + } + + return EventNameAttribute.GetNameOrDefault(eventType); + } + + protected virtual object GetEventData(object eventData) + { + if (eventData is DynamicEventData dynamicEventData) + { + return dynamicEventData.Data; + } + + return eventData; + } + + protected virtual (string EventName, object EventData) ResolveEventForPublishing(Type eventType, object eventData) + { + return (GetEventName(eventType, eventData), GetEventData(eventData)); + } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs index 843fb4f8ea..1d8e1ed680 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs @@ -3,9 +3,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reflection; -using System.Text; using System.Text.Json; -using System.Text.Unicode; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -26,6 +24,8 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen { protected ConcurrentDictionary EventTypes { get; } + protected ConcurrentDictionary DynamicEventNames { get; } + public LocalDistributedEventBus( IServiceScopeFactory serviceScopeFactory, ICurrentTenant currentTenant, @@ -47,6 +47,7 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen correlationIdProvider) { EventTypes = new ConcurrentDictionary(); + DynamicEventNames = new ConcurrentDictionary(); Subscribe(abpDistributedEventBusOptions.Value.Handlers); } @@ -71,6 +72,14 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen } } + /// + public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) + { + DynamicEventNames.GetOrAdd(eventName, true); + return LocalEventBus.Subscribe(eventName, handler); + } + + /// public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var eventName = EventNameAttribute.GetNameOrDefault(eventType); @@ -93,11 +102,31 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen LocalEventBus.Unsubscribe(eventType, factory); } + /// + public override void Unsubscribe(string eventName, IEventHandlerFactory factory) + { + LocalEventBus.Unsubscribe(eventName, factory); + } + + /// + public override void Unsubscribe(string eventName, IEventHandler handler) + { + LocalEventBus.Unsubscribe(eventName, handler); + } + + /// public override void UnsubscribeAll(Type eventType) { LocalEventBus.UnsubscribeAll(eventType); } + /// + public override void UnsubscribeAll(string eventName) + { + LocalEventBus.UnsubscribeAll(eventName); + } + + /// public async override Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) { if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null) @@ -120,23 +149,43 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen await TriggerDistributedEventSentAsync(new DistributedEventSent() { Source = DistributedEventSource.Direct, - EventName = EventNameAttribute.GetNameOrDefault(eventType), - EventData = eventData + EventName = GetEventName(eventType, eventData), + EventData = GetEventData(eventData) }); await TriggerDistributedEventReceivedAsync(new DistributedEventReceived { Source = DistributedEventSource.Direct, - EventName = EventNameAttribute.GetNameOrDefault(eventType), - EventData = eventData + EventName = GetEventName(eventType, eventData), + EventData = GetEventData(eventData) }); await PublishToEventBusAsync(eventType, eventData); } + /// + public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) + { + return PublishAsync(eventName, eventData, onUnitOfWorkComplete, useOutbox: true); + } + + /// + public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) + { + var eventType = EventTypes.GetOrDefault(eventName); + var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, ConvertDynamicEventData(dynamicEventData.Data, eventType), onUnitOfWorkComplete, useOutbox); + } + + return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete, useOutbox); + } + protected async override Task PublishToEventBusAsync(Type eventType, object eventData) { - if (await AddToInboxAsync(Guid.NewGuid().ToString(), EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData, null)) + if (await AddToInboxAsync(Guid.NewGuid().ToString(), GetEventName(eventType, eventData), eventType, eventData, null)) { return; } @@ -168,10 +217,27 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); if (eventType == null) { - return; + var isDynamic = DynamicEventNames.ContainsKey(outgoingEvent.EventName); + if (!isDynamic) + { + return; + } + + eventType = typeof(DynamicEventData); + } + + object eventData; + if (eventType == typeof(DynamicEventData)) + { + eventData = new DynamicEventData( + outgoingEvent.EventName, + System.Text.Json.JsonSerializer.Deserialize(outgoingEvent.EventData)!); + } + else + { + eventData = System.Text.Json.JsonSerializer.Deserialize(outgoingEvent.EventData, eventType)!; } - var eventData = JsonSerializer.Deserialize(Encoding.UTF8.GetString(outgoingEvent.EventData), eventType)!; if (await AddToInboxAsync(Guid.NewGuid().ToString(), outgoingEvent.EventName, eventType, eventData, null)) { return; @@ -193,10 +259,27 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); if (eventType == null) { - return; + var isDynamic = DynamicEventNames.ContainsKey(incomingEvent.EventName); + if (!isDynamic) + { + return; + } + + eventType = typeof(DynamicEventData); + } + + object eventData; + if (eventType == typeof(DynamicEventData)) + { + eventData = new DynamicEventData( + incomingEvent.EventName, + System.Text.Json.JsonSerializer.Deserialize(incomingEvent.EventData)!); + } + else + { + eventData = System.Text.Json.JsonSerializer.Deserialize(incomingEvent.EventData, eventType)!; } - var eventData = JsonSerializer.Deserialize(incomingEvent.EventData, eventType); var exceptions = new List(); using (CorrelationIdProvider.Change(incomingEvent.GetCorrelationId())) { @@ -210,12 +293,15 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen protected override byte[] Serialize(object eventData) { - return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(eventData)); + return System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(eventData); } protected override Task OnAddToOutboxAsync(string eventName, Type eventType, object eventData) { - EventTypes.GetOrAdd(eventName, eventType); + if (eventType != typeof(DynamicEventData)) + { + EventTypes.GetOrAdd(eventName, eventType); + } return base.OnAddToOutboxAsync(eventName, eventType, eventData); } @@ -223,4 +309,14 @@ public class LocalDistributedEventBus : DistributedEventBusBase, ISingletonDepen { return LocalEventBus.GetEventHandlerFactories(eventType); } + + protected override IEnumerable GetDynamicHandlerFactories(string eventName) + { + return LocalEventBus.GetDynamicEventHandlerFactories(eventName); + } + + protected override Type? GetEventTypeByEventName(string eventName) + { + return EventTypes.GetOrDefault(eventName); + } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs index bf64839863..73636f49a5 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/NullDistributedEventBus.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Threading.Tasks; namespace Volo.Abp.EventBus.Distributed; @@ -12,6 +12,12 @@ public sealed class NullDistributedEventBus : IDistributedEventBus } + /// + public Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) + { + return Task.CompletedTask; + } + public IDisposable Subscribe(Func action) where TEvent : class { return NullDisposable.Instance; @@ -32,6 +38,24 @@ public sealed class NullDistributedEventBus : IDistributedEventBus return NullDisposable.Instance; } + /// + public IDisposable Subscribe(string eventName, IEventHandler handler) + { + return NullDisposable.Instance; + } + + /// + public IDisposable Subscribe(string eventName, IEventHandlerFactory handler) + { + return NullDisposable.Instance; + } + + /// + public IDisposable Subscribe(string eventName, IDistributedEventHandler handler) + { + return NullDisposable.Instance; + } + public IDisposable Subscribe(IEventHandlerFactory factory) where TEvent : class { return NullDisposable.Instance; @@ -67,6 +91,16 @@ public sealed class NullDistributedEventBus : IDistributedEventBus } + /// + public void Unsubscribe(string eventName, IEventHandlerFactory factory) + { + } + + /// + public void Unsubscribe(string eventName, IEventHandler handler) + { + } + public void UnsubscribeAll() where TEvent : class { @@ -74,7 +108,11 @@ public sealed class NullDistributedEventBus : IDistributedEventBus public void UnsubscribeAll(Type eventType) { + } + /// + public void UnsubscribeAll(string eventName) + { } public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class @@ -96,4 +134,10 @@ public sealed class NullDistributedEventBus : IDistributedEventBus { return Task.CompletedTask; } + + /// + public Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) + { + return Task.CompletedTask; + } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs index c8ce733b22..e59cd95613 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.DependencyInjection; using Volo.Abp.Collections; using Volo.Abp.DynamicProxy; using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Json; using Volo.Abp.MultiTenancy; using Volo.Abp.Reflection; using Volo.Abp.Uow; @@ -57,6 +58,15 @@ public abstract class EventBusBase : IEventBus return Subscribe(eventType, new SingleInstanceHandlerFactory(handler)); } + /// + public virtual IDisposable Subscribe(string eventName, IEventHandler handler) + { + return Subscribe(eventName, new SingleInstanceHandlerFactory(handler)); + } + + /// + public abstract IDisposable Subscribe(string eventName, IEventHandlerFactory handler); + /// public virtual IDisposable Subscribe(IEventHandlerFactory factory) where TEvent : class { @@ -83,6 +93,12 @@ public abstract class EventBusBase : IEventBus public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory); + /// + public abstract void Unsubscribe(string eventName, IEventHandlerFactory factory); + + /// + public abstract void Unsubscribe(string eventName, IEventHandler handler); + /// public virtual void UnsubscribeAll() where TEvent : class { @@ -92,6 +108,9 @@ public abstract class EventBusBase : IEventBus /// public abstract void UnsubscribeAll(Type eventType); + /// + public abstract void UnsubscribeAll(string eventName); + /// public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class @@ -117,6 +136,9 @@ public abstract class EventBusBase : IEventBus await PublishToEventBusAsync(eventType, eventData); } + /// + public abstract Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true); + protected abstract Task PublishToEventBusAsync(Type eventType, object eventData); protected abstract void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord); @@ -137,31 +159,81 @@ public abstract class EventBusBase : IEventBus { await new SynchronizationContextRemover(); - foreach (var handlerFactories in GetHandlerFactories(eventType).ToList()) + var (handlerFactoriesList, actualEventType) = ResolveHandlerFactories(eventType, eventData); + + foreach (var handlerFactories in handlerFactoriesList) { foreach (var handlerFactory in handlerFactories.EventHandlerFactories.ToList()) { - await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, eventData, exceptions, inboxConfig); + var resolvedEventData = ResolveEventDataForHandler(eventData, eventType, handlerFactories.EventType); + await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, resolvedEventData, exceptions, inboxConfig); } } - //Implements generic argument inheritance. See IEventDataWithInheritableGenericArgument - if (eventType.GetTypeInfo().IsGenericType && - eventType.GetGenericArguments().Length == 1 && - typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType)) + if (actualEventType != null && + actualEventType.GetTypeInfo().IsGenericType && + actualEventType.GetGenericArguments().Length == 1 && + typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(actualEventType)) { - var genericArg = eventType.GetGenericArguments()[0]; + var resolvedEventData = eventData is DynamicEventData aed + ? ConvertDynamicEventData(aed.Data, actualEventType) + : eventData; + + var genericArg = actualEventType.GetGenericArguments()[0]; var baseArg = genericArg.GetTypeInfo().BaseType; if (baseArg != null) { - var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg); - var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs(); + var baseEventType = actualEventType.GetGenericTypeDefinition().MakeGenericType(baseArg); + var constructorArgs = ((IEventDataWithInheritableGenericArgument)resolvedEventData).GetConstructorArgs(); var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs)!; await PublishToEventBusAsync(baseEventType, baseEventData); } } } + protected virtual (List Factories, Type? ActualEventType) ResolveHandlerFactories( + Type eventType, + object eventData) + { + if (eventData is DynamicEventData dynamicEventData) + { + return ( + GetDynamicHandlerFactories(dynamicEventData.EventName).ToList(), + GetEventTypeByEventName(dynamicEventData.EventName) + ); + } + + return (GetHandlerFactories(eventType).ToList(), eventType); + } + + protected virtual object ResolveEventDataForHandler(object eventData, Type sourceEventType, Type handlerEventType) + { + if (eventData is DynamicEventData dynamicEventData && handlerEventType != typeof(DynamicEventData)) + { + return ConvertDynamicEventData(dynamicEventData.Data, handlerEventType); + } + + if (handlerEventType == typeof(DynamicEventData) && eventData is not DynamicEventData) + { + return new DynamicEventData(EventNameAttribute.GetNameOrDefault(sourceEventType), eventData); + } + + return eventData; + } + + protected virtual object ConvertDynamicEventData(object data, Type targetType) + { + if (targetType.IsInstanceOfType(data)) + { + return data; + } + + using var scope = ServiceScopeFactory.CreateScope(); + var jsonSerializer = scope.ServiceProvider.GetRequiredService(); + var json = jsonSerializer.Serialize(data); + return jsonSerializer.Deserialize(targetType, json); + } + protected void ThrowOriginalExceptions(Type eventType, List exceptions) { if (exceptions.Count == 1) @@ -198,6 +270,10 @@ public abstract class EventBusBase : IEventBus protected abstract IEnumerable GetHandlerFactories(Type eventType); + protected abstract IEnumerable GetDynamicHandlerFactories(string eventName); + + protected abstract Type? GetEventTypeByEventName(string eventName); + protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType, object eventData, List exceptions, InboxConfig? inboxConfig = null) { diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerFactoryUnregistrar.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerFactoryUnregistrar.cs index f94d7a4ed1..ffbf14d25f 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerFactoryUnregistrar.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventHandlerFactoryUnregistrar.cs @@ -23,3 +23,25 @@ public class EventHandlerFactoryUnregistrar : IDisposable _eventBus.Unsubscribe(_eventType, _factory); } } + +/// +/// Used to unregister an for a string-based event name on method. +/// +public class DynamicEventHandlerFactoryUnregistrar : IDisposable +{ + private readonly IEventBus _eventBus; + private readonly string _eventName; + private readonly IEventHandlerFactory _factory; + + public DynamicEventHandlerFactoryUnregistrar(IEventBus eventBus, string eventName, IEventHandlerFactory factory) + { + _eventBus = eventBus; + _eventName = eventName; + _factory = factory; + } + + public void Dispose() + { + _eventBus.Unsubscribe(_eventName, _factory); + } +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs index 7123ff340a..729afa2915 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/LocalEventBus.cs @@ -30,6 +30,10 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency protected ConcurrentDictionary> HandlerFactories { get; } + protected ConcurrentDictionary EventTypes { get; } + + protected ConcurrentDictionary> DynamicEventHandlerFactories { get; } + public LocalEventBus( IOptions options, IServiceScopeFactory serviceScopeFactory, @@ -42,6 +46,8 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency Logger = NullLogger.Instance; HandlerFactories = new ConcurrentDictionary>(); + EventTypes = new ConcurrentDictionary(); + DynamicEventHandlerFactories = new ConcurrentDictionary>(); SubscribeHandlers(Options.Handlers); } @@ -51,9 +57,25 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency return Subscribe(typeof(TEvent), handler); } + /// + public override IDisposable Subscribe(string eventName, IEventHandlerFactory handler) + { + GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => + { + if (!handler.IsInFactories(factories)) + { + factories.Add(handler); + } + }); + + return new DynamicEventHandlerFactoryUnregistrar(this, eventName, handler); + } + /// public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { + EventTypes.GetOrAdd(EventNameAttribute.GetNameOrDefault(eventType), eventType); + GetOrCreateHandlerFactories(eventType) .Locking(factories => { @@ -115,12 +137,53 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory)); } + /// + public override void Unsubscribe(string eventName, IEventHandlerFactory factory) + { + GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Remove(factory)); + } + + /// + public override void Unsubscribe(string eventName, IEventHandler handler) + { + GetOrCreateDynamicHandlerFactories(eventName) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory singleFactory && + singleFactory.HandlerInstance == handler + ); + }); + } + /// public override void UnsubscribeAll(Type eventType) { GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); } + /// + public override void UnsubscribeAll(string eventName) + { + GetOrCreateDynamicHandlerFactories(eventName).Locking(factories => factories.Clear()); + } + + /// + public override Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) + { + var eventType = EventTypes.GetOrDefault(eventName); + + var dynamicEventData = eventData as DynamicEventData ?? new DynamicEventData(eventName, eventData); + + if (eventType != null) + { + return PublishAsync(eventType, ConvertDynamicEventData(dynamicEventData.Data, eventType), onUnitOfWorkComplete); + } + + return PublishAsync(typeof(DynamicEventData), dynamicEventData, onUnitOfWorkComplete); + } + protected override async Task PublishToEventBusAsync(Type eventType, object eventData) { await PublishAsync(new LocalEventMessage(Guid.NewGuid(), eventData, eventType)); @@ -141,9 +204,17 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency return GetHandlerFactories(eventType).ToList(); } + /// + public virtual List GetDynamicEventHandlerFactories(string eventName) + { + return GetDynamicHandlerFactories(eventName).ToList(); + } + protected override IEnumerable GetHandlerFactories(Type eventType) { var handlerFactoryList = new List>(); + var eventNames = EventTypes.Where(x => ShouldTriggerEventForHandler(eventType, x.Value)).Select(x => x.Key).ToList(); + foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) { foreach (var factory in handlerFactory.Value) @@ -155,23 +226,71 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency } } + foreach (var handlerFactory in DynamicEventHandlerFactories.Where(aehf => eventNames.Contains(aehf.Key))) + { + foreach (var factory in handlerFactory.Value) + { + handlerFactoryList.Add(new Tuple( + factory, + typeof(DynamicEventData), + ReflectionHelper.GetAttributesOfMemberOrDeclaringType(factory.GetHandler().EventHandler.GetType()).FirstOrDefault()?.Order ?? 0)); + } + } + return handlerFactoryList.OrderBy(x => x.Item3).Select(x => new EventTypeWithEventHandlerFactories(x.Item2, new List {x.Item1})).ToArray(); } + protected override IEnumerable GetDynamicHandlerFactories(string eventName) + { + var eventType = EventTypes.GetOrDefault(eventName); + if (eventType != null) + { + return GetHandlerFactories(eventType); + } + + var handlerFactoryList = new List>(); + + foreach (var handlerFactory in DynamicEventHandlerFactories.Where(aehf => aehf.Key == eventName)) + { + foreach (var factory in handlerFactory.Value) + { + using var handler = factory.GetHandler(); + var handlerType = handler.EventHandler.GetType(); + handlerFactoryList.Add(new Tuple( + factory, + typeof(DynamicEventData), + ReflectionHelper + .GetAttributesOfMemberOrDeclaringType(handlerType) + .FirstOrDefault()?.Order ?? 0)); + } + } + + return handlerFactoryList.OrderBy(x => x.Item3).Select(x => + new EventTypeWithEventHandlerFactories(x.Item2, new List { x.Item1 })).ToArray(); + } + + protected override Type? GetEventTypeByEventName(string eventName) + { + return EventTypes.GetOrDefault(eventName); + } + private List GetOrCreateHandlerFactories(Type eventType) { return HandlerFactories.GetOrAdd(eventType, (type) => new List()); } + private List GetOrCreateDynamicHandlerFactories(string eventName) + { + return DynamicEventHandlerFactories.GetOrAdd(eventName, (name) => new List()); + } + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) { - //Should trigger same type if (handlerEventType == targetEventType) { return true; } - //Should trigger for inherited types if (handlerEventType.IsAssignableFrom(targetEventType)) { return true; diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs index 3ffcd911ce..682b49d939 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Local/NullLocalEventBus.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Threading.Tasks; @@ -13,6 +13,12 @@ public sealed class NullLocalEventBus : ILocalEventBus } + /// + public Task PublishAsync(string eventName, object eventData, bool onUnitOfWorkComplete = true) + { + return Task.CompletedTask; + } + public IDisposable Subscribe(Func action) where TEvent : class { return NullDisposable.Instance; @@ -28,6 +34,12 @@ public sealed class NullLocalEventBus : ILocalEventBus return new List(); } + /// + public List GetDynamicEventHandlerFactories(string eventName) + { + return new List(); + } + public IDisposable Subscribe() where TEvent : class where THandler : IEventHandler, new() { return NullDisposable.Instance; @@ -38,6 +50,18 @@ public sealed class NullLocalEventBus : ILocalEventBus return NullDisposable.Instance; } + /// + public IDisposable Subscribe(string eventName, IEventHandler handler) + { + return NullDisposable.Instance; + } + + /// + public IDisposable Subscribe(string eventName, IEventHandlerFactory handler) + { + return NullDisposable.Instance; + } + public IDisposable Subscribe(IEventHandlerFactory factory) where TEvent : class { return NullDisposable.Instance; @@ -73,6 +97,16 @@ public sealed class NullLocalEventBus : ILocalEventBus } + /// + public void Unsubscribe(string eventName, IEventHandlerFactory factory) + { + } + + /// + public void Unsubscribe(string eventName, IEventHandler handler) + { + } + public void UnsubscribeAll() where TEvent : class { @@ -80,7 +114,11 @@ public sealed class NullLocalEventBus : ILocalEventBus public void UnsubscribeAll(Type eventType) { + } + /// + public void UnsubscribeAll(string eventName) + { } public Task PublishAsync(TEvent eventData, bool onUnitOfWorkComplete = true) where TEvent : class diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs index 88c515f8dc..b77f282a85 100644 --- a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using Shouldly; using Volo.Abp.Domain.Entities.Events.Distributed; @@ -10,10 +11,17 @@ namespace Volo.Abp.EventBus.Distributed; public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase { + public LocalDistributedEventBus_Test() + { + MySimpleDistributedTransientEventHandler.HandleCount = 0; + MySimpleDistributedTransientEventHandler.DisposeCount = 0; + MySimpleDistributedSingleInstanceEventHandler.TenantId = null; + } + [Fact] public async Task Should_Call_Handler_AndDispose() { - DistributedEventBus.Subscribe(); + using var subscription = DistributedEventBus.Subscribe(); await DistributedEventBus.PublishAsync(new MySimpleEventData(1)); await DistributedEventBus.PublishAsync(new MySimpleEventData(2)); @@ -23,12 +31,255 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase Assert.Equal(3, MySimpleDistributedTransientEventHandler.DisposeCount); } + [Fact] + public async Task Should_Handle_Typed_Handler_When_Published_With_EventName() + { + using var subscription = DistributedEventBus.Subscribe(); + + var eventName = EventNameAttribute.GetNameOrDefault(); + await DistributedEventBus.PublishAsync(eventName, new MySimpleEventData(1)); + await DistributedEventBus.PublishAsync(eventName, new Dictionary() + { + {"Value", 2} + }); + await DistributedEventBus.PublishAsync(eventName, new { Value = 3 }); + + Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount); + Assert.Equal(3, MySimpleDistributedTransientEventHandler.DisposeCount); + } + + [Fact] + public async Task Should_Handle_Dynamic_Handler_When_Published_With_EventName() + { + var handleCount = 0; + var eventName = "MyEvent-" + Guid.NewGuid().ToString("N"); + + using var subscription = DistributedEventBus.Subscribe(eventName, + new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + handleCount++; + await Task.CompletedTask; + }))); + + await DistributedEventBus.PublishAsync(eventName, new MySimpleEventData(1)); + await DistributedEventBus.PublishAsync(eventName, new Dictionary() + { + {"Value", 2} + }); + await DistributedEventBus.PublishAsync(eventName, new { Value = 3 }); + await DistributedEventBus.PublishAsync(eventName, new[] { 1, 2, 3 }); + + Assert.Equal(4, handleCount); + } + + [Fact] + public async Task Should_Handle_Dynamic_Handler_When_Published_With_DynamicEventData() + { + var handleCount = 0; + var eventName = "MyEvent-" + Guid.NewGuid().ToString("N"); + + using var subscription = DistributedEventBus.Subscribe(eventName, + new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + handleCount++; + d.Data.ShouldNotBeNull(); + await Task.CompletedTask; + }))); + + await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new MySimpleEventData(1))); + await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new Dictionary() + { + {"Value", 2} + })); + await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new { Value = 3 })); + + Assert.Equal(3, handleCount); + } + + [Fact] + public async Task Should_Handle_Typed_Handler_When_Published_With_DynamicEventData() + { + using var subscription = DistributedEventBus.Subscribe(); + + var eventName = EventNameAttribute.GetNameOrDefault(); + + await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new MySimpleEventData(1))); + await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new Dictionary() + { + {"Value", 2} + })); + await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new { Value = 3 })); + + Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount); + } + + [Fact] + public async Task Should_Trigger_Both_Typed_And_Dynamic_Handlers_For_Typed_Event() + { + using var typedSubscription = DistributedEventBus.Subscribe(); + + var eventName = EventNameAttribute.GetNameOrDefault(); + + var dynamicHandleCount = 0; + + using var dynamicSubscription = DistributedEventBus.Subscribe(eventName, new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + dynamicHandleCount++; + await Task.CompletedTask; + }))); + + await DistributedEventBus.PublishAsync(new MySimpleEventData(1)); + await DistributedEventBus.PublishAsync(new MySimpleEventData(2)); + await DistributedEventBus.PublishAsync(new MySimpleEventData(3)); + + Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount); + Assert.Equal(3, dynamicHandleCount); + } + + [Fact] + public async Task Should_Trigger_Both_Handlers_For_Mixed_Typed_And_Dynamic_Publish() + { + using var typedSubscription = DistributedEventBus.Subscribe(); + + var eventName = EventNameAttribute.GetNameOrDefault(); + + var dynamicHandleCount = 0; + + using var dynamicSubscription = DistributedEventBus.Subscribe(eventName, new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + dynamicHandleCount++; + await Task.CompletedTask; + }))); + + await DistributedEventBus.PublishAsync(new MySimpleEventData(1)); + await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new Dictionary() + { + {"Value", 2} + })); + await DistributedEventBus.PublishAsync(new DynamicEventData(eventName, new { Value = 3 })); + + Assert.Equal(3, MySimpleDistributedTransientEventHandler.HandleCount); + Assert.Equal(3, dynamicHandleCount); + } + + [Fact] + public async Task Should_Unsubscribe_Dynamic_Handler() + { + var handleCount = 0; + var eventName = "MyEvent-" + Guid.NewGuid().ToString("N"); + + var handler = new ActionEventHandler(async (d) => + { + handleCount++; + await Task.CompletedTask; + }); + var factory = new SingleInstanceHandlerFactory(handler); + + var disposable = DistributedEventBus.Subscribe(eventName, factory); + + await DistributedEventBus.PublishAsync(eventName, new { Value = 1 }); + Assert.Equal(1, handleCount); + + disposable.Dispose(); + + await DistributedEventBus.PublishAsync(eventName, new { Value = 2 }); + Assert.Equal(1, handleCount); + } + + [Fact] + public async Task Should_Not_Throw_For_Unknown_Event_Name() + { + // Publishing to an unknown event name should not throw (consistent with typed PublishAsync behavior) + await DistributedEventBus.PublishAsync("NonExistentEvent", new { Value = 1 }); + } + + [Fact] + public async Task Should_Convert_DynamicEventData_To_Typed_Object() + { + MySimpleEventData? receivedData = null; + + using var subscription = DistributedEventBus.Subscribe(async (data) => + { + receivedData = data; + await Task.CompletedTask; + }); + + var eventName = EventNameAttribute.GetNameOrDefault(); + await DistributedEventBus.PublishAsync(eventName, new { Value = 42 }); + + receivedData.ShouldNotBeNull(); + receivedData.Value.ShouldBe(42); + } + + [Fact] + public async Task Should_Subscribe_With_IDistributedEventHandler() + { + var handleCount = 0; + var eventName = "MyEvent-" + Guid.NewGuid().ToString("N"); + + using var subscription = DistributedEventBus.Subscribe(eventName, + new TestDynamicDistributedEventHandler(() => handleCount++)); + + await DistributedEventBus.PublishAsync(eventName, new { Value = 1 }); + await DistributedEventBus.PublishAsync(eventName, new { Value = 2 }); + + Assert.Equal(2, handleCount); + } + + [Fact] + public async Task Should_Handle_Multiple_Dynamic_Events_Independently() + { + var countA = 0; + var countB = 0; + var eventNameA = "EventA-" + Guid.NewGuid().ToString("N"); + var eventNameB = "EventB-" + Guid.NewGuid().ToString("N"); + + using var subA = DistributedEventBus.Subscribe(eventNameA, + new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + countA++; + await Task.CompletedTask; + }))); + + using var subB = DistributedEventBus.Subscribe(eventNameB, + new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + countB++; + await Task.CompletedTask; + }))); + + await DistributedEventBus.PublishAsync(eventNameA, new { Value = 1 }); + await DistributedEventBus.PublishAsync(eventNameB, new { Value = 2 }); + await DistributedEventBus.PublishAsync(eventNameA, new { Value = 3 }); + + Assert.Equal(2, countA); + Assert.Equal(1, countB); + } + + [Fact] + public async Task Should_Receive_EventName_In_DynamicEventData() + { + string? receivedEventName = null; + var eventName = "TestEvent-" + Guid.NewGuid().ToString("N"); + + using var subscription = DistributedEventBus.Subscribe(eventName, + new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + receivedEventName = d.EventName; + await Task.CompletedTask; + }))); + + await DistributedEventBus.PublishAsync(eventName, new { Value = 1 }); + + receivedEventName.ShouldBe(eventName); + } + [Fact] public async Task Should_Change_TenantId_If_EventData_Is_MultiTenant() { var tenantId = Guid.NewGuid(); - DistributedEventBus.Subscribe(GetRequiredService()); + using var subscription = DistributedEventBus.Subscribe(GetRequiredService()); await DistributedEventBus.PublishAsync(new MySimpleEventData(3, tenantId)); @@ -40,7 +291,7 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase { var tenantId = Guid.NewGuid(); - DistributedEventBus.Subscribe>(GetRequiredService()); + using var subscription = DistributedEventBus.Subscribe>(GetRequiredService()); await DistributedEventBus.PublishAsync(new MySimpleEventData(3, tenantId)); @@ -52,7 +303,7 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase { var tenantId = Guid.NewGuid(); - DistributedEventBus.Subscribe(GetRequiredService()); + using var subscription = DistributedEventBus.Subscribe(GetRequiredService()); await DistributedEventBus.PublishAsync(new MySimpleEto { @@ -70,10 +321,10 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase { var localEventBus = GetRequiredService(); - localEventBus.Subscribe(); - localEventBus.Subscribe(); + using var distributedEventSentSubscription = localEventBus.Subscribe(); + using var distributedEventReceivedSubscription = localEventBus.Subscribe(); - DistributedEventBus.Subscribe(); + using var subscription = DistributedEventBus.Subscribe(); using (var uow = GetRequiredService().Begin()) { @@ -121,4 +372,19 @@ public class LocalDistributedEventBus_Test : LocalDistributedEventBusTestBase } } + class TestDynamicDistributedEventHandler : IDistributedEventHandler + { + private readonly Action _onHandle; + + public TestDynamicDistributedEventHandler(Action onHandle) + { + _onHandle = onHandle; + } + + public Task HandleEventAsync(DynamicEventData eventData) + { + _onHandle(); + return Task.CompletedTask; + } + } } diff --git a/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Dynamic_Test.cs b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Dynamic_Test.cs new file mode 100644 index 0000000000..c9813b8717 --- /dev/null +++ b/framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Local/LocalEventBus_Dynamic_Test.cs @@ -0,0 +1,258 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Shouldly; +using Xunit; + +namespace Volo.Abp.EventBus.Local; + +public class LocalEventBus_Dynamic_Test : EventBusTestBase +{ + [Fact] + public async Task Should_Handle_Dynamic_Handler_With_EventName() + { + var handleCount = 0; + var eventName = "TestEvent-" + Guid.NewGuid().ToString("N"); + + using var subscription = LocalEventBus.Subscribe(eventName, + new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + handleCount++; + d.EventName.ShouldBe(eventName); + await Task.CompletedTask; + }))); + + await LocalEventBus.PublishAsync(eventName, new { Value = 1 }); + await LocalEventBus.PublishAsync(eventName, new { Value = 2 }); + + handleCount.ShouldBe(2); + } + + [Fact] + public async Task Should_Handle_Typed_Handler_When_Published_With_EventName() + { + var handleCount = 0; + + using var subscription = LocalEventBus.Subscribe(async (data) => + { + handleCount++; + await Task.CompletedTask; + }); + + var eventName = EventNameAttribute.GetNameOrDefault(); + await LocalEventBus.PublishAsync(eventName, new MySimpleEventData(42)); + + handleCount.ShouldBe(1); + } + + [Fact] + public async Task Should_Convert_Dictionary_To_Typed_Handler() + { + MySimpleEventData? receivedData = null; + + using var subscription = LocalEventBus.Subscribe(async (data) => + { + receivedData = data; + await Task.CompletedTask; + }); + + var eventName = EventNameAttribute.GetNameOrDefault(); + await LocalEventBus.PublishAsync(eventName, new Dictionary + { + { "Value", 42 } + }); + + receivedData.ShouldNotBeNull(); + receivedData.Value.ShouldBe(42); + } + + [Fact] + public async Task Should_Trigger_Both_Typed_And_Dynamic_Handlers() + { + var typedHandleCount = 0; + var dynamicHandleCount = 0; + + using var typedSubscription = LocalEventBus.Subscribe(async (data) => + { + typedHandleCount++; + await Task.CompletedTask; + }); + + var eventName = EventNameAttribute.GetNameOrDefault(); + + using var dynamicSubscription = LocalEventBus.Subscribe(eventName, + new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + dynamicHandleCount++; + await Task.CompletedTask; + }))); + + await LocalEventBus.PublishAsync(new MySimpleEventData(1)); + + typedHandleCount.ShouldBe(1); + dynamicHandleCount.ShouldBe(1); + } + + [Fact] + public async Task Should_Unsubscribe_Dynamic_Handler() + { + var handleCount = 0; + var eventName = "TestEvent-" + Guid.NewGuid().ToString("N"); + + var handler = new ActionEventHandler(async (d) => + { + handleCount++; + await Task.CompletedTask; + }); + var factory = new SingleInstanceHandlerFactory(handler); + + var disposable = LocalEventBus.Subscribe(eventName, factory); + + await LocalEventBus.PublishAsync(eventName, new { Value = 1 }); + handleCount.ShouldBe(1); + + disposable.Dispose(); + + await LocalEventBus.PublishAsync(eventName, new { Value = 2 }); + handleCount.ShouldBe(1); + } + + [Fact] + public async Task Should_Not_Throw_For_Unknown_Event_Name() + { + // Publishing to an unknown event name should not throw (consistent with typed PublishAsync behavior) + await LocalEventBus.PublishAsync("NonExistentEvent", new { Value = 1 }); + } + + [Fact] + public async Task Should_Access_Data_In_Dynamic_Handler() + { + object? receivedData = null; + var eventName = "TestEvent-" + Guid.NewGuid().ToString("N"); + + using var subscription = LocalEventBus.Subscribe(eventName, + new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + receivedData = d.Data; + await Task.CompletedTask; + }))); + + await LocalEventBus.PublishAsync(eventName, new { Name = "Hello", Count = 42 }); + + receivedData.ShouldNotBeNull(); + } + + [Fact] + public async Task Should_Receive_Typed_Data_Via_Typed_Handler_From_Dynamic_Publish() + { + MySimpleEventData? receivedData = null; + var eventName = EventNameAttribute.GetNameOrDefault(); + + using var subscription = LocalEventBus.Subscribe(async (d) => + { + receivedData = d; + await Task.CompletedTask; + }); + + await LocalEventBus.PublishAsync(eventName, new MySimpleEventData(99)); + + receivedData.ShouldNotBeNull(); + receivedData.Value.ShouldBe(99); + } + + [Fact] + public async Task Should_Unsubscribe_All_Dynamic_Handlers() + { + var handleCount = 0; + var eventName = "TestEvent-" + Guid.NewGuid().ToString("N"); + + LocalEventBus.Subscribe(eventName, + new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + handleCount++; + await Task.CompletedTask; + }))); + + LocalEventBus.Subscribe(eventName, + new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + handleCount++; + await Task.CompletedTask; + }))); + + await LocalEventBus.PublishAsync(eventName, new { Value = 1 }); + handleCount.ShouldBe(2); + + LocalEventBus.UnsubscribeAll(eventName); + + // After UnsubscribeAll, publishing still works (key exists) but no handlers are invoked + await LocalEventBus.PublishAsync(eventName, new { Value = 2 }); + handleCount.ShouldBe(2); + } + + [Fact] + public async Task Should_Handle_Multiple_Dynamic_Events_Independently() + { + var countA = 0; + var countB = 0; + var eventNameA = "EventA-" + Guid.NewGuid().ToString("N"); + var eventNameB = "EventB-" + Guid.NewGuid().ToString("N"); + + using var subA = LocalEventBus.Subscribe(eventNameA, + new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + countA++; + await Task.CompletedTask; + }))); + + using var subB = LocalEventBus.Subscribe(eventNameB, + new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + countB++; + await Task.CompletedTask; + }))); + + await LocalEventBus.PublishAsync(eventNameA, new { Value = 1 }); + await LocalEventBus.PublishAsync(eventNameB, new { Value = 2 }); + await LocalEventBus.PublishAsync(eventNameA, new { Value = 3 }); + + countA.ShouldBe(2); + countB.ShouldBe(1); + } + + [Fact] + public async Task Should_Receive_EventName_In_DynamicEventData() + { + string? receivedEventName = null; + var eventName = "TestEvent-" + Guid.NewGuid().ToString("N"); + + using var subscription = LocalEventBus.Subscribe(eventName, + new SingleInstanceHandlerFactory(new ActionEventHandler(async (d) => + { + receivedEventName = d.EventName; + await Task.CompletedTask; + }))); + + await LocalEventBus.PublishAsync(eventName, new { Value = 1 }); + + receivedEventName.ShouldBe(eventName); + } + + [Fact] + public async Task Should_Convert_Anonymous_Object_To_Typed_Handler() + { + MySimpleEventData? receivedData = null; + var eventName = EventNameAttribute.GetNameOrDefault(); + + using var subscription = LocalEventBus.Subscribe(async (d) => + { + receivedData = d; + await Task.CompletedTask; + }); + + await LocalEventBus.PublishAsync(eventName, new { Value = 77 }); + + receivedData.ShouldNotBeNull(); + receivedData.Value.ShouldBe(77); + } +} diff --git a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/DistDemoApp.EfCoreRabbitMq.csproj b/test/DistEvents/DistDemoApp.EfCoreRabbitMq/DistDemoApp.EfCoreRabbitMq.csproj deleted file mode 100644 index c3b8719b3f..0000000000 --- a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/DistDemoApp.EfCoreRabbitMq.csproj +++ /dev/null @@ -1,28 +0,0 @@ - - - - Exe - net10.0 - DistDemoApp - - - - - - - - - - - runtime; build; native; contentfiles; analyzers - compile; contentFiles; build; buildMultitargeting; buildTransitive; analyzers; native - - - - - - Always - - - - diff --git a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/DistDemoAppEfCoreRabbitMqModule.cs b/test/DistEvents/DistDemoApp.EfCoreRabbitMq/DistDemoAppEfCoreRabbitMqModule.cs deleted file mode 100644 index da2e7a1445..0000000000 --- a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/DistDemoAppEfCoreRabbitMqModule.cs +++ /dev/null @@ -1,44 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using Volo.Abp.EntityFrameworkCore; -using Volo.Abp.EntityFrameworkCore.DistributedEvents; -using Volo.Abp.EntityFrameworkCore.SqlServer; -using Volo.Abp.EventBus.Distributed; -using Volo.Abp.EventBus.RabbitMq; -using Volo.Abp.Modularity; - -namespace DistDemoApp -{ - [DependsOn( - typeof(AbpEntityFrameworkCoreSqlServerModule), - typeof(AbpEventBusRabbitMqModule), - typeof(DistDemoAppSharedModule) - )] - public class DistDemoAppEfCoreRabbitMqModule : AbpModule - { - public override void ConfigureServices(ServiceConfigurationContext context) - { - context.Services.AddAbpDbContext(options => - { - options.AddDefaultRepositories(); - }); - - Configure(options => - { - options.UseSqlServer(); - }); - - Configure(options => - { - options.Outboxes.Configure(config => - { - config.UseDbContext(); - }); - - options.Inboxes.Configure(config => - { - config.UseDbContext(); - }); - }); - } - } -} \ No newline at end of file diff --git a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/Migrations/20210910152547_Added_Boxes_Initial.Designer.cs b/test/DistEvents/DistDemoApp.EfCoreRabbitMq/Migrations/20210910152547_Added_Boxes_Initial.Designer.cs deleted file mode 100644 index 292cff66f9..0000000000 --- a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/Migrations/20210910152547_Added_Boxes_Initial.Designer.cs +++ /dev/null @@ -1,162 +0,0 @@ -// -using System; -using DistDemoApp; -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Infrastructure; -using Microsoft.EntityFrameworkCore.Metadata; -using Microsoft.EntityFrameworkCore.Migrations; -using Microsoft.EntityFrameworkCore.Storage.ValueConversion; -using Volo.Abp.EntityFrameworkCore; - -namespace DistDemoApp.Migrations -{ - [DbContext(typeof(TodoDbContext))] - [Migration("20210910152547_Added_Boxes_Initial")] - partial class Added_Boxes_Initial - { - protected override void BuildTargetModel(ModelBuilder modelBuilder) - { -#pragma warning disable 612, 618 - modelBuilder - .HasAnnotation("_Abp_DatabaseProvider", EfCoreDatabaseProvider.SqlServer) - .HasAnnotation("Relational:MaxIdentifierLength", 128) - .HasAnnotation("ProductVersion", "5.0.9") - .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); - - modelBuilder.Entity("DistDemoApp.TodoItem", b => - { - b.Property("Id") - .HasColumnType("uniqueidentifier"); - - b.Property("ConcurrencyStamp") - .IsConcurrencyToken() - .HasMaxLength(40) - .HasColumnType("nvarchar(40)") - .HasColumnName("ConcurrencyStamp"); - - b.Property("CreationTime") - .HasColumnType("datetime2") - .HasColumnName("CreationTime"); - - b.Property("CreatorId") - .HasColumnType("uniqueidentifier") - .HasColumnName("CreatorId"); - - b.Property("ExtraProperties") - .HasColumnType("nvarchar(max)") - .HasColumnName("ExtraProperties"); - - b.Property("Text") - .IsRequired() - .HasMaxLength(128) - .HasColumnType("nvarchar(128)"); - - b.HasKey("Id"); - - b.ToTable("TodoItems"); - }); - - modelBuilder.Entity("DistDemoApp.TodoSummary", b => - { - b.Property("Id") - .ValueGeneratedOnAdd() - .HasColumnType("int") - .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); - - b.Property("ConcurrencyStamp") - .IsConcurrencyToken() - .HasMaxLength(40) - .HasColumnType("nvarchar(40)") - .HasColumnName("ConcurrencyStamp"); - - b.Property("Day") - .HasColumnType("tinyint"); - - b.Property("ExtraProperties") - .HasColumnType("nvarchar(max)") - .HasColumnName("ExtraProperties"); - - b.Property("Month") - .HasColumnType("tinyint"); - - b.Property("TotalCount") - .HasColumnType("int"); - - b.Property("Year") - .HasColumnType("int"); - - b.HasKey("Id"); - - b.ToTable("TodoSummaries"); - }); - - modelBuilder.Entity("Volo.Abp.EntityFrameworkCore.DistributedEvents.IncomingEventRecord", b => - { - b.Property("Id") - .HasColumnType("uniqueidentifier"); - - b.Property("CreationTime") - .HasColumnType("datetime2") - .HasColumnName("CreationTime"); - - b.Property("EventData") - .IsRequired() - .HasColumnType("varbinary(max)"); - - b.Property("EventName") - .IsRequired() - .HasMaxLength(256) - .HasColumnType("nvarchar(256)"); - - b.Property("ExtraProperties") - .HasColumnType("nvarchar(max)") - .HasColumnName("ExtraProperties"); - - b.Property("MessageId") - .HasColumnType("nvarchar(450)"); - - b.Property("Processed") - .HasColumnType("bit"); - - b.Property("ProcessedTime") - .HasColumnType("datetime2"); - - b.HasKey("Id"); - - b.HasIndex("MessageId"); - - b.HasIndex("Processed", "CreationTime"); - - b.ToTable("AbpEventInbox"); - }); - - modelBuilder.Entity("Volo.Abp.EntityFrameworkCore.DistributedEvents.OutgoingEventRecord", b => - { - b.Property("Id") - .HasColumnType("uniqueidentifier"); - - b.Property("CreationTime") - .HasColumnType("datetime2") - .HasColumnName("CreationTime"); - - b.Property("EventData") - .IsRequired() - .HasColumnType("varbinary(max)"); - - b.Property("EventName") - .IsRequired() - .HasMaxLength(256) - .HasColumnType("nvarchar(256)"); - - b.Property("ExtraProperties") - .HasColumnType("nvarchar(max)") - .HasColumnName("ExtraProperties"); - - b.HasKey("Id"); - - b.ToTable("AbpEventOutbox"); - }); -#pragma warning restore 612, 618 - } - } -} diff --git a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/Migrations/20210910152547_Added_Boxes_Initial.cs b/test/DistEvents/DistDemoApp.EfCoreRabbitMq/Migrations/20210910152547_Added_Boxes_Initial.cs deleted file mode 100644 index 9094eaa8c9..0000000000 --- a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/Migrations/20210910152547_Added_Boxes_Initial.cs +++ /dev/null @@ -1,103 +0,0 @@ -using System; -using Microsoft.EntityFrameworkCore.Migrations; - -namespace DistDemoApp.Migrations -{ - public partial class Added_Boxes_Initial : Migration - { - protected override void Up(MigrationBuilder migrationBuilder) - { - migrationBuilder.CreateTable( - name: "AbpEventInbox", - columns: table => new - { - Id = table.Column(type: "uniqueidentifier", nullable: false), - ExtraProperties = table.Column(type: "nvarchar(max)", nullable: true), - MessageId = table.Column(type: "nvarchar(450)", nullable: true), - EventName = table.Column(type: "nvarchar(256)", maxLength: 256, nullable: false), - EventData = table.Column(type: "varbinary(max)", nullable: false), - CreationTime = table.Column(type: "datetime2", nullable: false), - Processed = table.Column(type: "bit", nullable: false), - ProcessedTime = table.Column(type: "datetime2", nullable: true) - }, - constraints: table => - { - table.PrimaryKey("PK_AbpEventInbox", x => x.Id); - }); - - migrationBuilder.CreateTable( - name: "AbpEventOutbox", - columns: table => new - { - Id = table.Column(type: "uniqueidentifier", nullable: false), - ExtraProperties = table.Column(type: "nvarchar(max)", nullable: true), - EventName = table.Column(type: "nvarchar(256)", maxLength: 256, nullable: false), - EventData = table.Column(type: "varbinary(max)", nullable: false), - CreationTime = table.Column(type: "datetime2", nullable: false) - }, - constraints: table => - { - table.PrimaryKey("PK_AbpEventOutbox", x => x.Id); - }); - - migrationBuilder.CreateTable( - name: "TodoItems", - columns: table => new - { - Id = table.Column(type: "uniqueidentifier", nullable: false), - Text = table.Column(type: "nvarchar(128)", maxLength: 128, nullable: false), - ExtraProperties = table.Column(type: "nvarchar(max)", nullable: true), - ConcurrencyStamp = table.Column(type: "nvarchar(40)", maxLength: 40, nullable: true), - CreationTime = table.Column(type: "datetime2", nullable: false), - CreatorId = table.Column(type: "uniqueidentifier", nullable: true) - }, - constraints: table => - { - table.PrimaryKey("PK_TodoItems", x => x.Id); - }); - - migrationBuilder.CreateTable( - name: "TodoSummaries", - columns: table => new - { - Id = table.Column(type: "int", nullable: false) - .Annotation("SqlServer:Identity", "1, 1"), - Year = table.Column(type: "int", nullable: false), - Month = table.Column(type: "tinyint", nullable: false), - Day = table.Column(type: "tinyint", nullable: false), - TotalCount = table.Column(type: "int", nullable: false), - ExtraProperties = table.Column(type: "nvarchar(max)", nullable: true), - ConcurrencyStamp = table.Column(type: "nvarchar(40)", maxLength: 40, nullable: true) - }, - constraints: table => - { - table.PrimaryKey("PK_TodoSummaries", x => x.Id); - }); - - migrationBuilder.CreateIndex( - name: "IX_AbpEventInbox_MessageId", - table: "AbpEventInbox", - column: "MessageId"); - - migrationBuilder.CreateIndex( - name: "IX_AbpEventInbox_Processed_CreationTime", - table: "AbpEventInbox", - columns: new[] { "Processed", "CreationTime" }); - } - - protected override void Down(MigrationBuilder migrationBuilder) - { - migrationBuilder.DropTable( - name: "AbpEventInbox"); - - migrationBuilder.DropTable( - name: "AbpEventOutbox"); - - migrationBuilder.DropTable( - name: "TodoItems"); - - migrationBuilder.DropTable( - name: "TodoSummaries"); - } - } -} diff --git a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/Migrations/TodoDbContextModelSnapshot.cs b/test/DistEvents/DistDemoApp.EfCoreRabbitMq/Migrations/TodoDbContextModelSnapshot.cs deleted file mode 100644 index 57e8c14442..0000000000 --- a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/Migrations/TodoDbContextModelSnapshot.cs +++ /dev/null @@ -1,160 +0,0 @@ -// -using System; -using DistDemoApp; -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Infrastructure; -using Microsoft.EntityFrameworkCore.Metadata; -using Microsoft.EntityFrameworkCore.Storage.ValueConversion; -using Volo.Abp.EntityFrameworkCore; - -namespace DistDemoApp.Migrations -{ - [DbContext(typeof(TodoDbContext))] - partial class TodoDbContextModelSnapshot : ModelSnapshot - { - protected override void BuildModel(ModelBuilder modelBuilder) - { -#pragma warning disable 612, 618 - modelBuilder - .HasAnnotation("_Abp_DatabaseProvider", EfCoreDatabaseProvider.SqlServer) - .HasAnnotation("Relational:MaxIdentifierLength", 128) - .HasAnnotation("ProductVersion", "5.0.9") - .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); - - modelBuilder.Entity("DistDemoApp.TodoItem", b => - { - b.Property("Id") - .HasColumnType("uniqueidentifier"); - - b.Property("ConcurrencyStamp") - .IsConcurrencyToken() - .HasMaxLength(40) - .HasColumnType("nvarchar(40)") - .HasColumnName("ConcurrencyStamp"); - - b.Property("CreationTime") - .HasColumnType("datetime2") - .HasColumnName("CreationTime"); - - b.Property("CreatorId") - .HasColumnType("uniqueidentifier") - .HasColumnName("CreatorId"); - - b.Property("ExtraProperties") - .HasColumnType("nvarchar(max)") - .HasColumnName("ExtraProperties"); - - b.Property("Text") - .IsRequired() - .HasMaxLength(128) - .HasColumnType("nvarchar(128)"); - - b.HasKey("Id"); - - b.ToTable("TodoItems"); - }); - - modelBuilder.Entity("DistDemoApp.TodoSummary", b => - { - b.Property("Id") - .ValueGeneratedOnAdd() - .HasColumnType("int") - .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); - - b.Property("ConcurrencyStamp") - .IsConcurrencyToken() - .HasMaxLength(40) - .HasColumnType("nvarchar(40)") - .HasColumnName("ConcurrencyStamp"); - - b.Property("Day") - .HasColumnType("tinyint"); - - b.Property("ExtraProperties") - .HasColumnType("nvarchar(max)") - .HasColumnName("ExtraProperties"); - - b.Property("Month") - .HasColumnType("tinyint"); - - b.Property("TotalCount") - .HasColumnType("int"); - - b.Property("Year") - .HasColumnType("int"); - - b.HasKey("Id"); - - b.ToTable("TodoSummaries"); - }); - - modelBuilder.Entity("Volo.Abp.EntityFrameworkCore.DistributedEvents.IncomingEventRecord", b => - { - b.Property("Id") - .HasColumnType("uniqueidentifier"); - - b.Property("CreationTime") - .HasColumnType("datetime2") - .HasColumnName("CreationTime"); - - b.Property("EventData") - .IsRequired() - .HasColumnType("varbinary(max)"); - - b.Property("EventName") - .IsRequired() - .HasMaxLength(256) - .HasColumnType("nvarchar(256)"); - - b.Property("ExtraProperties") - .HasColumnType("nvarchar(max)") - .HasColumnName("ExtraProperties"); - - b.Property("MessageId") - .HasColumnType("nvarchar(450)"); - - b.Property("Processed") - .HasColumnType("bit"); - - b.Property("ProcessedTime") - .HasColumnType("datetime2"); - - b.HasKey("Id"); - - b.HasIndex("MessageId"); - - b.HasIndex("Processed", "CreationTime"); - - b.ToTable("AbpEventInbox"); - }); - - modelBuilder.Entity("Volo.Abp.EntityFrameworkCore.DistributedEvents.OutgoingEventRecord", b => - { - b.Property("Id") - .HasColumnType("uniqueidentifier"); - - b.Property("CreationTime") - .HasColumnType("datetime2") - .HasColumnName("CreationTime"); - - b.Property("EventData") - .IsRequired() - .HasColumnType("varbinary(max)"); - - b.Property("EventName") - .IsRequired() - .HasMaxLength(256) - .HasColumnType("nvarchar(256)"); - - b.Property("ExtraProperties") - .HasColumnType("nvarchar(max)") - .HasColumnName("ExtraProperties"); - - b.HasKey("Id"); - - b.ToTable("AbpEventOutbox"); - }); -#pragma warning restore 612, 618 - } - } -} diff --git a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/Program.cs b/test/DistEvents/DistDemoApp.EfCoreRabbitMq/Program.cs deleted file mode 100644 index 597b29ae22..0000000000 --- a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/Program.cs +++ /dev/null @@ -1,57 +0,0 @@ -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Serilog; -using Serilog.Events; - -namespace DistDemoApp -{ - public class Program - { - public static async Task Main(string[] args) - { - Log.Logger = new LoggerConfiguration() -#if DEBUG - .MinimumLevel.Debug() -#else - .MinimumLevel.Information() -#endif - .MinimumLevel.Override("Microsoft", LogEventLevel.Warning) - .Enrich.FromLogContext() - .WriteTo.Async(c => c.File("Logs/logs.txt")) - .WriteTo.Async(c => c.Console()) - .CreateLogger(); - - try - { - Log.Information("Starting console host."); - await CreateHostBuilder(args).RunConsoleAsync(); - return 0; - } - catch (Exception ex) - { - Log.Fatal(ex, "Host terminated unexpectedly!"); - return 1; - } - finally - { - Log.CloseAndFlush(); - } - - } - - internal static IHostBuilder CreateHostBuilder(string[] args) => - Host.CreateDefaultBuilder(args) - .UseAutofac() - .UseSerilog() - .ConfigureAppConfiguration((context, config) => - { - //setup your additional configuration sources - }) - .ConfigureServices((hostContext, services) => - { - services.AddApplication(); - }); - } -} diff --git a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/TodoDbContext.cs b/test/DistEvents/DistDemoApp.EfCoreRabbitMq/TodoDbContext.cs deleted file mode 100644 index 5a1ddd2c3f..0000000000 --- a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/TodoDbContext.cs +++ /dev/null @@ -1,34 +0,0 @@ -using Microsoft.EntityFrameworkCore; -using Volo.Abp.Domain.Entities; -using Volo.Abp.EntityFrameworkCore; -using Volo.Abp.EntityFrameworkCore.DistributedEvents; - -namespace DistDemoApp -{ - public class TodoDbContext : AbpDbContext, IHasEventOutbox, IHasEventInbox - { - public DbSet TodoItems { get; set; } - public DbSet TodoSummaries { get; set; } - public DbSet OutgoingEvents { get; set; } - public DbSet IncomingEvents { get; set; } - - public TodoDbContext(DbContextOptions options) - : base(options) - { - - } - - protected override void OnModelCreating(ModelBuilder modelBuilder) - { - base.OnModelCreating(modelBuilder); - - modelBuilder.ConfigureEventOutbox(); - modelBuilder.ConfigureEventInbox(); - - modelBuilder.Entity(b => - { - b.Property(x => x.Text).IsRequired().HasMaxLength(128); - }); - } - } -} \ No newline at end of file diff --git a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/TodoDbContextFactory.cs b/test/DistEvents/DistDemoApp.EfCoreRabbitMq/TodoDbContextFactory.cs deleted file mode 100644 index 97be637acc..0000000000 --- a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/TodoDbContextFactory.cs +++ /dev/null @@ -1,29 +0,0 @@ -using System.IO; -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Design; -using Microsoft.Extensions.Configuration; - -namespace DistDemoApp -{ - public class TodoDbContextFactory : IDesignTimeDbContextFactory - { - public TodoDbContext CreateDbContext(string[] args) - { - var configuration = BuildConfiguration(); - - var builder = new DbContextOptionsBuilder() - .UseSqlServer(configuration.GetConnectionString("Default")); - - return new TodoDbContext(builder.Options); - } - - private static IConfigurationRoot BuildConfiguration() - { - var builder = new ConfigurationBuilder() - .SetBasePath(Directory.GetCurrentDirectory()) - .AddJsonFile("appsettings.json", optional: false); - - return builder.Build(); - } - } -} \ No newline at end of file diff --git a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/appsettings.json b/test/DistEvents/DistDemoApp.EfCoreRabbitMq/appsettings.json deleted file mode 100644 index 393be04a6c..0000000000 --- a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/appsettings.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "ConnectionStrings": { - "Default": "Server=(LocalDb)\\MSSQLLocalDB;Database=DistEventsDemo;Trusted_Connection=True;TrustServerCertificate=True" - }, - "RabbitMQ": { - "Connections": { - "Default": { - "HostName": "localhost" - } - }, - "EventBus": { - "ClientName": "DistDemoApp", - "ExchangeName": "DistDemo" - } - }, - "Redis": { - "Configuration": "127.0.0.1" - } -} diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoApp.MongoDbKafka.csproj b/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoApp.MongoDbKafka.csproj deleted file mode 100644 index abeb206500..0000000000 --- a/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoApp.MongoDbKafka.csproj +++ /dev/null @@ -1,21 +0,0 @@ - - - - Exe - net10.0 - DistDemoApp - - - - - - - - - - - Always - - - - diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoAppMongoDbKafkaModule.cs b/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoAppMongoDbKafkaModule.cs deleted file mode 100644 index b2e41b6ca7..0000000000 --- a/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoAppMongoDbKafkaModule.cs +++ /dev/null @@ -1,38 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using Volo.Abp.EventBus.Distributed; -using Volo.Abp.EventBus.Kafka; -using Volo.Abp.Modularity; -using Volo.Abp.MongoDB; -using Volo.Abp.MongoDB.DistributedEvents; - -namespace DistDemoApp -{ - [DependsOn( - typeof(AbpMongoDbModule), - typeof(AbpEventBusKafkaModule), - typeof(DistDemoAppSharedModule) - )] - public class DistDemoAppMongoDbKafkaModule : AbpModule - { - public override void ConfigureServices(ServiceConfigurationContext context) - { - context.Services.AddMongoDbContext(options => - { - options.AddDefaultRepositories(); - }); - - Configure(options => - { - options.Outboxes.Configure(config => - { - config.UseMongoDbContext(); - }); - - options.Inboxes.Configure(config => - { - config.UseMongoDbContext(); - }); - }); - } - } -} diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/Program.cs b/test/DistEvents/DistDemoApp.MongoDbKafka/Program.cs deleted file mode 100644 index b048c17389..0000000000 --- a/test/DistEvents/DistDemoApp.MongoDbKafka/Program.cs +++ /dev/null @@ -1,57 +0,0 @@ -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Serilog; -using Serilog.Events; - -namespace DistDemoApp -{ - public class Program - { - public static async Task Main(string[] args) - { - Log.Logger = new LoggerConfiguration() -#if DEBUG - .MinimumLevel.Debug() -#else - .MinimumLevel.Information() -#endif - .MinimumLevel.Override("Microsoft", LogEventLevel.Warning) - .Enrich.FromLogContext() - .WriteTo.Async(c => c.File("Logs/logs.txt")) - .WriteTo.Async(c => c.Console()) - .CreateLogger(); - - try - { - Log.Information("Starting console host."); - await CreateHostBuilder(args).RunConsoleAsync(); - return 0; - } - catch (Exception ex) - { - Log.Fatal(ex, "Host terminated unexpectedly!"); - return 1; - } - finally - { - Log.CloseAndFlush(); - } - - } - - internal static IHostBuilder CreateHostBuilder(string[] args) => - Host.CreateDefaultBuilder(args) - .UseAutofac() - .UseSerilog() - .ConfigureAppConfiguration((context, config) => - { - //setup your additional configuration sources - }) - .ConfigureServices((hostContext, services) => - { - services.AddApplication(); - }); - } -} diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/TodoMongoDbContext.cs b/test/DistEvents/DistDemoApp.MongoDbKafka/TodoMongoDbContext.cs deleted file mode 100644 index a7f1b78f86..0000000000 --- a/test/DistEvents/DistDemoApp.MongoDbKafka/TodoMongoDbContext.cs +++ /dev/null @@ -1,26 +0,0 @@ -using MongoDB.Driver; -using Volo.Abp.Data; -using Volo.Abp.MongoDB; -using Volo.Abp.MongoDB.DistributedEvents; - -namespace DistDemoApp -{ - [ConnectionStringName("Default")] - public class TodoMongoDbContext : AbpMongoDbContext, IHasEventOutbox, IHasEventInbox - { - public IMongoCollection TodoItems => Collection(); - public IMongoCollection TodoSummaries => Collection(); - - public IMongoCollection OutgoingEvents - { - get => Collection(); - set {} - } - public IMongoCollection IncomingEvents - { - get => Collection(); - set {} - } - } - -} diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/appsettings.json b/test/DistEvents/DistDemoApp.MongoDbKafka/appsettings.json deleted file mode 100644 index f9ee345d5a..0000000000 --- a/test/DistEvents/DistDemoApp.MongoDbKafka/appsettings.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "ConnectionStrings": { - "Default": "mongodb://localhost:27018,localhost:27019,localhost:27020/DistEventsDemo" - }, - "Kafka": { - "Connections": { - "Default": { - "BootstrapServers": "localhost:9092" - } - }, - "EventBus": { - "GroupId": "DistDemoApp", - "TopicName": "DistDemoTopic" - } - }, - "Redis": { - "Configuration": "127.0.0.1" - } -} diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoApp.MongoDbRebus.csproj b/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoApp.MongoDbRebus.csproj deleted file mode 100644 index 0b04220e3e..0000000000 --- a/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoApp.MongoDbRebus.csproj +++ /dev/null @@ -1,21 +0,0 @@ - - - - Exe - net10.0 - DistDemoApp - - - - - - - - - - - Always - - - - diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoAppMongoDbRebusModule.cs b/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoAppMongoDbRebusModule.cs deleted file mode 100644 index 21dab7b9b5..0000000000 --- a/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoAppMongoDbRebusModule.cs +++ /dev/null @@ -1,53 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using Rebus.Persistence.InMem; -using Rebus.Transport.InMem; -using Volo.Abp.EventBus.Distributed; -using Volo.Abp.EventBus.Rebus; -using Volo.Abp.Modularity; -using Volo.Abp.MongoDB; -using Volo.Abp.MongoDB.DistributedEvents; - -namespace DistDemoApp -{ - [DependsOn( - typeof(AbpMongoDbModule), - typeof(AbpEventBusRebusModule), - typeof(DistDemoAppSharedModule) - )] - public class DistDemoAppMongoDbRebusModule : AbpModule - { - public override void PreConfigureServices(ServiceConfigurationContext context) - { - PreConfigure(options => - { - options.InputQueueName = "eventbus"; - options.Configurer = rebusConfigurer => - { - rebusConfigurer.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "eventbus")); - rebusConfigurer.Subscriptions(s => s.StoreInMemory()); - }; - }); - } - - public override void ConfigureServices(ServiceConfigurationContext context) - { - context.Services.AddMongoDbContext(options => - { - options.AddDefaultRepositories(); - }); - - Configure(options => - { - options.Outboxes.Configure(config => - { - config.UseMongoDbContext(); - }); - - options.Inboxes.Configure(config => - { - config.UseMongoDbContext(); - }); - }); - } - } -} diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs b/test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs deleted file mode 100644 index a79ad1b1bf..0000000000 --- a/test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs +++ /dev/null @@ -1,57 +0,0 @@ -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Serilog; -using Serilog.Events; - -namespace DistDemoApp -{ - public class Program - { - public static async Task Main(string[] args) - { - Log.Logger = new LoggerConfiguration() -#if DEBUG - .MinimumLevel.Debug() -#else - .MinimumLevel.Information() -#endif - .MinimumLevel.Override("Microsoft", LogEventLevel.Warning) - .Enrich.FromLogContext() - .WriteTo.Async(c => c.File("Logs/logs.txt")) - .WriteTo.Async(c => c.Console()) - .CreateLogger(); - - try - { - Log.Information("Starting console host."); - await CreateHostBuilder(args).RunConsoleAsync(); - return 0; - } - catch (Exception ex) - { - Log.Fatal(ex, "Host terminated unexpectedly!"); - return 1; - } - finally - { - Log.CloseAndFlush(); - } - - } - - internal static IHostBuilder CreateHostBuilder(string[] args) => - Host.CreateDefaultBuilder(args) - .UseAutofac() - .UseSerilog() - .ConfigureAppConfiguration((context, config) => - { - //setup your additional configuration sources - }) - .ConfigureServices((hostContext, services) => - { - services.AddApplication(); - }); - } -} diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs b/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs deleted file mode 100644 index 95370bb4d2..0000000000 --- a/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs +++ /dev/null @@ -1,19 +0,0 @@ -using MongoDB.Driver; -using Volo.Abp.Data; -using Volo.Abp.MongoDB; -using Volo.Abp.MongoDB.DistributedEvents; - -namespace DistDemoApp -{ - [ConnectionStringName("Default")] - public class TodoMongoDbContext : AbpMongoDbContext, IHasEventOutbox, IHasEventInbox - { - public IMongoCollection TodoItems => Collection(); - public IMongoCollection TodoSummaries => Collection(); - - public IMongoCollection OutgoingEvents => Collection(); - - public IMongoCollection IncomingEvents => Collection(); - } - -} diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/appsettings.json b/test/DistEvents/DistDemoApp.MongoDbRebus/appsettings.json deleted file mode 100644 index f9ee345d5a..0000000000 --- a/test/DistEvents/DistDemoApp.MongoDbRebus/appsettings.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "ConnectionStrings": { - "Default": "mongodb://localhost:27018,localhost:27019,localhost:27020/DistEventsDemo" - }, - "Kafka": { - "Connections": { - "Default": { - "BootstrapServers": "localhost:9092" - } - }, - "EventBus": { - "GroupId": "DistDemoApp", - "TopicName": "DistDemoTopic" - } - }, - "Redis": { - "Configuration": "127.0.0.1" - } -} diff --git a/test/DistEvents/DistDemoApp.Shared/DemoService.cs b/test/DistEvents/DistDemoApp.Shared/DemoService.cs deleted file mode 100644 index c970485252..0000000000 --- a/test/DistEvents/DistDemoApp.Shared/DemoService.cs +++ /dev/null @@ -1,29 +0,0 @@ -using System; -using System.Threading.Tasks; -using Volo.Abp.DependencyInjection; -using Volo.Abp.Domain.Repositories; - -namespace DistDemoApp -{ - public class DemoService : ITransientDependency - { - private readonly IRepository _todoItemRepository; - - public DemoService(IRepository todoItemRepository) - { - _todoItemRepository = todoItemRepository; - } - - public async Task CreateTodoItemAsync() - { - var todoItem = await _todoItemRepository.InsertAsync( - new TodoItem - { - Text = "todo item " + DateTime.Now.Ticks - } - ); - - Console.WriteLine("Created a new todo item: " + todoItem); - } - } -} \ No newline at end of file diff --git a/test/DistEvents/DistDemoApp.Shared/DistDemoApp.Shared.csproj b/test/DistEvents/DistDemoApp.Shared/DistDemoApp.Shared.csproj deleted file mode 100644 index 36d4d635ce..0000000000 --- a/test/DistEvents/DistDemoApp.Shared/DistDemoApp.Shared.csproj +++ /dev/null @@ -1,23 +0,0 @@ - - - - net10.0 - DistDemoApp - - - - - - - - - - - - - - - - - - diff --git a/test/DistEvents/DistDemoApp.Shared/DistDemoAppHostedService.cs b/test/DistEvents/DistDemoApp.Shared/DistDemoAppHostedService.cs deleted file mode 100644 index ba72d6902a..0000000000 --- a/test/DistEvents/DistDemoApp.Shared/DistDemoAppHostedService.cs +++ /dev/null @@ -1,39 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.Hosting; -using Volo.Abp; - -namespace DistDemoApp -{ - public class DistDemoAppHostedService : IHostedService - { - private readonly IAbpApplicationWithExternalServiceProvider _application; - private readonly IServiceProvider _serviceProvider; - private readonly DemoService _demoService; - - public DistDemoAppHostedService( - IAbpApplicationWithExternalServiceProvider application, - IServiceProvider serviceProvider, - DemoService demoService) - { - _application = application; - _serviceProvider = serviceProvider; - _demoService = demoService; - } - - public async Task StartAsync(CancellationToken cancellationToken) - { - _application.Initialize(_serviceProvider); - - await _demoService.CreateTodoItemAsync(); - } - - public Task StopAsync(CancellationToken cancellationToken) - { - _application.Shutdown(); - - return Task.CompletedTask; - } - } -} diff --git a/test/DistEvents/DistDemoApp.Shared/DistDemoAppSharedModule.cs b/test/DistEvents/DistDemoApp.Shared/DistDemoAppSharedModule.cs deleted file mode 100644 index 936264e828..0000000000 --- a/test/DistEvents/DistDemoApp.Shared/DistDemoAppSharedModule.cs +++ /dev/null @@ -1,39 +0,0 @@ -using Medallion.Threading; -using Medallion.Threading.Redis; -using Microsoft.Extensions.DependencyInjection; -using StackExchange.Redis; -using Volo.Abp.Autofac; -using Volo.Abp.Domain; -using Volo.Abp.Domain.Entities.Events.Distributed; -using Volo.Abp.EventBus; -using Volo.Abp.Modularity; - -namespace DistDemoApp -{ - [DependsOn( - typeof(AbpAutofacModule), - typeof(AbpDddDomainModule), - typeof(AbpEventBusModule) - )] - public class DistDemoAppSharedModule : AbpModule - { - public override void ConfigureServices(ServiceConfigurationContext context) - { - var configuration = context.Services.GetConfiguration(); - - context.Services.AddHostedService(); - - Configure(options => - { - options.EtoMappings.Add(); - options.AutoEventSelectors.Add(); - }); - - context.Services.AddSingleton(sp => - { - var connection = ConnectionMultiplexer.Connect(configuration["Redis:Configuration"]); - return new RedisDistributedSynchronizationProvider(connection.GetDatabase()); - }); - } - } -} diff --git a/test/DistEvents/DistDemoApp.Shared/TodoEventHandler.cs b/test/DistEvents/DistDemoApp.Shared/TodoEventHandler.cs deleted file mode 100644 index 7a69ceed1e..0000000000 --- a/test/DistEvents/DistDemoApp.Shared/TodoEventHandler.cs +++ /dev/null @@ -1,63 +0,0 @@ -using System; -using System.Threading.Tasks; -using Volo.Abp.DependencyInjection; -using Volo.Abp.Domain.Entities.Events.Distributed; -using Volo.Abp.Domain.Repositories; -using Volo.Abp.EventBus.Distributed; -using Volo.Abp.Uow; - -namespace DistDemoApp -{ - public class TodoEventHandler : - IDistributedEventHandler>, - IDistributedEventHandler>, - ITransientDependency - { - private readonly IRepository _todoSummaryRepository; - - public TodoEventHandler(IRepository todoSummaryRepository) - { - _todoSummaryRepository = todoSummaryRepository; - } - - [UnitOfWork] - public virtual async Task HandleEventAsync(EntityCreatedEto eventData) - { - var dateTime = eventData.Entity.CreationTime; - var todoSummary = await _todoSummaryRepository.FindAsync( - x => x.Year == dateTime.Year && - x.Month == dateTime.Month && - x.Day == dateTime.Day - ); - - if (todoSummary == null) - { - todoSummary = await _todoSummaryRepository.InsertAsync(new TodoSummary(dateTime)); - } - else - { - todoSummary.Increase(); - await _todoSummaryRepository.UpdateAsync(todoSummary); - } - - Console.WriteLine("Increased total count: " + todoSummary); - } - - public async Task HandleEventAsync(EntityDeletedEto eventData) - { - var dateTime = eventData.Entity.CreationTime; - var todoSummary = await _todoSummaryRepository.FirstOrDefaultAsync( - x => x.Year == dateTime.Year && - x.Month == dateTime.Month && - x.Day == dateTime.Day - ); - - if (todoSummary != null) - { - todoSummary.Decrease(); - await _todoSummaryRepository.UpdateAsync(todoSummary); - Console.WriteLine("Decreased total count: " + todoSummary); - } - } - } -} \ No newline at end of file diff --git a/test/DistEvents/DistDemoApp.Shared/TodoItem.cs b/test/DistEvents/DistDemoApp.Shared/TodoItem.cs deleted file mode 100644 index 9257791efd..0000000000 --- a/test/DistEvents/DistDemoApp.Shared/TodoItem.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System; -using Volo.Abp.Domain.Entities.Auditing; - -namespace DistDemoApp -{ - public class TodoItem : CreationAuditedAggregateRoot - { - public string Text { get; set; } - - public override string ToString() - { - return $"{base.ToString()}, Text = {Text}"; - } - } -} \ No newline at end of file diff --git a/test/DistEvents/DistDemoApp.Shared/TodoItemEto.cs b/test/DistEvents/DistDemoApp.Shared/TodoItemEto.cs deleted file mode 100644 index d593a10347..0000000000 --- a/test/DistEvents/DistDemoApp.Shared/TodoItemEto.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using Volo.Abp.EventBus; - -namespace DistDemoApp -{ - [EventName("todo-item")] - public class TodoItemEto - { - public DateTime CreationTime { get; set; } - public string Text { get; set; } - } -} \ No newline at end of file diff --git a/test/DistEvents/DistDemoApp.Shared/TodoItemObjectMapper.cs b/test/DistEvents/DistDemoApp.Shared/TodoItemObjectMapper.cs deleted file mode 100644 index 167c8e114f..0000000000 --- a/test/DistEvents/DistDemoApp.Shared/TodoItemObjectMapper.cs +++ /dev/null @@ -1,24 +0,0 @@ -using Volo.Abp.DependencyInjection; -using Volo.Abp.ObjectMapping; - -namespace DistDemoApp -{ - public class TodoItemObjectMapper : IObjectMapper, ISingletonDependency - { - public TodoItemEto Map(TodoItem source) - { - return new TodoItemEto - { - Text = source.Text, - CreationTime = source.CreationTime - }; - } - - public TodoItemEto Map(TodoItem source, TodoItemEto destination) - { - destination.Text = source.Text; - destination.CreationTime = source.CreationTime; - return destination; - } - } -} \ No newline at end of file diff --git a/test/DistEvents/DistDemoApp.Shared/TodoSummary.cs b/test/DistEvents/DistDemoApp.Shared/TodoSummary.cs deleted file mode 100644 index 70eaee1847..0000000000 --- a/test/DistEvents/DistDemoApp.Shared/TodoSummary.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System; -using Volo.Abp.Domain.Entities; - -namespace DistDemoApp -{ - public class TodoSummary : AggregateRoot - { - public int Year { get; private set; } - public byte Month { get; private set; } - public byte Day { get; private set; } - public int TotalCount { get; private set; } - - private TodoSummary() - { - - } - - public TodoSummary(DateTime dateTime, int initialCount = 1) - { - Year = dateTime.Year; - Month = (byte)dateTime.Month; - Day = (byte)dateTime.Day; - TotalCount = initialCount; - } - - public void Increase(int amount = 1) - { - TotalCount += amount; - } - - public void Decrease(int amount = 1) - { - TotalCount -= amount; - } - - public override string ToString() - { - return $"{base.ToString()}, {Year}-{Month:00}-{Day:00}: {TotalCount}"; - } - } -} \ No newline at end of file diff --git a/test/DistEvents/DistEventsDemo.slnx b/test/DistEvents/DistEventsDemo.slnx deleted file mode 100644 index 595e2e5d3e..0000000000 --- a/test/DistEvents/DistEventsDemo.slnx +++ /dev/null @@ -1,6 +0,0 @@ - - - - - -