Browse Source

Basic implementation is done for the event outbox.

pull/10008/head
Halil İbrahim Kalkan 5 years ago
parent
commit
afcd640e40
  1. 2
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs
  2. 48
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs
  3. 2
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EfCoreOutboxConfigExtensions.cs
  4. 10
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IDbContextEventOutbox.cs
  5. 36
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs
  6. 1
      framework/src/Volo.Abp.EventBus.Boxes/Volo.Abp.EventBus.Boxes.csproj
  7. 18
      framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpDistributedEventBusExtensions.cs
  8. 11
      framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesModule.cs
  9. 11
      framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/IOutboxSender.cs
  10. 77
      framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs
  11. 48
      framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSenderManager.cs
  12. 36
      framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs
  13. 29
      framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs
  14. 16
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs
  15. 1
      framework/src/Volo.Abp.EventBus/Volo.Abp.EventBus.csproj
  16. 5
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusModule.cs
  17. 25
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs
  18. 8
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs
  19. 13
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IRawEventPublisher.cs
  20. 39
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs
  21. 1
      test/DistEvents/DistDemoApp/DistDemoApp.csproj
  22. 4
      test/DistEvents/DistDemoApp/DistDemoAppModule.cs

2
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs

@ -27,7 +27,7 @@ namespace Volo.Abp.EntityFrameworkCore
}); });
context.Services.TryAddTransient(typeof(IDbContextProvider<>), typeof(UnitOfWorkDbContextProvider<>)); context.Services.TryAddTransient(typeof(IDbContextProvider<>), typeof(UnitOfWorkDbContextProvider<>));
context.Services.AddTransient(typeof(DbContextEventOutbox<>)); context.Services.AddTransient(typeof(IDbContextEventOutbox<>), typeof(DbContextEventOutbox<>));
} }
} }
} }

48
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs

@ -1,29 +1,59 @@
using System.Threading.Tasks; using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Guids; using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{ {
public class DbContextEventOutbox<TDbContext> : IEventOutbox public class DbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox where TDbContext : IHasEventOutbox
{ {
protected IDbContextProvider<TDbContext> DbContextProvider { get; } protected IDbContextProvider<TDbContext> DbContextProvider { get; }
protected IGuidGenerator GuidGenerator { get; }
public DbContextEventOutbox( public DbContextEventOutbox(
IDbContextProvider<TDbContext> dbContextProvider, IDbContextProvider<TDbContext> dbContextProvider)
IGuidGenerator guidGenerator)
{ {
DbContextProvider = dbContextProvider; DbContextProvider = dbContextProvider;
GuidGenerator = guidGenerator;
} }
public async Task EnqueueAsync(string eventName, byte[] eventData) [UnitOfWork]
public virtual async Task EnqueueAsync(OutgoingEventInfo outgoingEvent)
{ {
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
dbContext.OutgoingEventRecords.Add( dbContext.OutgoingEventRecords.Add(
new OutgoingEventRecord(GuidGenerator.Create(), eventName, eventData) new OutgoingEventRecord(outgoingEvent)
); );
} }
[UnitOfWork]
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var outgoingEventRecords = await dbContext
.OutgoingEventRecords
.AsNoTracking()
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.ToListAsync();
return outgoingEventRecords
.Select(x => x.ToOutgoingEventInfo())
.ToList();
}
[UnitOfWork]
public virtual async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var outgoingEvent = await dbContext.OutgoingEventRecords.FindAsync(id);
if (outgoingEvent != null)
{
dbContext.Remove(outgoingEvent);
}
}
} }
} }

2
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EfCoreOutboxConfigExtensions.cs

@ -7,7 +7,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
public static void UseDbContext<TDbContext>(this OutboxConfig outboxConfig) public static void UseDbContext<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox where TDbContext : IHasEventOutbox
{ {
outboxConfig.ImplementationType = typeof(DbContextEventOutbox<TDbContext>); outboxConfig.ImplementationType = typeof(IDbContextEventOutbox<TDbContext>);
} }
} }
} }

10
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IDbContextEventOutbox.cs

@ -0,0 +1,10 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IDbContextEventOutbox<TDbContext> : IEventOutbox
where TDbContext : IHasEventOutbox
{
}
}

36
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs

