From 0cf5d248fdf36b29cdd74bf62446a1f5e60368f0 Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Tue, 28 Sep 2021 15:59:22 +0800 Subject: [PATCH 1/7] Complete the Outbox & Inbox Patterns feature --- .../DistributedEvents/DbContextEventInbox.cs | 19 +++--- .../DistributedEvents/DbContextEventOutbox.cs | 15 ++--- .../EventBus/Boxes/AbpEventBusBoxesOptions.cs | 43 +++++++++++++ .../Volo/Abp/EventBus/Boxes/InboxProcessor.cs | 16 +++-- .../Volo/Abp/EventBus/Boxes/OutboxSender.cs | 16 +++-- ...bitMqSerializer.cs => IRebusSerializer.cs} | 0 .../Rebus/RebusDistributedEventBus.cs | 37 +++++++++-- .../RebusDistributedEventHandlerAdapter.cs | 2 +- .../AbpDistributedEventBusOptions.cs | 13 +++- .../Abp/EventBus/Distributed/IEventInbox.cs | 11 ++-- .../Abp/EventBus/Distributed/IEventOutbox.cs | 9 +-- .../MongoDbContextEventInbox.cs | 60 +++++++++++++----- .../MongoDbContextEventOutbox.cs | 61 ++++++++++++++++--- .../appsettings.json | 2 +- .../DistDemoApp.MongoDbKafka.csproj | 12 ++++ .../DistDemoAppMongoDbKafkaModule.cs | 38 ++++++++++++ .../DistDemoApp.MongoDbKafka/Program.cs | 53 ++++++++++++++-- .../TodoMongoDbContext.cs | 26 ++++++++ .../DistDemoApp.MongoDbKafka/appsettings.json | 19 ++++++ .../DistDemoApp.MongoDbRebus.csproj | 21 +++++++ .../DistDemoAppMongoDbRebusModule.cs | 53 ++++++++++++++++ .../DistDemoApp.MongoDbRebus/Program.cs | 57 +++++++++++++++++ .../TodoMongoDbContext.cs | 26 ++++++++ .../DistDemoApp.MongoDbRebus/appsettings.json | 19 ++++++ test/DistEvents/DistEventsDemo.sln | 6 ++ 25 files changed, 563 insertions(+), 71 deletions(-) create mode 100644 framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs rename framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/{IRabbitMqSerializer.cs => IRebusSerializer.cs} (100%) create mode 100644 test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoAppMongoDbKafkaModule.cs create mode 100644 test/DistEvents/DistDemoApp.MongoDbKafka/TodoMongoDbContext.cs create mode 100644 test/DistEvents/DistDemoApp.MongoDbKafka/appsettings.json create mode 100644 test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoApp.MongoDbRebus.csproj create mode 100644 test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoAppMongoDbRebusModule.cs create mode 100644 test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs create mode 100644 test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs create mode 100644 test/DistEvents/DistDemoApp.MongoDbRebus/appsettings.json diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index 2b0a448f74..86fc78cab3 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -1,26 +1,31 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Timing; using Volo.Abp.Uow; namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { - public class DbContextEventInbox : IDbContextEventInbox + public class DbContextEventInbox : IDbContextEventInbox where TDbContext : IHasEventInbox { protected IDbContextProvider DbContextProvider { get; } + protected AbpDistributedEventBusOptions DistributedEventsOptions { get; } protected IClock Clock { get; } public DbContextEventInbox( IDbContextProvider dbContextProvider, - IClock clock) + IClock clock, + IOptions distributedEventsOptions) { DbContextProvider = dbContextProvider; Clock = clock; + DistributedEventsOptions = distributedEventsOptions.Value; } [UnitOfWork] @@ -34,7 +39,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents } [UnitOfWork] - public virtual async Task> GetWaitingEventsAsync(int maxCount) + public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) { var dbContext = await DbContextProvider.GetDbContextAsync(); @@ -44,8 +49,8 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents .Where(x => !x.Processed) .OrderBy(x => x.CreationTime) .Take(maxCount) - .ToListAsync(); - + .ToListAsync(cancellationToken: cancellationToken); + return outgoingEventRecords .Select(x => x.ToIncomingEventInfo()) .ToList(); @@ -76,11 +81,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { //TODO: Optimize var dbContext = await DbContextProvider.GetDbContextAsync(); - var timeToKeepEvents = Clock.Now.AddHours(-2); //TODO: Config? + var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); var oldEvents = await dbContext.IncomingEvents .Where(x => x.Processed && x.CreationTime < timeToKeepEvents) .ToListAsync(); dbContext.IncomingEvents.RemoveRange(oldEvents); } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs index a785ad7ce5..435bc70398 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Volo.Abp.EventBus.Distributed; @@ -8,7 +9,7 @@ using Volo.Abp.Uow; namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { - public class DbContextEventOutbox : IDbContextEventOutbox + public class DbContextEventOutbox : IDbContextEventOutbox where TDbContext : IHasEventOutbox { protected IDbContextProvider DbContextProvider { get; } @@ -18,7 +19,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { DbContextProvider = dbContextProvider; } - + [UnitOfWork] public virtual async Task EnqueueAsync(OutgoingEventInfo outgoingEvent) { @@ -29,17 +30,17 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents } [UnitOfWork] - public virtual async Task> GetWaitingEventsAsync(int maxCount) + public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) { var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); - + var outgoingEventRecords = await dbContext .OutgoingEvents .AsNoTracking() .OrderBy(x => x.CreationTime) .Take(maxCount) - .ToListAsync(); - + .ToListAsync(cancellationToken: cancellationToken); + return outgoingEventRecords .Select(x => x.ToOutgoingEventInfo()) .ToList(); @@ -57,4 +58,4 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents } } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs new file mode 100644 index 0000000000..da91d672d3 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs @@ -0,0 +1,43 @@ +using System; + +namespace Volo.Abp.EventBus.Boxes +{ + public class AbpEventBusBoxesOptions + { + /// + /// Default: 6 hours + /// + public TimeSpan CleanOldEventTimeIntervalSpan { get; set; } + + /// + /// Default: 1000 + /// + public int InboxWaitingEventMaxCount { get; set; } + + /// + /// Default: 1000 + /// + public int OutboxWaitingEventMaxCount { get; set; } + + /// + /// Period time of and + /// Default: 2 seconds + /// + public TimeSpan PeriodTimeSpan { get; set; } + + /// + /// Delay time of and + /// Default: 15 seconds + /// + public TimeSpan DelayTimeSpan { get; set; } + + public AbpEventBusBoxesOptions() + { + CleanOldEventTimeIntervalSpan = TimeSpan.FromHours(6); + InboxWaitingEventMaxCount = 1000; + OutboxWaitingEventMaxCount = 1000; + PeriodTimeSpan = TimeSpan.FromSeconds(2); + DelayTimeSpan = TimeSpan.FromSeconds(15); + } + } +} diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs index e711e5798e..29cca3d624 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs @@ -5,6 +5,7 @@ using Medallion.Threading; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Threading; @@ -23,6 +24,7 @@ namespace Volo.Abp.EventBus.Boxes protected IClock Clock { get; } protected IEventInbox Inbox { get; private set; } protected InboxConfig InboxConfig { get; private set; } + protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } protected DateTime? LastCleanTime { get; set; } @@ -37,7 +39,8 @@ namespace Volo.Abp.EventBus.Boxes IDistributedEventBus distributedEventBus, IDistributedLockProvider distributedLockProvider, IUnitOfWorkManager unitOfWorkManager, - IClock clock) + IClock clock, + IOptions eventBusBoxesOptions) { ServiceProvider = serviceProvider; Timer = timer; @@ -45,7 +48,8 @@ namespace Volo.Abp.EventBus.Boxes DistributedLockProvider = distributedLockProvider; UnitOfWorkManager = unitOfWorkManager; Clock = clock; - Timer.Period = 2000; //TODO: Config? + EventBusBoxesOptions = eventBusBoxesOptions.Value; + Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Seconds; Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; StoppingTokenSource = new CancellationTokenSource(); @@ -79,7 +83,7 @@ namespace Volo.Abp.EventBus.Boxes { return; } - + await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken)) { if (handle != null) @@ -88,7 +92,7 @@ namespace Volo.Abp.EventBus.Boxes while (true) { - var waitingEvents = await Inbox.GetWaitingEventsAsync(1000); //TODO: Config? Pass StoppingToken! + var waitingEvents = await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, StoppingToken); if (waitingEvents.Count <= 0) { break; @@ -116,14 +120,14 @@ namespace Volo.Abp.EventBus.Boxes else { Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); - await TaskDelayHelper.DelayAsync(15000, StoppingToken); //TODO: Config? + await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DelayTimeSpan.Milliseconds, StoppingToken); } } } protected virtual async Task DeleteOldEventsAsync() { - if (LastCleanTime != null && LastCleanTime > Clock.Now.AddHours(6)) //TODO: Config? + if (LastCleanTime != null && LastCleanTime > Clock.Now.Add(EventBusBoxesOptions.CleanOldEventTimeIntervalSpan)) { return; } diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs index 1be2374268..e8a05c60ba 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs @@ -5,6 +5,7 @@ using Medallion.Threading; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Threading; @@ -19,9 +20,10 @@ namespace Volo.Abp.EventBus.Boxes protected IDistributedLockProvider DistributedLockProvider { get; } protected IEventOutbox Outbox { get; private set; } protected OutboxConfig OutboxConfig { get; private set; } + protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } protected string DistributedLockName => "Outbox_" + OutboxConfig.Name; public ILogger Logger { get; set; } - + protected CancellationTokenSource StoppingTokenSource { get; } protected CancellationToken StoppingToken { get; } @@ -29,13 +31,15 @@ namespace Volo.Abp.EventBus.Boxes IServiceProvider serviceProvider, AbpAsyncTimer timer, IDistributedEventBus distributedEventBus, - IDistributedLockProvider distributedLockProvider) + IDistributedLockProvider distributedLockProvider, + IOptions eventBusBoxesOptions) { ServiceProvider = serviceProvider; Timer = timer; DistributedEventBus = distributedEventBus; DistributedLockProvider = distributedLockProvider; - Timer.Period = 2000; //TODO: Config? + EventBusBoxesOptions = eventBusBoxesOptions.Value; + Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Seconds; Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; StoppingTokenSource = new CancellationTokenSource(); @@ -65,13 +69,13 @@ namespace Volo.Abp.EventBus.Boxes protected virtual async Task RunAsync() { - await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName)) + await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken)) { if (handle != null) { while (true) { - var waitingEvents = await Outbox.GetWaitingEventsAsync(1000); //TODO: Config? + var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, StoppingToken); if (waitingEvents.Count <= 0) { break; @@ -96,7 +100,7 @@ namespace Volo.Abp.EventBus.Boxes else { Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); - await TaskDelayHelper.DelayAsync(15000, StoppingToken); //TODO: Config? + await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DelayTimeSpan.Milliseconds, StoppingToken); } } } diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/IRabbitMqSerializer.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/IRebusSerializer.cs similarity index 100% rename from framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/IRabbitMqSerializer.cs rename to framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/IRebusSerializer.cs diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index 8dac885036..1cb380679f 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Rebus.Bus; +using Rebus.Pipeline; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Guids; @@ -134,6 +135,19 @@ namespace Volo.Abp.EventBus.Rebus Rebus.Unsubscribe(eventType); } + public async Task ProcessEventAsync(Type eventType, object eventData) + { + var messageId = MessageContext.Current.TransportMessage.GetMessageId(); + var eventName = EventNameAttribute.GetNameOrDefault(eventType); + + if (await AddToInboxAsync(messageId, eventName, eventType, MessageContext.Current.TransportMessage.Body)) + { + return; + } + + await TriggerHandlersAsync(eventType, eventData); + } + protected override async Task PublishToEventBusAsync(Type eventType, object eventData) { await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData); @@ -192,16 +206,29 @@ namespace Volo.Abp.EventBus.Rebus OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - /* TODO: IMPLEMENT! */ - throw new NotImplementedException(); + var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); + var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); + + return PublishToEventBusAsync(eventType, eventData); } - public override Task ProcessFromInboxAsync( + public override async Task ProcessFromInboxAsync( IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - /* TODO: IMPLEMENT! */ - throw new NotImplementedException(); + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + if (eventType == null) + { + return; + } + + var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + var exceptions = new List(); + await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig); + if (exceptions.Any()) + { + ThrowOriginalExceptions(eventType, exceptions); + } } protected override byte[] Serialize(object eventData) diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs index 005226b885..61330828b4 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs @@ -14,7 +14,7 @@ namespace Volo.Abp.EventBus.Rebus public async Task Handle(TEventData message) { - await RebusDistributedEventBus.TriggerHandlersAsync(typeof(TEventData), message); + await RebusDistributedEventBus.ProcessEventAsync(message.GetType(), message); } } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs index 896dfd4f7d..5b71542665 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs @@ -1,3 +1,4 @@ +using System; using Volo.Abp.Collections; namespace Volo.Abp.EventBus.Distributed @@ -5,16 +6,22 @@ namespace Volo.Abp.EventBus.Distributed public class AbpDistributedEventBusOptions { public ITypeList Handlers { get; } - + public OutboxConfigDictionary Outboxes { get; } - + public InboxConfigDictionary Inboxes { get; } + /// + /// Default: -2 hours + /// + public TimeSpan InboxKeepEventTimeSpan { get; set; } + public AbpDistributedEventBusOptions() { Handlers = new TypeList(); Outboxes = new OutboxConfigDictionary(); Inboxes = new InboxConfigDictionary(); + InboxKeepEventTimeSpan = TimeSpan.FromHours(-2); } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventInbox.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventInbox.cs index bee802a126..d072f5d922 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventInbox.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventInbox.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Volo.Abp.EventBus.Distributed @@ -7,13 +8,13 @@ namespace Volo.Abp.EventBus.Distributed public interface IEventInbox { Task EnqueueAsync(IncomingEventInfo incomingEvent); - - Task> GetWaitingEventsAsync(int maxCount); - + + Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default); + Task MarkAsProcessedAsync(Guid id); - + Task ExistsByMessageIdAsync(string messageId); Task DeleteOldEventsAsync(); } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs index 0b50993e8e..cc00ee6654 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Volo.Abp.EventBus.Distributed @@ -7,9 +8,9 @@ namespace Volo.Abp.EventBus.Distributed public interface IEventOutbox { Task EnqueueAsync(OutgoingEventInfo outgoingEvent); - - Task> GetWaitingEventsAsync(int maxCount); - + + Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default); + Task DeleteAsync(Guid id); } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs index d98e144ada..886a7744d1 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs @@ -1,7 +1,9 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Options; using MongoDB.Driver; using MongoDB.Driver.Linq; using Volo.Abp.EventBus.Distributed; @@ -10,21 +12,24 @@ using Volo.Abp.Uow; namespace Volo.Abp.MongoDB.DistributedEvents { - public class MongoDbContextEventInbox : IMongoDbContextEventInbox + public class MongoDbContextEventInbox : IMongoDbContextEventInbox where TMongoDbContext : IHasEventInbox { protected IMongoDbContextProvider DbContextProvider { get; } + protected AbpDistributedEventBusOptions DistributedEventsOptions { get; } protected IClock Clock { get; } - + public MongoDbContextEventInbox( IMongoDbContextProvider dbContextProvider, - IClock clock) + IClock clock, + IOptions distributedEventsOptions) { DbContextProvider = dbContextProvider; Clock = clock; + DistributedEventsOptions = distributedEventsOptions.Value; } - + [UnitOfWork] public virtual async Task EnqueueAsync(IncomingEventInfo incomingEvent) { @@ -45,9 +50,9 @@ namespace Volo.Abp.MongoDB.DistributedEvents } [UnitOfWork] - public virtual async Task> GetWaitingEventsAsync(int maxCount) + public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) { - var dbContext = await DbContextProvider.GetDbContextAsync(); + var dbContext = await DbContextProvider.GetDbContextAsync(cancellationToken); var outgoingEventRecords = await dbContext .IncomingEvents @@ -55,8 +60,8 @@ namespace Volo.Abp.MongoDB.DistributedEvents .Where(x => !x.Processed) .OrderBy(x => x.CreationTime) .Take(maxCount) - .ToListAsync(); - + .ToListAsync(cancellationToken: cancellationToken); + return outgoingEventRecords .Select(x => x.ToIncomingEventInfo()) .ToList(); @@ -65,17 +70,44 @@ namespace Volo.Abp.MongoDB.DistributedEvents [UnitOfWork] public async Task MarkAsProcessedAsync(Guid id) { - throw new NotImplementedException(); + var dbContext = await DbContextProvider.GetDbContextAsync(); + var incomingEvent = await dbContext.IncomingEvents.Find(x => x.Id.Equals(id)).FirstOrDefaultAsync(); + if (incomingEvent != null) + { + incomingEvent.MarkAsProcessed(Clock.Now); + + if (dbContext.SessionHandle != null) + { + await dbContext.IncomingEvents.ReplaceOneAsync(dbContext.SessionHandle, Builders.Filter.Eq(e => e.Id, incomingEvent.Id), incomingEvent); + } + else + { + await dbContext.IncomingEvents.ReplaceOneAsync(Builders.Filter.Eq(e => e.Id, incomingEvent.Id), incomingEvent); + } + } } - public Task ExistsByMessageIdAsync(string messageId) + [UnitOfWork] + public async Task ExistsByMessageIdAsync(string messageId) { - throw new NotImplementedException(); + var dbContext = await DbContextProvider.GetDbContextAsync(); + return await dbContext.IncomingEvents.AsQueryable().AnyAsync(x => x.MessageId == messageId); } - public Task DeleteOldEventsAsync() + [UnitOfWork] + public async Task DeleteOldEventsAsync() { - throw new NotImplementedException(); + var dbContext = await DbContextProvider.GetDbContextAsync(); + var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + + if (dbContext.SessionHandle != null) + { + await dbContext.IncomingEvents.DeleteManyAsync(dbContext.SessionHandle, x => x.Processed && x.CreationTime < timeToKeepEvents); + } + else + { + await dbContext.IncomingEvents.DeleteManyAsync(x => x.Processed && x.CreationTime < timeToKeepEvents); + } } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs index 1bec9e0834..df8aa108ee 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs @@ -1,27 +1,72 @@ using System; using System.Collections.Generic; +using System.Linq; +using System.Threading; using System.Threading.Tasks; +using MongoDB.Driver; +using MongoDB.Driver.Linq; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Uow; namespace Volo.Abp.MongoDB.DistributedEvents { - public class MongoDbContextEventOutbox : IMongoDbContextEventOutbox + public class MongoDbContextEventOutbox : IMongoDbContextEventOutbox where TMongoDbContext : IHasEventOutbox { - public Task EnqueueAsync(OutgoingEventInfo outgoingEvent) + protected IMongoDbContextProvider MongoDbContextProvider { get; } + + public MongoDbContextEventOutbox(IMongoDbContextProvider mongoDbContextProvider) + { + MongoDbContextProvider = mongoDbContextProvider; + } + + [UnitOfWork] + public async Task EnqueueAsync(OutgoingEventInfo outgoingEvent) { - throw new NotImplementedException(); + var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync(); + if (dbContext.SessionHandle != null) + { + await dbContext.OutgoingEvents.InsertOneAsync( + dbContext.SessionHandle, + new OutgoingEventRecord(outgoingEvent) + ); + } + else + { + await dbContext.OutgoingEvents.InsertOneAsync( + new OutgoingEventRecord(outgoingEvent) + ); + } } - public Task> GetWaitingEventsAsync(int maxCount) + [UnitOfWork] + public async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) { - throw new NotImplementedException(); + var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync(cancellationToken); + + var outgoingEventRecords = await dbContext + .OutgoingEvents.AsQueryable() + .OrderBy(x => x.CreationTime) + .Take(maxCount) + .ToListAsync(cancellationToken: cancellationToken); + + return outgoingEventRecords + .Select(x => x.ToOutgoingEventInfo()) + .ToList(); } - public Task DeleteAsync(Guid id) + [UnitOfWork] + public async Task DeleteAsync(Guid id) { - throw new NotImplementedException(); + var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync(); + if (dbContext.SessionHandle != null) + { + await dbContext.OutgoingEvents.DeleteOneAsync(dbContext.SessionHandle, x => x.Id.Equals(id)); + } + else + { + await dbContext.OutgoingEvents.DeleteOneAsync(x => x.Id.Equals(id)); + } } } -} \ No newline at end of file +} diff --git a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/appsettings.json b/test/DistEvents/DistDemoApp.EfCoreRabbitMq/appsettings.json index 1ed30c80a9..5359c021b7 100644 --- a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/appsettings.json +++ b/test/DistEvents/DistDemoApp.EfCoreRabbitMq/appsettings.json @@ -16,4 +16,4 @@ "Redis": { "Configuration": "127.0.0.1" } -} \ No newline at end of file +} diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoApp.MongoDbKafka.csproj b/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoApp.MongoDbKafka.csproj index cfd73d51bb..a503c6f6c0 100644 --- a/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoApp.MongoDbKafka.csproj +++ b/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoApp.MongoDbKafka.csproj @@ -6,4 +6,16 @@ DistDemoApp + + + + + + + + + Always + + + diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoAppMongoDbKafkaModule.cs b/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoAppMongoDbKafkaModule.cs new file mode 100644 index 0000000000..b2e41b6ca7 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoAppMongoDbKafkaModule.cs @@ -0,0 +1,38 @@ +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.EventBus.Kafka; +using Volo.Abp.Modularity; +using Volo.Abp.MongoDB; +using Volo.Abp.MongoDB.DistributedEvents; + +namespace DistDemoApp +{ + [DependsOn( + typeof(AbpMongoDbModule), + typeof(AbpEventBusKafkaModule), + typeof(DistDemoAppSharedModule) + )] + public class DistDemoAppMongoDbKafkaModule : AbpModule + { + public override void ConfigureServices(ServiceConfigurationContext context) + { + context.Services.AddMongoDbContext(options => + { + options.AddDefaultRepositories(); + }); + + Configure(options => + { + options.Outboxes.Configure(config => + { + config.UseMongoDbContext(); + }); + + options.Inboxes.Configure(config => + { + config.UseMongoDbContext(); + }); + }); + } + } +} diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/Program.cs b/test/DistEvents/DistDemoApp.MongoDbKafka/Program.cs index d0e4cdf4ca..b048c17389 100644 --- a/test/DistEvents/DistDemoApp.MongoDbKafka/Program.cs +++ b/test/DistEvents/DistDemoApp.MongoDbKafka/Program.cs @@ -1,12 +1,57 @@ using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Serilog; +using Serilog.Events; namespace DistDemoApp { - class Program + public class Program { - static void Main(string[] args) + public static async Task Main(string[] args) { - Console.WriteLine("Hello World!"); + Log.Logger = new LoggerConfiguration() +#if DEBUG + .MinimumLevel.Debug() +#else + .MinimumLevel.Information() +#endif + .MinimumLevel.Override("Microsoft", LogEventLevel.Warning) + .Enrich.FromLogContext() + .WriteTo.Async(c => c.File("Logs/logs.txt")) + .WriteTo.Async(c => c.Console()) + .CreateLogger(); + + try + { + Log.Information("Starting console host."); + await CreateHostBuilder(args).RunConsoleAsync(); + return 0; + } + catch (Exception ex) + { + Log.Fatal(ex, "Host terminated unexpectedly!"); + return 1; + } + finally + { + Log.CloseAndFlush(); + } + } + + internal static IHostBuilder CreateHostBuilder(string[] args) => + Host.CreateDefaultBuilder(args) + .UseAutofac() + .UseSerilog() + .ConfigureAppConfiguration((context, config) => + { + //setup your additional configuration sources + }) + .ConfigureServices((hostContext, services) => + { + services.AddApplication(); + }); } -} \ No newline at end of file +} diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/TodoMongoDbContext.cs b/test/DistEvents/DistDemoApp.MongoDbKafka/TodoMongoDbContext.cs new file mode 100644 index 0000000000..a7f1b78f86 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbKafka/TodoMongoDbContext.cs @@ -0,0 +1,26 @@ +using MongoDB.Driver; +using Volo.Abp.Data; +using Volo.Abp.MongoDB; +using Volo.Abp.MongoDB.DistributedEvents; + +namespace DistDemoApp +{ + [ConnectionStringName("Default")] + public class TodoMongoDbContext : AbpMongoDbContext, IHasEventOutbox, IHasEventInbox + { + public IMongoCollection TodoItems => Collection(); + public IMongoCollection TodoSummaries => Collection(); + + public IMongoCollection OutgoingEvents + { + get => Collection(); + set {} + } + public IMongoCollection IncomingEvents + { + get => Collection(); + set {} + } + } + +} diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/appsettings.json b/test/DistEvents/DistDemoApp.MongoDbKafka/appsettings.json new file mode 100644 index 0000000000..d8c528fe87 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbKafka/appsettings.json @@ -0,0 +1,19 @@ +{ + "ConnectionStrings": { + "Default": "mongodb://localhost:27018,localhost:27019,localhost:27020/DistEventsDemo" + }, + "Kafka": { + "Connections": { + "Default": { + "BootstrapServers": "localhost:9092" + } + }, + "EventBus": { + "GroupId": "DistDemoApp", + "TopicName": "DistDemoTopic" + } + }, + "Redis": { + "Configuration": "127.0.0.1" + } +} diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoApp.MongoDbRebus.csproj b/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoApp.MongoDbRebus.csproj new file mode 100644 index 0000000000..c102adb382 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoApp.MongoDbRebus.csproj @@ -0,0 +1,21 @@ + + + + Exe + net5.0 + DistDemoApp + + + + + + + + + + + Always + + + + diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoAppMongoDbRebusModule.cs b/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoAppMongoDbRebusModule.cs new file mode 100644 index 0000000000..21dab7b9b5 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoAppMongoDbRebusModule.cs @@ -0,0 +1,53 @@ +using Microsoft.Extensions.DependencyInjection; +using Rebus.Persistence.InMem; +using Rebus.Transport.InMem; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.EventBus.Rebus; +using Volo.Abp.Modularity; +using Volo.Abp.MongoDB; +using Volo.Abp.MongoDB.DistributedEvents; + +namespace DistDemoApp +{ + [DependsOn( + typeof(AbpMongoDbModule), + typeof(AbpEventBusRebusModule), + typeof(DistDemoAppSharedModule) + )] + public class DistDemoAppMongoDbRebusModule : AbpModule + { + public override void PreConfigureServices(ServiceConfigurationContext context) + { + PreConfigure(options => + { + options.InputQueueName = "eventbus"; + options.Configurer = rebusConfigurer => + { + rebusConfigurer.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "eventbus")); + rebusConfigurer.Subscriptions(s => s.StoreInMemory()); + }; + }); + } + + public override void ConfigureServices(ServiceConfigurationContext context) + { + context.Services.AddMongoDbContext(options => + { + options.AddDefaultRepositories(); + }); + + Configure(options => + { + options.Outboxes.Configure(config => + { + config.UseMongoDbContext(); + }); + + options.Inboxes.Configure(config => + { + config.UseMongoDbContext(); + }); + }); + } + } +} diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs b/test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs new file mode 100644 index 0000000000..a79ad1b1bf --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs @@ -0,0 +1,57 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Serilog; +using Serilog.Events; + +namespace DistDemoApp +{ + public class Program + { + public static async Task Main(string[] args) + { + Log.Logger = new LoggerConfiguration() +#if DEBUG + .MinimumLevel.Debug() +#else + .MinimumLevel.Information() +#endif + .MinimumLevel.Override("Microsoft", LogEventLevel.Warning) + .Enrich.FromLogContext() + .WriteTo.Async(c => c.File("Logs/logs.txt")) + .WriteTo.Async(c => c.Console()) + .CreateLogger(); + + try + { + Log.Information("Starting console host."); + await CreateHostBuilder(args).RunConsoleAsync(); + return 0; + } + catch (Exception ex) + { + Log.Fatal(ex, "Host terminated unexpectedly!"); + return 1; + } + finally + { + Log.CloseAndFlush(); + } + + } + + internal static IHostBuilder CreateHostBuilder(string[] args) => + Host.CreateDefaultBuilder(args) + .UseAutofac() + .UseSerilog() + .ConfigureAppConfiguration((context, config) => + { + //setup your additional configuration sources + }) + .ConfigureServices((hostContext, services) => + { + services.AddApplication(); + }); + } +} diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs b/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs new file mode 100644 index 0000000000..a7f1b78f86 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs @@ -0,0 +1,26 @@ +using MongoDB.Driver; +using Volo.Abp.Data; +using Volo.Abp.MongoDB; +using Volo.Abp.MongoDB.DistributedEvents; + +namespace DistDemoApp +{ + [ConnectionStringName("Default")] + public class TodoMongoDbContext : AbpMongoDbContext, IHasEventOutbox, IHasEventInbox + { + public IMongoCollection TodoItems => Collection(); + public IMongoCollection TodoSummaries => Collection(); + + public IMongoCollection OutgoingEvents + { + get => Collection(); + set {} + } + public IMongoCollection IncomingEvents + { + get => Collection(); + set {} + } + } + +} diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/appsettings.json b/test/DistEvents/DistDemoApp.MongoDbRebus/appsettings.json new file mode 100644 index 0000000000..d8c528fe87 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbRebus/appsettings.json @@ -0,0 +1,19 @@ +{ + "ConnectionStrings": { + "Default": "mongodb://localhost:27018,localhost:27019,localhost:27020/DistEventsDemo" + }, + "Kafka": { + "Connections": { + "Default": { + "BootstrapServers": "localhost:9092" + } + }, + "EventBus": { + "GroupId": "DistDemoApp", + "TopicName": "DistDemoTopic" + } + }, + "Redis": { + "Configuration": "127.0.0.1" + } +} diff --git a/test/DistEvents/DistEventsDemo.sln b/test/DistEvents/DistEventsDemo.sln index 4e53ba6367..e6c3348a5f 100644 --- a/test/DistEvents/DistEventsDemo.sln +++ b/test/DistEvents/DistEventsDemo.sln @@ -6,6 +6,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DistDemoApp.MongoDbKafka", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DistDemoApp.Shared", "DistDemoApp.Shared\DistDemoApp.Shared.csproj", "{C515F4E2-0ED3-4561-BC58-FC633B50E2EB}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DistDemoApp.MongoDbRebus", "DistDemoApp.MongoDbRebus\DistDemoApp.MongoDbRebus.csproj", "{4FB63540-4CC5-4A7B-900B-F5FCD907456E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -24,5 +26,9 @@ Global {C515F4E2-0ED3-4561-BC58-FC633B50E2EB}.Debug|Any CPU.Build.0 = Debug|Any CPU {C515F4E2-0ED3-4561-BC58-FC633B50E2EB}.Release|Any CPU.ActiveCfg = Release|Any CPU {C515F4E2-0ED3-4561-BC58-FC633B50E2EB}.Release|Any CPU.Build.0 = Release|Any CPU + {4FB63540-4CC5-4A7B-900B-F5FCD907456E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4FB63540-4CC5-4A7B-900B-F5FCD907456E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4FB63540-4CC5-4A7B-900B-F5FCD907456E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4FB63540-4CC5-4A7B-900B-F5FCD907456E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal From f0f61884b76a1c6d3e44c194912bf8e052a1be15 Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Tue, 28 Sep 2021 23:43:48 +0800 Subject: [PATCH 2/7] Improve performance --- .../DistributedEvents/DbContextEventInbox.cs | 26 ++++++++--------- .../DistributedEvents/DbContextEventOutbox.cs | 10 +++---- .../MongoDbContextEventInbox.cs | 28 +++++++++---------- .../MongoDbContextEventOutbox.cs | 6 ++-- 4 files changed, 31 insertions(+), 39 deletions(-) diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index 86fc78cab3..5bee24d754 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -57,35 +57,31 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents } [UnitOfWork] - public async Task MarkAsProcessedAsync(Guid id) + public virtual async Task MarkAsProcessedAsync(Guid id) { - //TODO: Optimize? var dbContext = await DbContextProvider.GetDbContextAsync(); - var incomingEvent = await dbContext.IncomingEvents.FindAsync(id); - if (incomingEvent != null) - { - incomingEvent.MarkAsProcessed(Clock.Now); - } + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"UPDATE {tableName} SET Processed = 1, ProcessedTime = '{Clock.Now}' WHERE Id = '{id}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); } [UnitOfWork] - public async Task ExistsByMessageIdAsync(string messageId) + public virtual async Task ExistsByMessageIdAsync(string messageId) { - //TODO: Optimize var dbContext = await DbContextProvider.GetDbContextAsync(); return await dbContext.IncomingEvents.AnyAsync(x => x.MessageId == messageId); } [UnitOfWork] - public async Task DeleteOldEventsAsync() + public virtual async Task DeleteOldEventsAsync() { - //TODO: Optimize var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); - var oldEvents = await dbContext.IncomingEvents - .Where(x => x.Processed && x.CreationTime < timeToKeepEvents) - .ToListAsync(); - dbContext.IncomingEvents.RemoveRange(oldEvents); + + var sql = $"DELETE FROM {tableName} WHERE Processed = 1 AND CreationTime < '{timeToKeepEvents}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs index 435bc70398..f0b4d3b98c 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs @@ -49,13 +49,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents [UnitOfWork] public virtual async Task DeleteAsync(Guid id) { - //TODO: Optimize? var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); - var outgoingEvent = await dbContext.OutgoingEvents.FindAsync(id); - if (outgoingEvent != null) - { - dbContext.Remove(outgoingEvent); - } + var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"DELETE FROM {tableName} WHERE Id = '{id}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); } } } diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs index 886a7744d1..62927d2279 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs @@ -68,34 +68,32 @@ namespace Volo.Abp.MongoDB.DistributedEvents } [UnitOfWork] - public async Task MarkAsProcessedAsync(Guid id) + public virtual async Task MarkAsProcessedAsync(Guid id) { var dbContext = await DbContextProvider.GetDbContextAsync(); - var incomingEvent = await dbContext.IncomingEvents.Find(x => x.Id.Equals(id)).FirstOrDefaultAsync(); - if (incomingEvent != null) - { - incomingEvent.MarkAsProcessed(Clock.Now); - if (dbContext.SessionHandle != null) - { - await dbContext.IncomingEvents.ReplaceOneAsync(dbContext.SessionHandle, Builders.Filter.Eq(e => e.Id, incomingEvent.Id), incomingEvent); - } - else - { - await dbContext.IncomingEvents.ReplaceOneAsync(Builders.Filter.Eq(e => e.Id, incomingEvent.Id), incomingEvent); - } + var filter = Builders.Filter.Eq(x => x.Id, id); + var update = Builders.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now); + + if (dbContext.SessionHandle != null) + { + await dbContext.IncomingEvents.UpdateOneAsync(dbContext.SessionHandle, filter, update); + } + else + { + await dbContext.IncomingEvents.UpdateOneAsync(filter, update); } } [UnitOfWork] - public async Task ExistsByMessageIdAsync(string messageId) + public virtual async Task ExistsByMessageIdAsync(string messageId) { var dbContext = await DbContextProvider.GetDbContextAsync(); return await dbContext.IncomingEvents.AsQueryable().AnyAsync(x => x.MessageId == messageId); } [UnitOfWork] - public async Task DeleteOldEventsAsync() + public virtual async Task DeleteOldEventsAsync() { var dbContext = await DbContextProvider.GetDbContextAsync(); var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs index df8aa108ee..dad0294d1c 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs @@ -21,7 +21,7 @@ namespace Volo.Abp.MongoDB.DistributedEvents } [UnitOfWork] - public async Task EnqueueAsync(OutgoingEventInfo outgoingEvent) + public virtual async Task EnqueueAsync(OutgoingEventInfo outgoingEvent) { var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync(); if (dbContext.SessionHandle != null) @@ -40,7 +40,7 @@ namespace Volo.Abp.MongoDB.DistributedEvents } [UnitOfWork] - public async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) + public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) { var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync(cancellationToken); @@ -56,7 +56,7 @@ namespace Volo.Abp.MongoDB.DistributedEvents } [UnitOfWork] - public async Task DeleteAsync(Guid id) + public virtual async Task DeleteAsync(Guid id) { var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync(); if (dbContext.SessionHandle != null) From bc4944c19d3612a6087bbdb8e901fb584f3a9e43 Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Wed, 29 Sep 2021 17:06:59 +0800 Subject: [PATCH 3/7] Add ISqlAdapter --- .../DistributedEvents/PostgreSqlAdapter.cs | 25 +++++++++++++++++ .../AbpEntityFrameworkCorePostgreSqlModule.cs | 9 ++++++- .../AbpEntityFrameworkCoreModule.cs | 5 ++++ .../AbpEfCoreDistributedEventBusOptions.cs | 19 +++++++++++++ .../DistributedEvents/DbContextEventInbox.cs | 20 +++++++++++--- .../DistributedEvents/DbContextEventOutbox.cs | 12 +++++++-- .../DistributedEvents/DefaultSqlAdapter.cs | 27 +++++++++++++++++++ .../DistributedEvents/ISqlAdapter.cs | 13 +++++++++ 8 files changed, 124 insertions(+), 6 deletions(-) create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs new file mode 100644 index 0000000000..0c84ec929e --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs @@ -0,0 +1,25 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class PostgreSqlAdapter : ISqlAdapter + { + public string NormalizeTableName(string tableName) + { + return $"\"{tableName}\""; + } + + public string NormalizeColumnName(string columnName) + { + return $"\"{columnName}\""; + } + + public string NormalizeColumnNameEqualsValue(string columnName, object value) + { + return $"\"{columnName}\" = '{value}'"; + } + + public string NormalizeValue(object value) + { + return $"'{value}'"; + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs index 4e76b89113..7d8c87c63a 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs @@ -1,4 +1,6 @@ -using Volo.Abp.Guids; +using Npgsql; +using Volo.Abp.EntityFrameworkCore.DistributedEvents; +using Volo.Abp.Guids; using Volo.Abp.Modularity; namespace Volo.Abp.EntityFrameworkCore.PostgreSql @@ -17,6 +19,11 @@ namespace Volo.Abp.EntityFrameworkCore.PostgreSql options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsString; } }); + + Configure(options => + { + options.SqlAdapters.TryAdd(nameof(NpgsqlConnection).ToLower(), new PostgreSqlAdapter()); + }); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs index 90b541bc73..349116d6eb 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs @@ -26,6 +26,11 @@ namespace Volo.Abp.EntityFrameworkCore }); }); + Configure(options => + { + options.SqlAdapters.Add(DefaultSqlAdapter.Name, new DefaultSqlAdapter()); + }); + context.Services.TryAddTransient(typeof(IDbContextProvider<>), typeof(UnitOfWorkDbContextProvider<>)); context.Services.AddTransient(typeof(IDbContextEventOutbox<>), typeof(DbContextEventOutbox<>)); context.Services.AddTransient(typeof(IDbContextEventInbox<>), typeof(DbContextEventInbox<>)); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs new file mode 100644 index 0000000000..9497552a09 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs @@ -0,0 +1,19 @@ +using System.Collections.Generic; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class AbpEfCoreDistributedEventBusOptions + { + public Dictionary SqlAdapters { get; set; } + + public ISqlAdapter GetSqlAdapter(string connectionType) + { + return SqlAdapters.TryGetValue(connectionType, out var sqlAdapter) ? sqlAdapter : SqlAdapters[DefaultSqlAdapter.Name]; + } + + public AbpEfCoreDistributedEventBusOptions() + { + SqlAdapters = new Dictionary(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index 5bee24d754..d8f18c6f32 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -16,15 +16,18 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { protected IDbContextProvider DbContextProvider { get; } protected AbpDistributedEventBusOptions DistributedEventsOptions { get; } + protected AbpEfCoreDistributedEventBusOptions EfCoreDistributedEventBusOptions { get; } protected IClock Clock { get; } public DbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) + IOptions distributedEventsOptions, + IOptions efCoreDistributedEventBusOptions) { DbContextProvider = dbContextProvider; Clock = clock; + EfCoreDistributedEventBusOptions = efCoreDistributedEventBusOptions.Value; DistributedEventsOptions = distributedEventsOptions.Value; } @@ -61,8 +64,14 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower(); + var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName); + + var sql = $"UPDATE {sqlAdapter.NormalizeTableName(tableName)} SET " + + $"{sqlAdapter.NormalizeColumnNameEqualsValue("Processed", 1)}, " + + $"{sqlAdapter.NormalizeColumnNameEqualsValue("ProcessedTime", Clock.Now)} WHERE " + + $"{sqlAdapter.NormalizeColumnNameEqualsValue("Id", id)}"; - var sql = $"UPDATE {tableName} SET Processed = 1, ProcessedTime = '{Clock.Now}' WHERE Id = '{id}'"; await dbContext.Database.ExecuteSqlRawAsync(sql); } @@ -79,8 +88,13 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower(); + var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName); + + var sql = $"DELETE FROM {sqlAdapter.NormalizeTableName(tableName)} WHERE " + + $"{sqlAdapter.NormalizeColumnNameEqualsValue("Processed", 1)} AND " + + $"{sqlAdapter.NormalizeColumnName("CreationTime")} < {sqlAdapter.NormalizeValue(timeToKeepEvents)}"; - var sql = $"DELETE FROM {tableName} WHERE Processed = 1 AND CreationTime < '{timeToKeepEvents}'"; await dbContext.Database.ExecuteSqlRawAsync(sql); } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs index f0b4d3b98c..5451a0a30a 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Uow; @@ -13,11 +14,14 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents where TDbContext : IHasEventOutbox { protected IDbContextProvider DbContextProvider { get; } + protected AbpEfCoreDistributedEventBusOptions EfCoreDistributedEventBusOptions { get; } public DbContextEventOutbox( - IDbContextProvider dbContextProvider) + IDbContextProvider dbContextProvider, + IOptions efCoreDistributedEventBusOptions) { DbContextProvider = dbContextProvider; + EfCoreDistributedEventBusOptions = efCoreDistributedEventBusOptions.Value; } [UnitOfWork] @@ -51,8 +55,12 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower(); + var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName); + + var sql = $"DELETE FROM {sqlAdapter.NormalizeTableName(tableName)} WHERE " + + $"{sqlAdapter.NormalizeColumnNameEqualsValue("Id", id)}"; - var sql = $"DELETE FROM {tableName} WHERE Id = '{id}'"; await dbContext.Database.ExecuteSqlRawAsync(sql); } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs new file mode 100644 index 0000000000..8a86af7434 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs @@ -0,0 +1,27 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class DefaultSqlAdapter : ISqlAdapter + { + public const string Name = "default"; + + public string NormalizeTableName(string tableName) + { + return tableName; + } + + public string NormalizeColumnName(string columnName) + { + return columnName; + } + + public string NormalizeColumnNameEqualsValue(string columnName, object value) + { + return $"{columnName} = '{value}'"; + } + + public string NormalizeValue(object value) + { + return $"'{value}'"; + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs new file mode 100644 index 0000000000..9b366f00ab --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs @@ -0,0 +1,13 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface ISqlAdapter + { + string NormalizeTableName(string tableName); + + string NormalizeColumnName(string columnName); + + string NormalizeColumnNameEqualsValue(string columnName, object value); + + string NormalizeValue(object value); + } +} From b809ee50e329d341f64f271c3d0eb05568d56379 Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Thu, 30 Sep 2021 00:01:57 +0800 Subject: [PATCH 4/7] Refactor --- .../MySQLInboxConfigExtensions.cs | 13 +++++ .../MySQLOutboxConfigExtensions.cs | 13 +++++ .../IOracleDbContextEventInbox.cs | 8 ++++ .../IOracleDbContextEventOutbox.cs | 7 +++ .../OracleDbContextEventInbox.cs | 47 +++++++++++++++++++ .../OracleDbContextEventOutbox.cs | 31 ++++++++++++ .../OracleInboxConfigExtensions.cs | 13 +++++ .../OracleOutboxConfigExtensions.cs | 13 +++++ ...bpEntityFrameworkCoreOracleDevartModule.cs | 7 ++- .../IOracleDbContextEventInbox.cs | 8 ++++ .../IOracleDbContextEventOutbox.cs | 7 +++ .../OracleDbContextEventInbox.cs | 47 +++++++++++++++++++ .../OracleDbContextEventOutbox.cs | 31 ++++++++++++ .../OracleInboxConfigExtensions.cs | 13 +++++ .../OracleOutboxConfigExtensions.cs | 13 +++++ .../AbpEntityFrameworkCoreOracleModule.cs | 7 ++- .../IPostgreSqlDbContextEventInbox.cs | 8 ++++ .../IPostgreSqlDbContextEventOutbox.cs | 7 +++ .../DistributedEvents/PostgreSqlAdapter.cs | 25 ---------- .../PostgreSqlDbContextEventInbox.cs | 43 +++++++++++++++++ .../PostgreSqlDbContextEventOutbox.cs | 25 ++++++++++ .../PostgreSqlInboxConfigExtensions.cs | 13 +++++ .../PostgreSqlOutboxConfigExtensions.cs | 13 +++++ .../AbpEntityFrameworkCorePostgreSqlModule.cs | 8 ++-- .../SqlServerInboxConfigExtensions.cs | 13 +++++ .../SqlServerOutboxConfigExtensions.cs | 13 +++++ .../SqliteInboxConfigExtensions.cs | 13 +++++ .../SqliteOutboxConfigExtensions.cs | 13 +++++ .../AbpEntityFrameworkCoreModule.cs | 9 ++-- .../AbpEfCoreDistributedEventBusOptions.cs | 19 -------- .../DistributedEvents/DbContextEventInbox.cs | 33 ++++--------- .../DistributedEvents/DbContextEventOutbox.cs | 19 +++----- .../DistributedEvents/DefaultSqlAdapter.cs | 27 ----------- .../DistributedEvents/ISqlAdapter.cs | 13 ----- .../ISqlRawDbContextEventInbox.cs | 7 +++ .../ISqlRawDbContextEventOutbox.cs | 7 +++ .../SqlRawDbContextEventInbox.cs | 43 +++++++++++++++++ .../SqlRawDbContextEventOutbox.cs | 26 ++++++++++ 38 files changed, 543 insertions(+), 132 deletions(-) create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs delete mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs delete mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs delete mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs delete mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs create mode 100644 framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs new file mode 100644 index 0000000000..ecda92702b --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class MySQLInboxConfigExtensions + { + public static void UseMySQL(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs new file mode 100644 index 0000000000..8657ba92ab --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class MySQLOutboxConfigExtensions + { + public static void UseMySQL(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs new file mode 100644 index 0000000000..0cc7ae5531 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IOracleDbContextEventInbox : IDbContextEventInbox + where TDbContext : IHasEventInbox + { + + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs new file mode 100644 index 0000000000..a588f36e43 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IOracleDbContextEventOutbox : IDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs new file mode 100644 index 0000000000..d5f1b7afcc --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs @@ -0,0 +1,47 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Timing; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class OracleDbContextEventInbox : DbContextEventInbox , IOracleDbContextEventInbox + where TDbContext : IHasEventInbox + { + public OracleDbContextEventInbox( + IDbContextProvider dbContextProvider, + IClock clock, + IOptions distributedEventsOptions) : base(dbContextProvider, clock, distributedEventsOptions) + { + } + + [UnitOfWork] + public override async Task MarkAsProcessedAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + [UnitOfWork] + public override async Task DeleteOldEventsAsync() + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + protected virtual string GuidToOracleType(Guid id) + { + return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs new file mode 100644 index 0000000000..a5c4566ced --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs @@ -0,0 +1,31 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class OracleDbContextEventOutbox : DbContextEventOutbox , IOracleDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + public OracleDbContextEventOutbox(IDbContextProvider dbContextProvider) + : base(dbContextProvider) + { + } + + [UnitOfWork] + public override async Task DeleteAsync(Guid id) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + protected virtual string GuidToOracleType(Guid id) + { + return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs new file mode 100644 index 0000000000..ca79019e28 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class OracleInboxConfigExtensions + { + public static void UseOracle(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs new file mode 100644 index 0000000000..e2d7d33761 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class OracleOutboxConfigExtensions + { + public static void UseOracle(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs index 9580219cfc..76d0ecdd21 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs @@ -1,4 +1,6 @@ -using Volo.Abp.Guids; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.EntityFrameworkCore.DistributedEvents; +using Volo.Abp.Guids; using Volo.Abp.Modularity; namespace Volo.Abp.EntityFrameworkCore.Oracle.Devart @@ -17,6 +19,9 @@ namespace Volo.Abp.EntityFrameworkCore.Oracle.Devart options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsBinary; } }); + + context.Services.AddTransient(typeof(IOracleDbContextEventOutbox<>), typeof(OracleDbContextEventOutbox<>)); + context.Services.AddTransient(typeof(IOracleDbContextEventInbox<>), typeof(OracleDbContextEventInbox<>)); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs new file mode 100644 index 0000000000..0cc7ae5531 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IOracleDbContextEventInbox : IDbContextEventInbox + where TDbContext : IHasEventInbox + { + + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs new file mode 100644 index 0000000000..a588f36e43 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IOracleDbContextEventOutbox : IDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs new file mode 100644 index 0000000000..d5f1b7afcc --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs @@ -0,0 +1,47 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Timing; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class OracleDbContextEventInbox : DbContextEventInbox , IOracleDbContextEventInbox + where TDbContext : IHasEventInbox + { + public OracleDbContextEventInbox( + IDbContextProvider dbContextProvider, + IClock clock, + IOptions distributedEventsOptions) : base(dbContextProvider, clock, distributedEventsOptions) + { + } + + [UnitOfWork] + public override async Task MarkAsProcessedAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + [UnitOfWork] + public override async Task DeleteOldEventsAsync() + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + protected virtual string GuidToOracleType(Guid id) + { + return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs new file mode 100644 index 0000000000..a5c4566ced --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs @@ -0,0 +1,31 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class OracleDbContextEventOutbox : DbContextEventOutbox , IOracleDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + public OracleDbContextEventOutbox(IDbContextProvider dbContextProvider) + : base(dbContextProvider) + { + } + + [UnitOfWork] + public override async Task DeleteAsync(Guid id) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + protected virtual string GuidToOracleType(Guid id) + { + return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs new file mode 100644 index 0000000000..ca79019e28 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class OracleInboxConfigExtensions + { + public static void UseOracle(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs new file mode 100644 index 0000000000..e2d7d33761 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class OracleOutboxConfigExtensions + { + public static void UseOracle(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs index b7cbaec1a1..7716ae6150 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs @@ -1,4 +1,6 @@ -using Volo.Abp.Guids; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.EntityFrameworkCore.DistributedEvents; +using Volo.Abp.Guids; using Volo.Abp.Modularity; namespace Volo.Abp.EntityFrameworkCore.Oracle @@ -15,6 +17,9 @@ namespace Volo.Abp.EntityFrameworkCore.Oracle options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsBinary; } }); + + context.Services.AddTransient(typeof(IOracleDbContextEventOutbox<>), typeof(OracleDbContextEventOutbox<>)); + context.Services.AddTransient(typeof(IOracleDbContextEventInbox<>), typeof(OracleDbContextEventInbox<>)); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs new file mode 100644 index 0000000000..5c24d79f88 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IPostgreSqlDbContextEventInbox : IDbContextEventInbox + where TDbContext : IHasEventInbox + { + + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs new file mode 100644 index 0000000000..7e6bc4bd59 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IPostgreSqlDbContextEventOutbox : IDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs deleted file mode 100644 index 0c84ec929e..0000000000 --- a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs +++ /dev/null @@ -1,25 +0,0 @@ -namespace Volo.Abp.EntityFrameworkCore.DistributedEvents -{ - public class PostgreSqlAdapter : ISqlAdapter - { - public string NormalizeTableName(string tableName) - { - return $"\"{tableName}\""; - } - - public string NormalizeColumnName(string columnName) - { - return $"\"{columnName}\""; - } - - public string NormalizeColumnNameEqualsValue(string columnName, object value) - { - return $"\"{columnName}\" = '{value}'"; - } - - public string NormalizeValue(object value) - { - return $"'{value}'"; - } - } -} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs new file mode 100644 index 0000000000..4560bdb2c4 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs @@ -0,0 +1,43 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Timing; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class PostgreSqlDbContextEventInbox : DbContextEventInbox, IPostgreSqlDbContextEventInbox + where TDbContext : IHasEventInbox + { + public PostgreSqlDbContextEventInbox( + IDbContextProvider dbContextProvider, + IClock clock, + IOptions distributedEventsOptions) + : base(dbContextProvider, clock, distributedEventsOptions) + { + } + + [UnitOfWork] + public override async Task MarkAsProcessedAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = '{Clock.Now}' WHERE \"Id\" = '{id}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + [UnitOfWork] + public override async Task DeleteOldEventsAsync() + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < '{timeToKeepEvents}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs new file mode 100644 index 0000000000..c5e79a0014 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs @@ -0,0 +1,25 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class PostgreSqlDbContextEventOutbox : DbContextEventOutbox , IPostgreSqlDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + public PostgreSqlDbContextEventOutbox(IDbContextProvider dbContextProvider) : base(dbContextProvider) + { + } + + [UnitOfWork] + public override async Task DeleteAsync(Guid id) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = '{id}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs new file mode 100644 index 0000000000..55afc4794d --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class PostgreSqlInboxConfigExtensions + { + public static void UsePostgreSql(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs new file mode 100644 index 0000000000..5a65d15c90 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class PostgreSqlOutboxConfigExtensions + { + public static void UsePostgreSql(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs index 7d8c87c63a..55c1a4af42 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs @@ -1,4 +1,4 @@ -using Npgsql; +using Microsoft.Extensions.DependencyInjection; using Volo.Abp.EntityFrameworkCore.DistributedEvents; using Volo.Abp.Guids; using Volo.Abp.Modularity; @@ -20,10 +20,8 @@ namespace Volo.Abp.EntityFrameworkCore.PostgreSql } }); - Configure(options => - { - options.SqlAdapters.TryAdd(nameof(NpgsqlConnection).ToLower(), new PostgreSqlAdapter()); - }); + context.Services.AddTransient(typeof(IPostgreSqlDbContextEventOutbox<>), typeof(PostgreSqlDbContextEventOutbox<>)); + context.Services.AddTransient(typeof(IPostgreSqlDbContextEventInbox<>), typeof(PostgreSqlDbContextEventInbox<>)); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs new file mode 100644 index 0000000000..60adf600c7 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class SqlServerInboxConfigExtensions + { + public static void UseSqlServer(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs new file mode 100644 index 0000000000..9022d5c7e6 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class SqlServerOutboxConfigExtensions + { + public static void UseSqlServer(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs new file mode 100644 index 0000000000..ccc92d4eb1 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class SqliteInboxConfigExtensions + { + public static void UseSqlite(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs new file mode 100644 index 0000000000..c1d9949c19 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class SqliteOutboxConfigExtensions + { + public static void UseSqlite(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs index 349116d6eb..f7834c1835 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs @@ -5,6 +5,7 @@ using Volo.Abp.Domain; using Volo.Abp.EntityFrameworkCore.DependencyInjection; using Volo.Abp.EntityFrameworkCore.DistributedEvents; using Volo.Abp.Modularity; +using Volo.Abp.Uow; using Volo.Abp.Uow.EntityFrameworkCore; namespace Volo.Abp.EntityFrameworkCore @@ -26,14 +27,12 @@ namespace Volo.Abp.EntityFrameworkCore }); }); - Configure(options => - { - options.SqlAdapters.Add(DefaultSqlAdapter.Name, new DefaultSqlAdapter()); - }); - context.Services.TryAddTransient(typeof(IDbContextProvider<>), typeof(UnitOfWorkDbContextProvider<>)); context.Services.AddTransient(typeof(IDbContextEventOutbox<>), typeof(DbContextEventOutbox<>)); context.Services.AddTransient(typeof(IDbContextEventInbox<>), typeof(DbContextEventInbox<>)); + + context.Services.AddTransient(typeof(ISqlRawDbContextEventOutbox<>), typeof(SqlRawDbContextEventOutbox<>)); + context.Services.AddTransient(typeof(ISqlRawDbContextEventInbox<>), typeof(SqlRawDbContextEventInbox<>)); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs deleted file mode 100644 index 9497552a09..0000000000 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System.Collections.Generic; - -namespace Volo.Abp.EntityFrameworkCore.DistributedEvents -{ - public class AbpEfCoreDistributedEventBusOptions - { - public Dictionary SqlAdapters { get; set; } - - public ISqlAdapter GetSqlAdapter(string connectionType) - { - return SqlAdapters.TryGetValue(connectionType, out var sqlAdapter) ? sqlAdapter : SqlAdapters[DefaultSqlAdapter.Name]; - } - - public AbpEfCoreDistributedEventBusOptions() - { - SqlAdapters = new Dictionary(); - } - } -} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index d8f18c6f32..982d335d17 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -16,18 +16,15 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { protected IDbContextProvider DbContextProvider { get; } protected AbpDistributedEventBusOptions DistributedEventsOptions { get; } - protected AbpEfCoreDistributedEventBusOptions EfCoreDistributedEventBusOptions { get; } protected IClock Clock { get; } public DbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions, - IOptions efCoreDistributedEventBusOptions) + IOptions distributedEventsOptions) { DbContextProvider = dbContextProvider; Clock = clock; - EfCoreDistributedEventBusOptions = efCoreDistributedEventBusOptions.Value; DistributedEventsOptions = distributedEventsOptions.Value; } @@ -63,16 +60,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public virtual async Task MarkAsProcessedAsync(Guid id) { var dbContext = await DbContextProvider.GetDbContextAsync(); - var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); - var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower(); - var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName); - - var sql = $"UPDATE {sqlAdapter.NormalizeTableName(tableName)} SET " + - $"{sqlAdapter.NormalizeColumnNameEqualsValue("Processed", 1)}, " + - $"{sqlAdapter.NormalizeColumnNameEqualsValue("ProcessedTime", Clock.Now)} WHERE " + - $"{sqlAdapter.NormalizeColumnNameEqualsValue("Id", id)}"; - - await dbContext.Database.ExecuteSqlRawAsync(sql); + var incomingEvent = await dbContext.IncomingEvents.FindAsync(id); + if (incomingEvent != null) + { + incomingEvent.MarkAsProcessed(Clock.Now); + } } [UnitOfWork] @@ -86,16 +78,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public virtual async Task DeleteOldEventsAsync() { var dbContext = await DbContextProvider.GetDbContextAsync(); - var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); - var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower(); - var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName); - - var sql = $"DELETE FROM {sqlAdapter.NormalizeTableName(tableName)} WHERE " + - $"{sqlAdapter.NormalizeColumnNameEqualsValue("Processed", 1)} AND " + - $"{sqlAdapter.NormalizeColumnName("CreationTime")} < {sqlAdapter.NormalizeValue(timeToKeepEvents)}"; - - await dbContext.Database.ExecuteSqlRawAsync(sql); + var oldEvents = await dbContext.IncomingEvents + .Where(x => x.Processed && x.CreationTime < timeToKeepEvents) + .ToListAsync(); + dbContext.IncomingEvents.RemoveRange(oldEvents); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs index 5451a0a30a..0b8909b966 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs @@ -4,7 +4,6 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Options; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Uow; @@ -14,14 +13,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents where TDbContext : IHasEventOutbox { protected IDbContextProvider DbContextProvider { get; } - protected AbpEfCoreDistributedEventBusOptions EfCoreDistributedEventBusOptions { get; } public DbContextEventOutbox( - IDbContextProvider dbContextProvider, - IOptions efCoreDistributedEventBusOptions) + IDbContextProvider dbContextProvider) { DbContextProvider = dbContextProvider; - EfCoreDistributedEventBusOptions = efCoreDistributedEventBusOptions.Value; } [UnitOfWork] @@ -54,14 +50,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public virtual async Task DeleteAsync(Guid id) { var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); - var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); - var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower(); - var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName); - - var sql = $"DELETE FROM {sqlAdapter.NormalizeTableName(tableName)} WHERE " + - $"{sqlAdapter.NormalizeColumnNameEqualsValue("Id", id)}"; - - await dbContext.Database.ExecuteSqlRawAsync(sql); + var outgoingEvent = await dbContext.OutgoingEvents.FindAsync(id); + if (outgoingEvent != null) + { + dbContext.Remove(outgoingEvent); + } } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs deleted file mode 100644 index 8a86af7434..0000000000 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs +++ /dev/null @@ -1,27 +0,0 @@ -namespace Volo.Abp.EntityFrameworkCore.DistributedEvents -{ - public class DefaultSqlAdapter : ISqlAdapter - { - public const string Name = "default"; - - public string NormalizeTableName(string tableName) - { - return tableName; - } - - public string NormalizeColumnName(string columnName) - { - return columnName; - } - - public string NormalizeColumnNameEqualsValue(string columnName, object value) - { - return $"{columnName} = '{value}'"; - } - - public string NormalizeValue(object value) - { - return $"'{value}'"; - } - } -} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs deleted file mode 100644 index 9b366f00ab..0000000000 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs +++ /dev/null @@ -1,13 +0,0 @@ -namespace Volo.Abp.EntityFrameworkCore.DistributedEvents -{ - public interface ISqlAdapter - { - string NormalizeTableName(string tableName); - - string NormalizeColumnName(string columnName); - - string NormalizeColumnNameEqualsValue(string columnName, object value); - - string NormalizeValue(object value); - } -} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs new file mode 100644 index 0000000000..d86e3f36ed --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface ISqlRawDbContextEventInbox : IDbContextEventInbox + where TDbContext : IHasEventInbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs new file mode 100644 index 0000000000..776cc2f93c --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface ISqlRawDbContextEventOutbox : IDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs new file mode 100644 index 0000000000..04bef7579d --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs @@ -0,0 +1,43 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Timing; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class SqlRawDbContextEventInbox : DbContextEventInbox , ISqlRawDbContextEventInbox + where TDbContext : IHasEventInbox + { + public SqlRawDbContextEventInbox( + IDbContextProvider dbContextProvider, + IClock clock, + IOptions distributedEventsOptions) + : base(dbContextProvider, clock, distributedEventsOptions) + { + } + + [UnitOfWork] + public override async Task MarkAsProcessedAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"UPDATE {tableName} SET Processed = '1', ProcessedTime = '{Clock.Now}' WHERE Id = '{id}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + [UnitOfWork] + public override async Task DeleteOldEventsAsync() + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + + var sql = $"DELETE FROM {tableName} WHERE Processed = '1' AND CreationTime < '{timeToKeepEvents}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs new file mode 100644 index 0000000000..6c890ef9f8 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs @@ -0,0 +1,26 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class SqlRawDbContextEventOutbox : DbContextEventOutbox , ISqlRawDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + public SqlRawDbContextEventOutbox(IDbContextProvider dbContextProvider) + : base(dbContextProvider) + { + } + + [UnitOfWork] + public override async Task DeleteAsync(Guid id) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"DELETE FROM {tableName} WHERE Id = '{id}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + } +} From 28da0b837c35184068644dd0b599da79f2d9a098 Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Thu, 30 Sep 2021 00:35:31 +0800 Subject: [PATCH 5/7] Improve --- .../DistributedEvents/PostgreSqlInboxConfigExtensions.cs | 2 +- .../DistributedEvents/PostgreSqlOutboxConfigExtensions.cs | 2 +- .../DistributedEvents/SqlRawDbContextEventInbox.cs | 2 +- .../DistributedEvents/SqlRawDbContextEventOutbox.cs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs index 55afc4794d..f4bb462a1c 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs @@ -4,7 +4,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { public static class PostgreSqlInboxConfigExtensions { - public static void UsePostgreSql(this InboxConfig outboxConfig) + public static void UseNpgsql(this InboxConfig outboxConfig) where TDbContext : IHasEventInbox { outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventInbox); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs index 5a65d15c90..853ae9ba59 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs @@ -4,7 +4,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { public static class PostgreSqlOutboxConfigExtensions { - public static void UsePostgreSql(this OutboxConfig outboxConfig) + public static void UseNpgsql(this OutboxConfig outboxConfig) where TDbContext : IHasEventOutbox { outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventOutbox); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs index 04bef7579d..ce7302755a 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs @@ -25,7 +25,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); - var sql = $"UPDATE {tableName} SET Processed = '1', ProcessedTime = '{Clock.Now}' WHERE Id = '{id}'"; + var sql = $"UPDATE {tableName} SET Processed = '1', ProcessedTime = '{Clock.Now}' WHERE Id = '{id.ToString().ToUpper()}'"; await dbContext.Database.ExecuteSqlRawAsync(sql); } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs index 6c890ef9f8..8748e6dc09 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs @@ -19,7 +19,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); - var sql = $"DELETE FROM {tableName} WHERE Id = '{id}'"; + var sql = $"DELETE FROM {tableName} WHERE Id = '{id.ToString().ToUpper()}'"; await dbContext.Database.ExecuteSqlRawAsync(sql); } } From 3ee9f57c9dd50a6ecb77b121cb74808aaae17e47 Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Thu, 30 Sep 2021 15:17:19 +0800 Subject: [PATCH 6/7] Improve --- .../Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj | 2 +- .../Volo/Abp/Domain/AbpDddDomainModule.cs | 3 ++- .../DistributedEvents/OracleDbContextEventInbox.cs | 6 ++++-- .../DistributedEvents/OracleDbContextEventInbox.cs | 7 ++++--- .../PostgreSqlDbContextEventInbox.cs | 7 ++++--- .../AbpEntityFrameworkCoreModule.cs | 2 -- .../DistributedEvents/DbContextEventInbox.cs | 9 +++++---- .../DistributedEvents/SqlRawDbContextEventInbox.cs | 8 ++++---- .../Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs | 11 ++++++++--- .../Volo/Abp/EventBus/Boxes/InboxProcessor.cs | 8 ++++---- .../Volo/Abp/EventBus/Boxes/OutboxSender.cs | 4 ++-- .../Distributed/AbpDistributedEventBusOptions.cs | 8 -------- .../Abp/MongoDB/DistributedEvents/IHasEventInbox.cs | 4 ++-- .../MongoDB/DistributedEvents/IHasEventOutbox.cs | 4 ++-- .../DistributedEvents/MongoDbContextEventInbox.cs | 9 +++++---- .../DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs | 13 +++---------- 16 files changed, 50 insertions(+), 55 deletions(-) diff --git a/framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj b/framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj index 87879334af..68c90489ff 100644 --- a/framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj +++ b/framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj @@ -17,7 +17,7 @@ - + diff --git a/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs index 523a6ee4f2..38e2d9fc84 100644 --- a/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs +++ b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs @@ -3,6 +3,7 @@ using Volo.Abp.Auditing; using Volo.Abp.Data; using Volo.Abp.Domain.Repositories; using Volo.Abp.EventBus; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.ExceptionHandling; using Volo.Abp.Guids; using Volo.Abp.Modularity; @@ -18,7 +19,7 @@ namespace Volo.Abp.Domain [DependsOn( typeof(AbpAuditingModule), typeof(AbpDataModule), - typeof(AbpEventBusModule), + typeof(AbpEventBusBoxesModule), typeof(AbpGuidsModule), typeof(AbpMultiTenancyModule), typeof(AbpThreadingModule), diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs index d5f1b7afcc..ee73291e58 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Timing; using Volo.Abp.Uow; @@ -14,7 +15,8 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public OracleDbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) : base(dbContextProvider, clock, distributedEventsOptions) + IOptions eventBusBoxesOptions) + : base(dbContextProvider, clock, eventBusBoxesOptions) { } @@ -33,7 +35,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); - var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var timeToKeepEvents = Clock.Now.Add(- EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents); var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')"; await dbContext.Database.ExecuteSqlRawAsync(sql); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs index d5f1b7afcc..6d6d04d267 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; -using Volo.Abp.EventBus.Distributed; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.Timing; using Volo.Abp.Uow; @@ -14,7 +14,8 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public OracleDbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) : base(dbContextProvider, clock, distributedEventsOptions) + IOptions eventBusBoxesOptions) + : base(dbContextProvider, clock, eventBusBoxesOptions) { } @@ -33,7 +34,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); - var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')"; await dbContext.Database.ExecuteSqlRawAsync(sql); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs index 4560bdb2c4..fedff90dec 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Timing; using Volo.Abp.Uow; @@ -14,8 +15,8 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public PostgreSqlDbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) - : base(dbContextProvider, clock, distributedEventsOptions) + IOptions eventBusBoxesOptions) + : base(dbContextProvider, clock, eventBusBoxesOptions) { } @@ -34,7 +35,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); - var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < '{timeToKeepEvents}'"; await dbContext.Database.ExecuteSqlRawAsync(sql); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs index f7834c1835..96c245be2e 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs @@ -2,10 +2,8 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Volo.Abp.Domain; -using Volo.Abp.EntityFrameworkCore.DependencyInjection; using Volo.Abp.EntityFrameworkCore.DistributedEvents; using Volo.Abp.Modularity; -using Volo.Abp.Uow; using Volo.Abp.Uow.EntityFrameworkCore; namespace Volo.Abp.EntityFrameworkCore diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index 982d335d17..242b601177 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -5,6 +5,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Timing; using Volo.Abp.Uow; @@ -15,17 +16,17 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents where TDbContext : IHasEventInbox { protected IDbContextProvider DbContextProvider { get; } - protected AbpDistributedEventBusOptions DistributedEventsOptions { get; } + protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } protected IClock Clock { get; } public DbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) + IOptions eventBusBoxesOptions) { DbContextProvider = dbContextProvider; Clock = clock; - DistributedEventsOptions = distributedEventsOptions.Value; + EventBusBoxesOptions = eventBusBoxesOptions.Value; } [UnitOfWork] @@ -78,7 +79,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public virtual async Task DeleteOldEventsAsync() { var dbContext = await DbContextProvider.GetDbContextAsync(); - var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; var oldEvents = await dbContext.IncomingEvents .Where(x => x.Processed && x.CreationTime < timeToKeepEvents) .ToListAsync(); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs index ce7302755a..cb764ede17 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; -using Volo.Abp.EventBus.Distributed; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.Timing; using Volo.Abp.Uow; @@ -14,8 +14,8 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public SqlRawDbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) - : base(dbContextProvider, clock, distributedEventsOptions) + IOptions eventBusBoxesOptions) + : base(dbContextProvider, clock, eventBusBoxesOptions) { } @@ -34,7 +34,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); - var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; var sql = $"DELETE FROM {tableName} WHERE Processed = '1' AND CreationTime < '{timeToKeepEvents}'"; await dbContext.Database.ExecuteSqlRawAsync(sql); diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs index da91d672d3..1d72666c42 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs @@ -26,10 +26,14 @@ namespace Volo.Abp.EventBus.Boxes public TimeSpan PeriodTimeSpan { get; set; } /// - /// Delay time of and /// Default: 15 seconds /// - public TimeSpan DelayTimeSpan { get; set; } + public TimeSpan DistributedLockWaitDuration { get; set; } + + /// + /// Default: 2 hours + /// + public TimeSpan WaitTimeToDeleteProcessedInboxEvents { get; set; } public AbpEventBusBoxesOptions() { @@ -37,7 +41,8 @@ namespace Volo.Abp.EventBus.Boxes InboxWaitingEventMaxCount = 1000; OutboxWaitingEventMaxCount = 1000; PeriodTimeSpan = TimeSpan.FromSeconds(2); - DelayTimeSpan = TimeSpan.FromSeconds(15); + DistributedLockWaitDuration = TimeSpan.FromSeconds(15); + WaitTimeToDeleteProcessedInboxEvents = TimeSpan.FromHours(2); } } } diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs index 29cca3d624..e3c71771ad 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs @@ -49,7 +49,7 @@ namespace Volo.Abp.EventBus.Boxes UnitOfWorkManager = unitOfWorkManager; Clock = clock; EventBusBoxesOptions = eventBusBoxesOptions.Value; - Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Seconds; + Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds; Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; StoppingTokenSource = new CancellationTokenSource(); @@ -120,21 +120,21 @@ namespace Volo.Abp.EventBus.Boxes else { Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); - await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DelayTimeSpan.Milliseconds, StoppingToken); + await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken); } } } protected virtual async Task DeleteOldEventsAsync() { - if (LastCleanTime != null && LastCleanTime > Clock.Now.Add(EventBusBoxesOptions.CleanOldEventTimeIntervalSpan)) + if (LastCleanTime != null && LastCleanTime + EventBusBoxesOptions.CleanOldEventTimeIntervalSpan > Clock.Now) { return; } await Inbox.DeleteOldEventsAsync(); - LastCleanTime = DateTime.Now; + LastCleanTime = Clock.Now; } } } diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs index e8a05c60ba..32545227c3 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs @@ -39,7 +39,7 @@ namespace Volo.Abp.EventBus.Boxes DistributedEventBus = distributedEventBus; DistributedLockProvider = distributedLockProvider; EventBusBoxesOptions = eventBusBoxesOptions.Value; - Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Seconds; + Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds; Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; StoppingTokenSource = new CancellationTokenSource(); @@ -100,7 +100,7 @@ namespace Volo.Abp.EventBus.Boxes else { Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); - await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DelayTimeSpan.Milliseconds, StoppingToken); + await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken); } } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs index 5b71542665..ab8fe6823b 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs @@ -1,4 +1,3 @@ -using System; using Volo.Abp.Collections; namespace Volo.Abp.EventBus.Distributed @@ -10,18 +9,11 @@ namespace Volo.Abp.EventBus.Distributed public OutboxConfigDictionary Outboxes { get; } public InboxConfigDictionary Inboxes { get; } - - /// - /// Default: -2 hours - /// - public TimeSpan InboxKeepEventTimeSpan { get; set; } - public AbpDistributedEventBusOptions() { Handlers = new TypeList(); Outboxes = new OutboxConfigDictionary(); Inboxes = new InboxConfigDictionary(); - InboxKeepEventTimeSpan = TimeSpan.FromHours(-2); } } } diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs index 387e860bce..792a85e124 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs @@ -4,6 +4,6 @@ namespace Volo.Abp.MongoDB.DistributedEvents { public interface IHasEventInbox : IAbpMongoDbContext { - IMongoCollection IncomingEvents { get; set; } + IMongoCollection IncomingEvents { get; } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs index cf57aaa699..ab4bc10f35 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs @@ -4,6 +4,6 @@ namespace Volo.Abp.MongoDB.DistributedEvents { public interface IHasEventOutbox : IAbpMongoDbContext { - IMongoCollection OutgoingEvents { get; set; } + IMongoCollection OutgoingEvents { get; } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs index 62927d2279..3cf5485512 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using Microsoft.Extensions.Options; using MongoDB.Driver; using MongoDB.Driver.Linq; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Timing; using Volo.Abp.Uow; @@ -16,17 +17,17 @@ namespace Volo.Abp.MongoDB.DistributedEvents where TMongoDbContext : IHasEventInbox { protected IMongoDbContextProvider DbContextProvider { get; } - protected AbpDistributedEventBusOptions DistributedEventsOptions { get; } + protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } protected IClock Clock { get; } public MongoDbContextEventInbox( IMongoDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) + IOptions eventBusBoxesOptions) { DbContextProvider = dbContextProvider; Clock = clock; - DistributedEventsOptions = distributedEventsOptions.Value; + EventBusBoxesOptions = eventBusBoxesOptions.Value; } @@ -96,7 +97,7 @@ namespace Volo.Abp.MongoDB.DistributedEvents public virtual async Task DeleteOldEventsAsync() { var dbContext = await DbContextProvider.GetDbContextAsync(); - var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; if (dbContext.SessionHandle != null) { diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs b/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs index a7f1b78f86..95370bb4d2 100644 --- a/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs +++ b/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs @@ -11,16 +11,9 @@ namespace DistDemoApp public IMongoCollection TodoItems => Collection(); public IMongoCollection TodoSummaries => Collection(); - public IMongoCollection OutgoingEvents - { - get => Collection(); - set {} - } - public IMongoCollection IncomingEvents - { - get => Collection(); - set {} - } + public IMongoCollection OutgoingEvents => Collection(); + + public IMongoCollection IncomingEvents => Collection(); } } From 5291b97738c8e972d1491641e431eab4ba6dfdef Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Thu, 30 Sep 2021 15:20:50 +0800 Subject: [PATCH 7/7] Update OracleDbContextEventInbox.cs --- .../DistributedEvents/OracleDbContextEventInbox.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs index ee73291e58..c19ca6746a 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs @@ -35,7 +35,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); - var timeToKeepEvents = Clock.Now.Add(- EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')"; await dbContext.Database.ExecuteSqlRawAsync(sql);