From afcd640e40adfe8bb0233d9686607f04ab8ccb05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Halil=20=C4=B0brahim=20Kalkan?= Date: Wed, 8 Sep 2021 14:02:05 +0300 Subject: [PATCH] Basic implementation is done for the event outbox. --- .../AbpEntityFrameworkCoreModule.cs | 2 +- .../DistributedEvents/DbContextEventOutbox.cs | 48 +++++++++--- .../EfCoreOutboxConfigExtensions.cs | 2 +- .../IDbContextEventOutbox.cs | 10 +++ .../DistributedEvents/OutgoingEventRecord.cs | 36 ++++++--- .../Volo.Abp.EventBus.Boxes.csproj | 1 + .../Boxes/AbpDistributedEventBusExtensions.cs | 18 +++++ .../EventBus/Boxes/AbpEventBusBoxesModule.cs | 11 ++- .../Volo/Abp/EventBus/Boxes/IOutboxSender.cs | 11 +++ .../Volo/Abp/EventBus/Boxes/OutboxSender.cs | 77 ++++++++++++++++++- .../Abp/EventBus/Boxes/OutboxSenderManager.cs | 48 ++++++++++++ .../Kafka/KafkaDistributedEventBus.cs | 36 ++++++++- .../RabbitMq/RabbitMqDistributedEventBus.cs | 29 ++++++- .../Rebus/RebusDistributedEventBus.cs | 16 +++- .../Volo.Abp.EventBus.csproj | 1 + .../Volo/Abp/EventBus/AbpEventBusModule.cs | 5 +- .../Distributed/DistributedEventBusBase.cs | 25 ++++-- .../Abp/EventBus/Distributed/IEventOutbox.cs | 8 +- .../Distributed/IRawEventPublisher.cs | 13 ++++ .../EventBus/Distributed/OutgoingEventInfo.cs | 39 ++++++++++ .../DistEvents/DistDemoApp/DistDemoApp.csproj | 1 + .../DistDemoApp/DistDemoAppModule.cs | 4 +- 22 files changed, 395 insertions(+), 46 deletions(-) create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IDbContextEventOutbox.cs create mode 100644 framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpDistributedEventBusExtensions.cs create mode 100644 framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/IOutboxSender.cs create mode 100644 framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSenderManager.cs create mode 100644 framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IRawEventPublisher.cs create mode 100644 framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs index 1378f955e4..79db928711 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs @@ -27,7 +27,7 @@ namespace Volo.Abp.EntityFrameworkCore }); context.Services.TryAddTransient(typeof(IDbContextProvider<>), typeof(UnitOfWorkDbContextProvider<>)); - context.Services.AddTransient(typeof(DbContextEventOutbox<>)); + context.Services.AddTransient(typeof(IDbContextEventOutbox<>), typeof(DbContextEventOutbox<>)); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs index 1831b67ccb..b8d26fa357 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs @@ -1,29 +1,59 @@ -using System.Threading.Tasks; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; using Volo.Abp.EventBus.Distributed; -using Volo.Abp.Guids; +using Volo.Abp.Uow; namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { - public class DbContextEventOutbox : IEventOutbox + public class DbContextEventOutbox : IDbContextEventOutbox where TDbContext : IHasEventOutbox { protected IDbContextProvider DbContextProvider { get; } - protected IGuidGenerator GuidGenerator { get; } public DbContextEventOutbox( - IDbContextProvider dbContextProvider, - IGuidGenerator guidGenerator) + IDbContextProvider dbContextProvider) { DbContextProvider = dbContextProvider; - GuidGenerator = guidGenerator; } - public async Task EnqueueAsync(string eventName, byte[] eventData) + [UnitOfWork] + public virtual async Task EnqueueAsync(OutgoingEventInfo outgoingEvent) { var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); dbContext.OutgoingEventRecords.Add( - new OutgoingEventRecord(GuidGenerator.Create(), eventName, eventData) + new OutgoingEventRecord(outgoingEvent) ); } + + [UnitOfWork] + public virtual async Task> GetWaitingEventsAsync(int maxCount) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + + var outgoingEventRecords = await dbContext + .OutgoingEventRecords + .AsNoTracking() + .OrderBy(x => x.CreationTime) + .Take(maxCount) + .ToListAsync(); + + return outgoingEventRecords + .Select(x => x.ToOutgoingEventInfo()) + .ToList(); + } + + [UnitOfWork] + public virtual async Task DeleteAsync(Guid id) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + var outgoingEvent = await dbContext.OutgoingEventRecords.FindAsync(id); + if (outgoingEvent != null) + { + dbContext.Remove(outgoingEvent); + } + } } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EfCoreOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EfCoreOutboxConfigExtensions.cs index 8199ff0739..53745477c7 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EfCoreOutboxConfigExtensions.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/EfCoreOutboxConfigExtensions.cs @@ -7,7 +7,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public static void UseDbContext(this OutboxConfig outboxConfig) where TDbContext : IHasEventOutbox { - outboxConfig.ImplementationType = typeof(DbContextEventOutbox); + outboxConfig.ImplementationType = typeof(IDbContextEventOutbox); } } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IDbContextEventOutbox.cs new file mode 100644 index 0000000000..71d65c7f23 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/IDbContextEventOutbox.cs @@ -0,0 +1,10 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IDbContextEventOutbox : IEventOutbox + where TDbContext : IHasEventOutbox + { + + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs index d33e6a0097..727216de48 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/OutgoingEventRecord.cs @@ -2,20 +2,24 @@ using Volo.Abp.Auditing; using Volo.Abp.Data; using Volo.Abp.Domain.Entities; +using Volo.Abp.EventBus.Distributed; namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { - public class OutgoingEventRecord : BasicAggregateRoot, IHasExtraProperties, IHasCreationTime + public class OutgoingEventRecord : + BasicAggregateRoot, + IHasExtraProperties, + IHasCreationTime { public static int MaxEventNameLength { get; set; } = 256; - public ExtraPropertyDictionary ExtraProperties { get; protected set; } + public ExtraPropertyDictionary ExtraProperties { get; private set; } - public string EventName { get; set; } + public string EventName { get; private set; } - public byte[] EventData { get; set; } + public byte[] EventData { get; private set; } - public DateTime CreationTime { get; set; } + public DateTime CreationTime { get; private set; } protected OutgoingEventRecord() { @@ -23,14 +27,26 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents this.SetDefaultsForExtraProperties(); } - public OutgoingEventRecord(Guid id, string eventName, byte[] eventData) - : base(id) + public OutgoingEventRecord( + OutgoingEventInfo eventInfo) + : base(eventInfo.Id) { - EventName = eventName; - EventData = eventData; - + EventName = eventInfo.EventName; + EventData = eventInfo.EventData; + CreationTime = eventInfo.CreationTime; + ExtraProperties = new ExtraPropertyDictionary(); this.SetDefaultsForExtraProperties(); } + + public OutgoingEventInfo ToOutgoingEventInfo() + { + return new OutgoingEventInfo( + Id, + EventName, + EventData, + CreationTime + ); + } } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo.Abp.EventBus.Boxes.csproj b/framework/src/Volo.Abp.EventBus.Boxes/Volo.Abp.EventBus.Boxes.csproj index 3cae05a96a..19b7f7032e 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo.Abp.EventBus.Boxes.csproj +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo.Abp.EventBus.Boxes.csproj @@ -15,6 +15,7 @@ + diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpDistributedEventBusExtensions.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpDistributedEventBusExtensions.cs new file mode 100644 index 0000000000..dda6f71245 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpDistributedEventBusExtensions.cs @@ -0,0 +1,18 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EventBus.Boxes +{ + public static class AbpDistributedEventBusExtensions + { + public static IRawEventPublisher AsRawEventPublisher(this IDistributedEventBus eventBus) + { + var rawPublisher = eventBus as IRawEventPublisher; + if (rawPublisher == null) + { + throw new AbpException($"Given type ({eventBus.GetType().AssemblyQualifiedName}) should implement {nameof(IRawEventPublisher)}!"); + } + + return rawPublisher; + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesModule.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesModule.cs index 904bab747a..9445f1de08 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesModule.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesModule.cs @@ -1,12 +1,17 @@ -using Volo.Abp.Modularity; +using Volo.Abp.BackgroundWorkers; +using Volo.Abp.Modularity; namespace Volo.Abp.EventBus.Boxes { [DependsOn( - typeof(AbpEventBusModule) + typeof(AbpEventBusModule), + typeof(AbpBackgroundWorkersModule) )] public class AbpEventBusBoxesModule : AbpModule { - + public override void OnApplicationInitialization(ApplicationInitializationContext context) + { + context.AddBackgroundWorker(); + } } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/IOutboxSender.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/IOutboxSender.cs new file mode 100644 index 0000000000..efc278844a --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/IOutboxSender.cs @@ -0,0 +1,11 @@ +using System.Threading.Tasks; +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EventBus.Boxes +{ + public interface IOutboxSender + { + Task StartAsync(OutboxConfig outboxConfig); + Task StopAsync(); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs index 48bad9a6cf..8c7669e447 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs @@ -1,7 +1,78 @@ -namespace Volo.Abp.EventBus.Boxes +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Volo.Abp.DependencyInjection; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Threading; +using Volo.Abp.Uow; + +namespace Volo.Abp.EventBus.Boxes { - public class OutboxSender + //TODO: use distributed lock! + public class OutboxSender : IOutboxSender, ITransientDependency { - //Background worker & distributed lock + protected IServiceProvider ServiceProvider { get; } + protected AbpTimer Timer { get; } + protected IDistributedEventBus DistributedEventBus { get; } + protected IEventOutbox Outbox { get; private set; } + public ILogger Logger { get; set; } + + public OutboxSender( + IServiceProvider serviceProvider, + AbpTimer timer, + IDistributedEventBus distributedEventBus) + { + ServiceProvider = serviceProvider; + Timer = timer; + DistributedEventBus = distributedEventBus; + Timer.Period = 2000; //TODO: Config? + Timer.Elapsed += TimerOnElapsed; + Logger = NullLogger.Instance; + } + + public virtual Task StartAsync(OutboxConfig outboxConfig) + { + Outbox = (IEventOutbox)ServiceProvider.GetRequiredService(outboxConfig.ImplementationType); + Timer.Start(); + return Task.CompletedTask; + } + + public virtual Task StopAsync() + { + Timer.Stop(); + return Task.CompletedTask; + } + + private void TimerOnElapsed(object sender, EventArgs e) + { + AsyncHelper.RunSync(RunAsync); + } + + protected virtual async Task RunAsync() + { + while (true) + { + var waitingEvents = await Outbox.GetWaitingEventsAsync(100); + if (waitingEvents.Count <= 0) + { + break; + } + + Logger.LogInformation($"Found {waitingEvents.Count} events in the outbox."); + + foreach (var waitingEvent in waitingEvents) + { + await DistributedEventBus + .AsRawEventPublisher() + .PublishRawAsync(waitingEvent.Id, waitingEvent.EventName, waitingEvent.EventData); + + await Outbox.DeleteAsync(waitingEvent.Id); + + Logger.LogInformation($"Sent the event to the message broker with id = {waitingEvent.Id:N}"); + } + } + } } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSenderManager.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSenderManager.cs new file mode 100644 index 0000000000..617913e42f --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSenderManager.cs @@ -0,0 +1,48 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Volo.Abp.BackgroundWorkers; +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EventBus.Boxes +{ + public class OutboxSenderManager : IBackgroundWorker + { + protected AbpDistributedEventBusOptions Options { get; } + protected IServiceProvider ServiceProvider { get; } + protected List Senders { get; } + + public OutboxSenderManager( + IOptions options, + IServiceProvider serviceProvider) + { + ServiceProvider = serviceProvider; + Options = options.Value; + Senders = new List(); + } + + public async Task StartAsync(CancellationToken cancellationToken = default) + { + foreach (var outboxConfig in Options.Outboxes.Values) + { + if (outboxConfig.IsSendingEnabled) + { + var sender = ServiceProvider.GetRequiredService(); + await sender.StartAsync(outboxConfig); + Senders.Add(sender); + } + } + } + + public async Task StopAsync(CancellationToken cancellationToken = default) + { + foreach (var sender in Senders) + { + await sender.StopAsync(); + } + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs index 18f68923e1..9e62b47fed 100644 --- a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -9,9 +9,11 @@ using Microsoft.Extensions.Options; using Volo.Abp.Data; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Guids; using Volo.Abp.Kafka; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; +using Volo.Abp.Timing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Kafka @@ -40,13 +42,17 @@ namespace Volo.Abp.EventBus.Kafka IKafkaSerializer serializer, IProducerPool producerPool, IEventErrorHandler errorHandler, - IOptions abpEventBusOptions) + IOptions abpEventBusOptions, + IGuidGenerator guidGenerator, + IClock clock) : base( serviceScopeFactory, currentTenant, unitOfWorkManager, errorHandler, - abpDistributedEventBusOptions) + abpDistributedEventBusOptions, + guidGenerator, + clock) { AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value; AbpEventBusOptions = abpEventBusOptions.Value; @@ -182,7 +188,24 @@ namespace Volo.Abp.EventBus.Kafka { unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } - + + public override Task PublishRawAsync( + Guid eventId, + string eventName, + byte[] eventData) + { + return PublishAsync( + AbpKafkaEventBusOptions.TopicName, + eventName, + eventData, + new Headers + { + { "messageId", Serializer.Serialize(eventId) } + }, + null + ); + } + protected override byte[] Serialize(object eventData) { return Serializer.Serialize(eventData); @@ -204,11 +227,16 @@ namespace Volo.Abp.EventBus.Kafka await PublishAsync(DeadLetterTopicName, eventType, eventData, headers, headersArguments); } - private async Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers, Dictionary headersArguments) + private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers, Dictionary headersArguments) { var eventName = EventNameAttribute.GetNameOrDefault(eventType); var body = Serializer.Serialize(eventData); + return PublishAsync(topicName, eventName, body, headers, headersArguments); + } + + private async Task PublishAsync(string topicName, string eventName, byte[] body, Headers headers, Dictionary headersArguments) + { var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); SetEventMessageHeaders(headers, headersArguments); diff --git a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs index d2766eb247..f51cd6821d 100644 --- a/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs @@ -10,9 +10,11 @@ using RabbitMQ.Client.Events; using Volo.Abp.Data; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.RabbitMQ; using Volo.Abp.Threading; +using Volo.Abp.Timing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.RabbitMq @@ -46,13 +48,17 @@ namespace Volo.Abp.EventBus.RabbitMq ICurrentTenant currentTenant, IUnitOfWorkManager unitOfWorkManager, IEventErrorHandler errorHandler, - IOptions abpEventBusOptions) + IOptions abpEventBusOptions, + IGuidGenerator guidGenerator, + IClock clock) : base( serviceScopeFactory, currentTenant, unitOfWorkManager, errorHandler, - distributedEventBusOptions) + distributedEventBusOptions, + guidGenerator, + clock) { ConnectionPool = connectionPool; Serializer = serializer; @@ -198,7 +204,12 @@ namespace Volo.Abp.EventBus.RabbitMq { unitOfWork.AddOrReplaceDistributedEvent(eventRecord); } - + + public override Task PublishRawAsync(Guid eventId, string eventName, byte[] eventData) + { + return PublishAsync(eventName, eventData, null, eventId: eventId); + } + protected override byte[] Serialize(object eventData) { return Serializer.Serialize(eventData); @@ -209,6 +220,16 @@ namespace Volo.Abp.EventBus.RabbitMq var eventName = EventNameAttribute.GetNameOrDefault(eventType); var body = Serializer.Serialize(eventData); + return PublishAsync(eventName, body, properties, headersArguments); + } + + protected Task PublishAsync( + string eventName, + byte[] body, + IBasicProperties properties, + Dictionary headersArguments = null, + Guid? eventId = null) + { using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel()) { channel.ExchangeDeclare( @@ -221,7 +242,7 @@ namespace Volo.Abp.EventBus.RabbitMq { properties = channel.CreateBasicProperties(); properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; - properties.MessageId = Guid.NewGuid().ToString("N"); + properties.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N"); } SetEventMessageHeaders(properties, headersArguments); diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index 43017d9c4a..07995d67dd 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -8,8 +8,10 @@ using Microsoft.Extensions.Options; using Rebus.Bus; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.Threading; +using Volo.Abp.Timing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Rebus @@ -34,13 +36,17 @@ namespace Volo.Abp.EventBus.Rebus IOptions abpDistributedEventBusOptions, IOptions abpEventBusRebusOptions, IEventErrorHandler errorHandler, - IRebusSerializer serializer) : + IRebusSerializer serializer, + IGuidGenerator guidGenerator, + IClock clock) : base( serviceScopeFactory, currentTenant, unitOfWorkManager, errorHandler, - abpDistributedEventBusOptions) + abpDistributedEventBusOptions, + guidGenerator, + clock) { Rebus = rebus; Serializer = serializer; @@ -182,6 +188,12 @@ namespace Volo.Abp.EventBus.Rebus return false; } + public override Task PublishRawAsync(Guid eventId, string eventName, byte[] eventData) + { + /* TODO: IMPLEMENT! */ + throw new NotImplementedException(); + } + protected override byte[] Serialize(object eventData) { return Serializer.Serialize(eventData); diff --git a/framework/src/Volo.Abp.EventBus/Volo.Abp.EventBus.csproj b/framework/src/Volo.Abp.EventBus/Volo.Abp.EventBus.csproj index 261b10d8ae..5de5b0893f 100644 --- a/framework/src/Volo.Abp.EventBus/Volo.Abp.EventBus.csproj +++ b/framework/src/Volo.Abp.EventBus/Volo.Abp.EventBus.csproj @@ -16,6 +16,7 @@ + diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusModule.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusModule.cs index 329c26c93f..bd86f14703 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusModule.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/AbpEventBusModule.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using Volo.Abp.EventBus.Abstractions; using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.Local; +using Volo.Abp.Guids; using Volo.Abp.Json; using Volo.Abp.Modularity; using Volo.Abp.MultiTenancy; @@ -14,7 +15,9 @@ namespace Volo.Abp.EventBus [DependsOn( typeof(AbpEventBusAbstractionsModule), typeof(AbpMultiTenancyModule), - typeof(AbpJsonModule))] + typeof(AbpJsonModule), + typeof(AbpGuidsModule) + )] public class AbpEventBusModule : AbpModule { public override void PreConfigureServices(ServiceConfigurationContext context) diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs index 3d68490ae2..786f58fe47 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/DistributedEventBusBase.cs @@ -1,15 +1,18 @@ using System; -using System.Linq; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; +using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; +using Volo.Abp.Timing; using Volo.Abp.Uow; namespace Volo.Abp.EventBus.Distributed { - public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventBus + public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventBus, IRawEventPublisher { + protected IGuidGenerator GuidGenerator { get; } + protected IClock Clock { get; } protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } protected DistributedEventBusBase( @@ -17,13 +20,17 @@ namespace Volo.Abp.EventBus.Distributed ICurrentTenant currentTenant, IUnitOfWorkManager unitOfWorkManager, IEventErrorHandler errorHandler, - IOptions abpDistributedEventBusOptions + IOptions abpDistributedEventBusOptions, + IGuidGenerator guidGenerator, + IClock clock ) : base( serviceScopeFactory, currentTenant, unitOfWorkManager, errorHandler) { + GuidGenerator = guidGenerator; + Clock = clock; AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value; } @@ -71,7 +78,9 @@ namespace Volo.Abp.EventBus.Distributed await PublishToEventBusAsync(eventType, eventData); } - + + public abstract Task PublishRawAsync(Guid eventId, string eventName, byte[] eventData); + private async Task AddToOutboxAsync(Type eventType, object eventData) { var unitOfWork = UnitOfWorkManager.Current; @@ -87,8 +96,12 @@ namespace Volo.Abp.EventBus.Distributed var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType); var eventName = EventNameAttribute.GetNameOrDefault(eventType); await eventOutbox.EnqueueAsync( - eventName, - Serialize(eventData) + new OutgoingEventInfo( + GuidGenerator.Create(), + eventName, + Serialize(eventData), + Clock.Now + ) ); return true; } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs index ec6d010307..0b50993e8e 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs @@ -1,9 +1,15 @@ +using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace Volo.Abp.EventBus.Distributed { public interface IEventOutbox { - Task EnqueueAsync(string eventName, byte[] eventData); + Task EnqueueAsync(OutgoingEventInfo outgoingEvent); + + Task> GetWaitingEventsAsync(int maxCount); + + Task DeleteAsync(Guid id); } } \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IRawEventPublisher.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IRawEventPublisher.cs new file mode 100644 index 0000000000..f53eb2b78b --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IRawEventPublisher.cs @@ -0,0 +1,13 @@ +using System; +using System.Threading.Tasks; + +namespace Volo.Abp.EventBus.Distributed +{ + public interface IRawEventPublisher + { + Task PublishRawAsync( + Guid eventId, + string eventName, + byte[] eventData); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs new file mode 100644 index 0000000000..91cf806ba9 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/OutgoingEventInfo.cs @@ -0,0 +1,39 @@ +using System; +using Volo.Abp.Data; + +namespace Volo.Abp.EventBus.Distributed +{ + public class OutgoingEventInfo : IHasExtraProperties + { + public static int MaxEventNameLength { get; set; } = 256; + + public ExtraPropertyDictionary ExtraProperties { get; protected set; } + + public Guid Id { get; } + + public string EventName { get; } + + public byte[] EventData { get; } + + public DateTime CreationTime { get; } + + protected OutgoingEventInfo() + { + ExtraProperties = new ExtraPropertyDictionary(); + this.SetDefaultsForExtraProperties(); + } + + public OutgoingEventInfo( + Guid id, + string eventName, + byte[] eventData, + DateTime creationTime) + { + Id = id; + EventName = eventName; + EventData = eventData; + CreationTime = creationTime; + ExtraProperties = new ExtraPropertyDictionary(); + this.SetDefaultsForExtraProperties(); + } + }} \ No newline at end of file diff --git a/test/DistEvents/DistDemoApp/DistDemoApp.csproj b/test/DistEvents/DistDemoApp/DistDemoApp.csproj index 0513a520d1..adfe26ebea 100644 --- a/test/DistEvents/DistDemoApp/DistDemoApp.csproj +++ b/test/DistEvents/DistDemoApp/DistDemoApp.csproj @@ -17,6 +17,7 @@ + diff --git a/test/DistEvents/DistDemoApp/DistDemoAppModule.cs b/test/DistEvents/DistDemoApp/DistDemoAppModule.cs index 93880032d9..42e82f5e46 100644 --- a/test/DistEvents/DistDemoApp/DistDemoAppModule.cs +++ b/test/DistEvents/DistDemoApp/DistDemoAppModule.cs @@ -4,6 +4,7 @@ using Volo.Abp.Domain.Entities.Events.Distributed; using Volo.Abp.EntityFrameworkCore; using Volo.Abp.EntityFrameworkCore.DistributedEvents; using Volo.Abp.EntityFrameworkCore.SqlServer; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.RabbitMq; using Volo.Abp.Modularity; @@ -13,7 +14,8 @@ namespace DistDemoApp [DependsOn( typeof(AbpEntityFrameworkCoreSqlServerModule), typeof(AbpAutofacModule), - typeof(AbpEventBusRabbitMqModule) + typeof(AbpEventBusRabbitMqModule), + typeof(AbpEventBusBoxesModule) )] public class DistDemoAppModule : AbpModule {