@ -2,20 +2,24 @@
using Volo.Abp.Auditing; using Volo.Abp.Auditing;
using Volo.Abp.Data; using Volo.Abp.Data;
using Volo.Abp.Domain.Entities; using Volo.Abp.Domain.Entities;
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{ {
public class OutgoingEventRecord : BasicAggregateRoot<Guid>, IHasExtraProperties, IHasCreationTime public class OutgoingEventRecord :
BasicAggregateRoot<Guid>,
IHasExtraProperties,
IHasCreationTime
{ {
public static int MaxEventNameLength { get; set; } = 256; public static int MaxEventNameLength { get; set; } = 256;
public ExtraPropertyDictionary ExtraProperties { get; protected set; } public ExtraPropertyDictionary ExtraProperties { get; private set; }
public string EventName { get; set; } public string EventName { get; private set; }
public byte[] EventData { get; set; } public byte[] EventData { get; private set; }
public DateTime CreationTime { get; set; } public DateTime CreationTime { get; private set; }
protected OutgoingEventRecord() protected OutgoingEventRecord()
{ {
@ -23,14 +27,26 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
this.SetDefaultsForExtraProperties(); this.SetDefaultsForExtraProperties();
} }
public OutgoingEventRecord(Guid id, string eventName, byte[] eventData) public OutgoingEventRecord(
: base(id) OutgoingEventInfo eventInfo)
: base(eventInfo.Id)
{ {
EventName = eventName; EventName = eventInfo.EventName;
EventData = eventData; EventData = eventInfo.EventData;
CreationTime = eventInfo.CreationTime;
ExtraProperties = new ExtraPropertyDictionary(); ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties(); this.SetDefaultsForExtraProperties();
} }
public OutgoingEventInfo ToOutgoingEventInfo()
{
return new OutgoingEventInfo(
Id,
EventName,
EventData,
CreationTime
);
}
} }
} }

1
framework/src/Volo.Abp.EventBus.Boxes/Volo.Abp.EventBus.Boxes.csproj

@ -15,6 +15,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Volo.Abp.BackgroundWorkers\Volo.Abp.BackgroundWorkers.csproj" />
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" /> <ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" />
</ItemGroup> </ItemGroup>

18
framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpDistributedEventBusExtensions.cs

@ -0,0 +1,18 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EventBus.Boxes
{
public static class AbpDistributedEventBusExtensions
{
public static IRawEventPublisher AsRawEventPublisher(this IDistributedEventBus eventBus)
{
var rawPublisher = eventBus as IRawEventPublisher;
if (rawPublisher == null)
{
throw new AbpException($"Given type ({eventBus.GetType().AssemblyQualifiedName}) should implement {nameof(IRawEventPublisher)}!");
}
return rawPublisher;
}
}
}

11
framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesModule.cs

@ -1,12 +1,17 @@
using Volo.Abp.Modularity; using Volo.Abp.BackgroundWorkers;
using Volo.Abp.Modularity;
namespace Volo.Abp.EventBus.Boxes namespace Volo.Abp.EventBus.Boxes
{ {
[DependsOn( [DependsOn(
typeof(AbpEventBusModule) typeof(AbpEventBusModule),
typeof(AbpBackgroundWorkersModule)
)] )]
public class AbpEventBusBoxesModule : AbpModule public class AbpEventBusBoxesModule : AbpModule
{ {
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
context.AddBackgroundWorker<OutboxSenderManager>();
}
} }
} }

11
framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/IOutboxSender.cs

@ -0,0 +1,11 @@
using System.Threading.Tasks;
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EventBus.Boxes
{
public interface IOutboxSender
{
Task StartAsync(OutboxConfig outboxConfig);
Task StopAsync();
}
}

77
framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs

