Browse Source

Anonymous Rebus events support; demo bootstrap

RebusDistributedEventBus: properly handle AnonymousEventData by extracting its EventName when processing, wrap deserialized anonymous payloads into AnonymousEventData, and subscribe to AnonymousEventData when the first anonymous handler is registered (note: a TODO about multi-threading remains).

DistDemoApp.MongoDbRebus Program: replace the previous Host/Serilog-driven async Main with an ABP application bootstrap using AbpApplicationFactory. The new Main initializes the ABP app, runs DemoService.CreateTodoItemAsync via AsyncHelper.RunSync, and then shuts down; prior Serilog/host startup code has been commented out and ABP logging/Serilog services are wired up.
pull/25023/head
SALİH ÖZKARA 4 weeks ago
parent
commit
71eece82e5
  1. 17
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs
  2. 78
      test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs

17
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs

@ -75,7 +75,15 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
public async Task ProcessEventAsync(Type eventType, object eventData) public async Task ProcessEventAsync(Type eventType, object eventData)
{ {
var messageId = MessageContext.Current.TransportMessage.GetMessageId(); var messageId = MessageContext.Current.TransportMessage.GetMessageId();
var eventName = EventNameAttribute.GetNameOrDefault(eventType); string eventName;
if (eventType == typeof(AnonymousEventData) && eventData is AnonymousEventData anonymousEventData)
{
eventName = anonymousEventData.EventName;
}
else
{
eventName = EventNameAttribute.GetNameOrDefault(eventType);
}
var correlationId = MessageContext.Current.Headers.GetOrDefault(EventBusConsts.CorrelationIdHeaderName); var correlationId = MessageContext.Current.Headers.GetOrDefault(EventBusConsts.CorrelationIdHeaderName);
if (await AddToInboxAsync(messageId, eventName, eventType, eventData, correlationId)) if (await AddToInboxAsync(messageId, eventName, eventType, eventData, correlationId))
@ -119,6 +127,11 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
} }
handlerFactories.Add(handler); handlerFactories.Add(handler);
if (AnonymousHandlerFactories.Count == 1) //TODO: Multi-threading!
{
Rebus.Subscribe(typeof(AnonymousEventData));
}
return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler); return new AnonymousEventHandlerFactoryUnregistrar(this, eventName, handler);
} }
@ -224,7 +237,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
} }
else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName)) else if (AnonymousHandlerFactories.ContainsKey(outgoingEvent.EventName))
{ {
eventData = Serializer.Deserialize(outgoingEvent.EventData, typeof(object)); eventData = new AnonymousEventData(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, typeof(object)));
eventType = typeof(AnonymousEventData); eventType = typeof(AnonymousEventData);
} }
else else

78
test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs

@ -4,39 +4,67 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Serilog; using Serilog;
using Serilog.Events; using Serilog.Events;
using Volo.Abp;
using Volo.Abp.Threading;
namespace DistDemoApp namespace DistDemoApp
{ {
public class Program public class Program
{ {
public static async Task<int> Main(string[] args) public static void Main(string[] args)
{ {
Log.Logger = new LoggerConfiguration() // Log.Logger = new LoggerConfiguration()
#if DEBUG // #if DEBUG
.MinimumLevel.Debug() // .MinimumLevel.Debug()
#else // #else
.MinimumLevel.Information() // .MinimumLevel.Information()
#endif // #endif
.MinimumLevel.Override("Microsoft", LogEventLevel.Warning) // .MinimumLevel.Override("Microsoft", LogEventLevel.Warning)
.Enrich.FromLogContext() // .Enrich.FromLogContext()
.WriteTo.Async(c => c.File("Logs/logs.txt")) // .WriteTo.Async(c => c.File("Logs/logs.txt"))
.WriteTo.Async(c => c.Console()) // .WriteTo.Async(c => c.Console())
.CreateLogger(); // .CreateLogger();
//
// try
// {
// Log.Information("Starting console host.");
// await CreateHostBuilder(args).RunConsoleAsync();
// return 0;
// }
// catch (Exception ex)
// {
// Log.Fatal(ex, "Host terminated unexpectedly!");
// return 1;
// }
// finally
// {
// Log.CloseAndFlush();
// }
try using (var application = AbpApplicationFactory.Create<DistDemoAppMongoDbRebusModule>(options =>
{
options.UseAutofac();
options.Services.AddSerilog((serviceProvider, c) =>
{
// c.Enrich.FromLogContext()
// .WriteTo.Async(c => c.File("Logs/logs.txt"))
// .WriteTo.Async(c => c.Console())
// .WriteTo.AbpStudio(serviceProvider);
});
options.Services.AddLogging(c => c.AddSerilog());
}))
{ {
Log.Information("Starting console host."); Log.Information("Starting Volo.AbpIo.DbMigrator.");
await CreateHostBuilder(args).RunConsoleAsync();
return 0; application.Initialize();
}
catch (Exception ex) AsyncHelper.RunSync(
{ () => application
Log.Fatal(ex, "Host terminated unexpectedly!"); .ServiceProvider
return 1; .GetRequiredService<DemoService>().CreateTodoItemAsync()
} );
finally
{ application.Shutdown();
Log.CloseAndFlush();
} }
} }

Loading…
Cancel
Save