diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index 14ed2762e9..c4f1a141e6 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -75,7 +75,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen public async Task ProcessEventAsync(Type eventType, object eventData) { var messageId = MessageContext.Current.TransportMessage.GetMessageId(); - var eventName = EventNameAttribute.GetNameOrDefault(eventType); + string eventName; + if (eventType == typeof(AnonymousEventData) && eventData is AnonymousEventData anonymousEventData) + { + eventName = anonymousEventData.EventName; + } + else + { + eventName = EventNameAttribute.GetNameOrDefault(eventType); + } var correlationId = MessageContext.Current.Headers.GetOrDefault(EventBusConsts.CorrelationIdHeaderName); if (await AddToInboxAsync(messageId, eventName, eventType, eventData, correlationId)) @@ -119,6 +127,11 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen } handlerFactories.Add(handler); + + if (AnonymousHandlerFactories.Count == 1) //TODO: Multi-threading! + { + Rebus.Subscribe(typeof(AnonymousEventData)); + } return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); } @@ -224,7 +237,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen } else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName)) { - eventData = Serializer.Deserialize(outgoingEvent.EventData, typeof(object)); + eventData = new AnonymousEventData(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, typeof(object))); eventType = typeof(AnonymousEventData); } else diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs b/test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs index a79ad1b1bf..9f8c1e6c56 100644 --- a/test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs +++ b/test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs @@ -4,39 +4,67 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Serilog; using Serilog.Events; +using Volo.Abp; +using Volo.Abp.Threading; namespace DistDemoApp { public class Program { - public static async Task Main(string[] args) + public static void Main(string[] args) { - Log.Logger = new LoggerConfiguration() -#if DEBUG - .MinimumLevel.Debug() -#else - .MinimumLevel.Information() -#endif - .MinimumLevel.Override("Microsoft", LogEventLevel.Warning) - .Enrich.FromLogContext() - .WriteTo.Async(c => c.File("Logs/logs.txt")) - .WriteTo.Async(c => c.Console()) - .CreateLogger(); +// Log.Logger = new LoggerConfiguration() +// #if DEBUG +// .MinimumLevel.Debug() +// #else +// .MinimumLevel.Information() +// #endif +// .MinimumLevel.Override("Microsoft", LogEventLevel.Warning) +// .Enrich.FromLogContext() +// .WriteTo.Async(c => c.File("Logs/logs.txt")) +// .WriteTo.Async(c => c.Console()) +// .CreateLogger(); +// +// try +// { +// Log.Information("Starting console host."); +// await CreateHostBuilder(args).RunConsoleAsync(); +// return 0; +// } +// catch (Exception ex) +// { +// Log.Fatal(ex, "Host terminated unexpectedly!"); +// return 1; +// } +// finally +// { +// Log.CloseAndFlush(); +// } - try + using (var application = AbpApplicationFactory.Create(options => + { + options.UseAutofac(); + options.Services.AddSerilog((serviceProvider, c) => + { + // c.Enrich.FromLogContext() + // .WriteTo.Async(c => c.File("Logs/logs.txt")) + // .WriteTo.Async(c => c.Console()) + // .WriteTo.AbpStudio(serviceProvider); + }); + options.Services.AddLogging(c => c.AddSerilog()); + })) { - Log.Information("Starting console host."); - await CreateHostBuilder(args).RunConsoleAsync(); - return 0; - } - catch (Exception ex) - { - Log.Fatal(ex, "Host terminated unexpectedly!"); - return 1; - } - finally - { - Log.CloseAndFlush(); + Log.Information("Starting Volo.AbpIo.DbMigrator."); + + application.Initialize(); + + AsyncHelper.RunSync( + () => application + .ServiceProvider + .GetRequiredService().CreateTodoItemAsync() + ); + + application.Shutdown(); } }