@ -1,7 +1,78 @@
namespace Volo.Abp.EventBus.Boxes using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Threading;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Boxes
{ {
public class OutboxSender //TODO: use distributed lock!
public class OutboxSender : IOutboxSender, ITransientDependency
{ {
//Background worker & distributed lock protected IServiceProvider ServiceProvider { get; }
protected AbpTimer Timer { get; }
protected IDistributedEventBus DistributedEventBus { get; }
protected IEventOutbox Outbox { get; private set; }
public ILogger<OutboxSender> Logger { get; set; }
public OutboxSender(
IServiceProvider serviceProvider,
AbpTimer timer,
IDistributedEventBus distributedEventBus)
{
ServiceProvider = serviceProvider;
Timer = timer;
DistributedEventBus = distributedEventBus;
Timer.Period = 2000; //TODO: Config?
Timer.Elapsed += TimerOnElapsed;
Logger = NullLogger<OutboxSender>.Instance;
}
public virtual Task StartAsync(OutboxConfig outboxConfig)
{
Outbox = (IEventOutbox)ServiceProvider.GetRequiredService(outboxConfig.ImplementationType);
Timer.Start();
return Task.CompletedTask;
}
public virtual Task StopAsync()
{
Timer.Stop();
return Task.CompletedTask;
}
private void TimerOnElapsed(object sender, EventArgs e)
{
AsyncHelper.RunSync(RunAsync);
}
protected virtual async Task RunAsync()
{
while (true)
{
var waitingEvents = await Outbox.GetWaitingEventsAsync(100);
if (waitingEvents.Count <= 0)
{
break;
}
Logger.LogInformation($"Found {waitingEvents.Count} events in the outbox.");
foreach (var waitingEvent in waitingEvents)
{
await DistributedEventBus
.AsRawEventPublisher()
.PublishRawAsync(waitingEvent.Id, waitingEvent.EventName, waitingEvent.EventData);
await Outbox.DeleteAsync(waitingEvent.Id);
Logger.LogInformation($"Sent the event to the message broker with id = {waitingEvent.Id:N}");
}
}
}
} }
} }

48
framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSenderManager.cs

@ -0,0 +1,48 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EventBus.Boxes
{
public class OutboxSenderManager : IBackgroundWorker
{
protected AbpDistributedEventBusOptions Options { get; }
protected IServiceProvider ServiceProvider { get; }
protected List<IOutboxSender> Senders { get; }
public OutboxSenderManager(
IOptions<AbpDistributedEventBusOptions> options,
IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;
Options = options.Value;
Senders = new List<IOutboxSender>();
}
public async Task StartAsync(CancellationToken cancellationToken = default)
{
foreach (var outboxConfig in Options.Outboxes.Values)
{
if (outboxConfig.IsSendingEnabled)
{
var sender = ServiceProvider.GetRequiredService<IOutboxSender>();
await sender.StartAsync(outboxConfig);
Senders.Add(sender);
}
}
}
public async Task StopAsync(CancellationToken cancellationToken = default)
{
foreach (var sender in Senders)
{
await sender.StopAsync();
}
}
}
}

36
framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs

@ -9,9 +9,11 @@ using Microsoft.Extensions.Options;
using Volo.Abp.Data; using Volo.Abp.Data;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Guids;
using Volo.Abp.Kafka; using Volo.Abp.Kafka;
using Volo.Abp.MultiTenancy; using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading; using Volo.Abp.Threading;
using Volo.Abp.Timing;
using Volo.Abp.Uow; using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Kafka namespace Volo.Abp.EventBus.Kafka
@ -40,13 +42,17 @@ namespace Volo.Abp.EventBus.Kafka
IKafkaSerializer serializer, IKafkaSerializer serializer,
IProducerPool producerPool, IProducerPool producerPool,
IEventErrorHandler errorHandler, IEventErrorHandler errorHandler,
IOptions<AbpEventBusOptions> abpEventBusOptions) IOptions<AbpEventBusOptions> abpEventBusOptions,
IGuidGenerator guidGenerator,
IClock clock)
: base( : base(
serviceScopeFactory, serviceScopeFactory,
currentTenant, currentTenant,
unitOfWorkManager, unitOfWorkManager,
errorHandler, errorHandler,
abpDistributedEventBusOptions) abpDistributedEventBusOptions,
guidGenerator,
clock)
{ {
AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value; AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value;
AbpEventBusOptions = abpEventBusOptions.Value; AbpEventBusOptions = abpEventBusOptions.Value;
@ -182,7 +188,24 @@ namespace Volo.Abp.EventBus.Kafka
{ {
unitOfWork.AddOrReplaceDistributedEvent(eventRecord); unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
} }
public override Task PublishRawAsync(
Guid eventId,
string eventName,
byte[] eventData)
{
return PublishAsync(
AbpKafkaEventBusOptions.TopicName,
eventName,
eventData,
new Headers
{
{ "messageId", Serializer.Serialize(eventId) }
},
null
);
}
protected override byte[] Serialize(object eventData) protected override byte[] Serialize(object eventData)
{ {
return Serializer.Serialize(eventData); return Serializer.Serialize(eventData);
@ -204,11 +227,16 @@ namespace Volo.Abp.EventBus.Kafka
await PublishAsync(DeadLetterTopicName, eventType, eventData, headers, headersArguments); await PublishAsync(DeadLetterTopicName, eventType, eventData, headers, headersArguments);
} }
private async Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers, Dictionary<string, object> headersArguments) private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers, Dictionary<string, object> headersArguments)
{ {
var eventName = EventNameAttribute.GetNameOrDefault(eventType); var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData); var body = Serializer.Serialize(eventData);
return PublishAsync(topicName, eventName, body, headers, headersArguments);
}
private async Task PublishAsync(string topicName, string eventName, byte[] body, Headers headers, Dictionary<string, object> headersArguments)
{
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
SetEventMessageHeaders(headers, headersArguments); SetEventMessageHeaders(headers, headersArguments);

29
framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs

@ -10,9 +10,11 @@ using RabbitMQ.Client.Events;
using Volo.Abp.Data; using Volo.Abp.Data;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Guids;
using Volo.Abp.MultiTenancy; using Volo.Abp.MultiTenancy;
using Volo.Abp.RabbitMQ; using Volo.Abp.RabbitMQ;
using Volo.Abp.Threading; using Volo.Abp.Threading;
using Volo.Abp.Timing;
using Volo.Abp.Uow; using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.RabbitMq namespace Volo.Abp.EventBus.RabbitMq
@ -46,13 +48,17 @@ namespace Volo.Abp.EventBus.RabbitMq
ICurrentTenant currentTenant, ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager, IUnitOfWorkManager unitOfWorkManager,
IEventErrorHandler errorHandler, IEventErrorHandler errorHandler,
IOptions<AbpEventBusOptions> abpEventBusOptions) IOptions<AbpEventBusOptions> abpEventBusOptions,
IGuidGenerator guidGenerator,
IClock clock)
: base( : base(
serviceScopeFactory, serviceScopeFactory,
currentTenant, currentTenant,
unitOfWorkManager, unitOfWorkManager,
errorHandler, errorHandler,
distributedEventBusOptions) distributedEventBusOptions,
guidGenerator,
clock)
{ {
ConnectionPool = connectionPool; ConnectionPool = connectionPool;
Serializer = serializer; Serializer = serializer;
@ -198,7 +204,12 @@ namespace Volo.Abp.EventBus.RabbitMq
{ {
unitOfWork.AddOrReplaceDistributedEvent(eventRecord); unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
} }
public override Task PublishRawAsync(Guid eventId, string eventName, byte[] eventData)
{
return PublishAsync(eventName, eventData, null, eventId: eventId);
}
protected override byte[] Serialize(object eventData) protected override byte[] Serialize(object eventData)
{ {
return Serializer.Serialize(eventData); return Serializer.Serialize(eventData);
@ -209,6 +220,16 @@ namespace Volo.Abp.EventBus.RabbitMq
var eventName = EventNameAttribute.GetNameOrDefault(eventType); var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData); var body = Serializer.Serialize(eventData);
return PublishAsync(eventName, body, properties, headersArguments);
}
protected Task PublishAsync(
string eventName,
byte[] body,
IBasicProperties properties,
Dictionary<string, object> headersArguments = null,
Guid? eventId = null)
{
using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel()) using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
{ {
channel.ExchangeDeclare( channel.ExchangeDeclare(
@ -221,7 +242,7 @@ namespace Volo.Abp.EventBus.RabbitMq
{ {
properties = channel.CreateBasicProperties(); properties = channel.CreateBasicProperties();
properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
properties.MessageId = Guid.NewGuid().ToString("N"); properties.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N");
} }
SetEventMessageHeaders(properties, headersArguments); SetEventMessageHeaders(properties, headersArguments);

16
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs

@ -8,8 +8,10 @@ using Microsoft.Extensions.Options;
using Rebus.Bus; using Rebus.Bus;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Guids;
using Volo.Abp.MultiTenancy; using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading; using Volo.Abp.Threading;
using Volo.Abp.Timing;
using Volo.Abp.Uow; using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Rebus namespace Volo.Abp.EventBus.Rebus
@ -34,13 +36,17 @@ namespace Volo.Abp.EventBus.Rebus
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions, IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IOptions<AbpRebusEventBusOptions> abpEventBusRebusOptions, IOptions<AbpRebusEventBusOptions> abpEventBusRebusOptions,
IEventErrorHandler errorHandler, IEventErrorHandler errorHandler,
IRebusSerializer serializer) : IRebusSerializer serializer,
IGuidGenerator guidGenerator,
IClock clock) :
base( base(
serviceScopeFactory, serviceScopeFactory,
currentTenant, currentTenant,
unitOfWorkManager, unitOfWorkManager,
errorHandler, errorHandler,
abpDistributedEventBusOptions) abpDistributedEventBusOptions,
guidGenerator,
clock)
{ {
Rebus = rebus; Rebus = rebus;
Serializer = serializer; Serializer = serializer;
@ -182,6 +188,12 @@ namespace Volo.Abp.EventBus.Rebus
return false; return false;
} }
public override Task PublishRawAsync(Guid eventId, string eventName, byte[] eventData)
{
/* TODO: IMPLEMENT! */
throw new NotImplementedException();
}
protected override byte[] Serialize(object eventData) protected override byte[] Serialize(object eventData)
{ {
return Serializer.Serialize(eventData); return Serializer.Serialize(eventData);

1
framework/src/Volo.Abp.EventBus/Volo.Abp.EventBus.csproj

@ -16,6 +16,7 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Volo.Abp.EventBus.Abstractions\Volo.Abp.EventBus.Abstractions.csproj" /> <ProjectReference Include="..\Volo.Abp.EventBus.Abstractions\Volo.Abp.EventBus.Abstractions.csproj" />
<ProjectReference Include="..\Volo.Abp.Guids\Volo.Abp.Guids.csproj" />
<ProjectReference Include="..\Volo.Abp.Json\Volo.Abp.Json.csproj" /> <ProjectReference Include="..\Volo.Abp.Json\Volo.Abp.Json.csproj" />
<ProjectReference Include="..\Volo.Abp.MultiTenancy\Volo.Abp.MultiTenancy.csproj" /> <ProjectReference Include="..\Volo.Abp.MultiTenancy\Volo.Abp.MultiTenancy.csproj" />
</ItemGroup> </ItemGroup>

5
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusModule.cs

@ -4,6 +4,7 @@ using System.Collections.Generic;
using Volo.Abp.EventBus.Abstractions; using Volo.Abp.EventBus.Abstractions;
using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Local; using Volo.Abp.EventBus.Local;
using Volo.Abp.Guids;
using Volo.Abp.Json; using Volo.Abp.Json;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
using Volo.Abp.MultiTenancy; using Volo.Abp.MultiTenancy;
@ -14,7 +15,9 @@ namespace Volo.Abp.EventBus
[DependsOn( [DependsOn(
typeof(AbpEventBusAbstractionsModule), typeof(AbpEventBusAbstractionsModule),
typeof(AbpMultiTenancyModule), typeof(AbpMultiTenancyModule),
typeof(AbpJsonModule))] typeof(AbpJsonModule),
typeof(AbpGuidsModule)
)]
public class AbpEventBusModule : AbpModule public class AbpEventBusModule : AbpModule
{ {
public override void PreConfigureServices(ServiceConfigurationContext context) public override void PreConfigureServices(ServiceConfigurationContext context)

25
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs

@ -1,15 +1,18 @@
using System; using System;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Volo.Abp.Guids;
using Volo.Abp.MultiTenancy; using Volo.Abp.MultiTenancy;
using Volo.Abp.Timing;
using Volo.Abp.Uow; using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Distributed namespace Volo.Abp.EventBus.Distributed
{ {
public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventBus public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventBus, IRawEventPublisher
{ {
protected IGuidGenerator GuidGenerator { get; }
protected IClock Clock { get; }
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
protected DistributedEventBusBase( protected DistributedEventBusBase(
@ -17,13 +20,17 @@ namespace Volo.Abp.EventBus.Distributed
ICurrentTenant currentTenant, ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager, IUnitOfWorkManager unitOfWorkManager,
IEventErrorHandler errorHandler, IEventErrorHandler errorHandler,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IGuidGenerator guidGenerator,
IClock clock
) : base( ) : base(
serviceScopeFactory, serviceScopeFactory,
currentTenant, currentTenant,
unitOfWorkManager, unitOfWorkManager,
errorHandler) errorHandler)
{ {
GuidGenerator = guidGenerator;
Clock = clock;
AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value; AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value;
} }
@ -71,7 +78,9 @@ namespace Volo.Abp.EventBus.Distributed
await PublishToEventBusAsync(eventType, eventData); await PublishToEventBusAsync(eventType, eventData);
} }
public abstract Task PublishRawAsync(Guid eventId, string eventName, byte[] eventData);
private async Task<bool> AddToOutboxAsync(Type eventType, object eventData) private async Task<bool> AddToOutboxAsync(Type eventType, object eventData)
{ {
var unitOfWork = UnitOfWorkManager.Current; var unitOfWork = UnitOfWorkManager.Current;
@ -87,8 +96,12 @@ namespace Volo.Abp.EventBus.Distributed
var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType); var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType);
var eventName = EventNameAttribute.GetNameOrDefault(eventType); var eventName = EventNameAttribute.GetNameOrDefault(eventType);
await eventOutbox.EnqueueAsync( await eventOutbox.EnqueueAsync(
eventName, new OutgoingEventInfo(
Serialize(eventData) GuidGenerator.Create(),
eventName,
Serialize(eventData),
Clock.Now
)
); );
return true; return true;
} }

8
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs

@ -1,9 +1,15 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Volo.Abp.EventBus.Distributed namespace Volo.Abp.EventBus.Distributed
{ {
public interface IEventOutbox public interface IEventOutbox
{ {
Task EnqueueAsync(string eventName, byte[] eventData); Task EnqueueAsync(OutgoingEventInfo outgoingEvent);
Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount);
Task DeleteAsync(Guid id);
} }
} }

13
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IRawEventPublisher.cs

@ -0,0 +1,13 @@
using System;
using System.Threading.Tasks;
namespace Volo.Abp.EventBus.Distributed
{
public interface IRawEventPublisher
{
Task PublishRawAsync(
Guid eventId,
string eventName,
byte[] eventData);
}
}

39
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs

@ -0,0 +1,39 @@
using System;
using Volo.Abp.Data;
namespace Volo.Abp.EventBus.Distributed
{
public class OutgoingEventInfo : IHasExtraProperties
{
public static int MaxEventNameLength { get; set; } = 256;
public ExtraPropertyDictionary ExtraProperties { get; protected set; }
public Guid Id { get; }
public string EventName { get; }
public byte[] EventData { get; }
public DateTime CreationTime { get; }
protected OutgoingEventInfo()
{
ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
}
public OutgoingEventInfo(
Guid id,
string eventName,
byte[] eventData,
DateTime creationTime)
{
Id = id;
EventName = eventName;
EventData = eventData;
CreationTime = creationTime;
ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
}
}}

1
test/DistEvents/DistDemoApp/DistDemoApp.csproj

@ -17,6 +17,7 @@
<ProjectReference Include="..\..\..\framework\src\Volo.Abp.EntityFrameworkCore.SqlServer\Volo.Abp.EntityFrameworkCore.SqlServer.csproj" /> <ProjectReference Include="..\..\..\framework\src\Volo.Abp.EntityFrameworkCore.SqlServer\Volo.Abp.EntityFrameworkCore.SqlServer.csproj" />
<ProjectReference Include="..\..\..\framework\src\Volo.Abp.Autofac\Volo.Abp.Autofac.csproj" /> <ProjectReference Include="..\..\..\framework\src\Volo.Abp.Autofac\Volo.Abp.Autofac.csproj" />
<ProjectReference Include="..\..\..\framework\src\Volo.Abp.EventBus.RabbitMQ\Volo.Abp.EventBus.RabbitMQ.csproj" /> <ProjectReference Include="..\..\..\framework\src\Volo.Abp.EventBus.RabbitMQ\Volo.Abp.EventBus.RabbitMQ.csproj" />
<ProjectReference Include="..\..\..\framework\src\Volo.Abp.EventBus.Boxes\Volo.Abp.EventBus.Boxes.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

4
test/DistEvents/DistDemoApp/DistDemoAppModule.cs

@ -4,6 +4,7 @@ using Volo.Abp.Domain.Entities.Events.Distributed;
using Volo.Abp.EntityFrameworkCore; using Volo.Abp.EntityFrameworkCore;
using Volo.Abp.EntityFrameworkCore.DistributedEvents; using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.EntityFrameworkCore.SqlServer; using Volo.Abp.EntityFrameworkCore.SqlServer;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.RabbitMq; using Volo.Abp.EventBus.RabbitMq;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
@ -13,7 +14,8 @@ namespace DistDemoApp
[DependsOn( [DependsOn(
typeof(AbpEntityFrameworkCoreSqlServerModule), typeof(AbpEntityFrameworkCoreSqlServerModule),
typeof(AbpAutofacModule), typeof(AbpAutofacModule),
typeof(AbpEventBusRabbitMqModule) typeof(AbpEventBusRabbitMqModule),
typeof(AbpEventBusBoxesModule)
)] )]
public class DistDemoAppModule : AbpModule public class DistDemoAppModule : AbpModule
{ {

Loading…
Cancel
Save