Browse Source

Merge pull request #11125 from abpframework/LocalDistributedEventBus

Publish local events when uow completed.
pull/11126/head
Halil İbrahim Kalkan 4 years ago
committed by GitHub
parent
commit
fb152b4d4a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 56
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs
  2. 72
      framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs

56
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus.cs

@ -6,6 +6,7 @@ using Microsoft.Extensions.Options;
using Volo.Abp.Collections;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Local;
using Volo.Abp.Uow;
namespace Volo.Abp.EventBus.Distributed
{
@ -15,16 +16,19 @@ namespace Volo.Abp.EventBus.Distributed
{
private readonly ILocalEventBus _localEventBus;
protected IUnitOfWorkManager UnitOfWorkManager { get; }
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
public LocalDistributedEventBus(
ILocalEventBus localEventBus,
IUnitOfWorkManager unitOfWorkManager,
IServiceScopeFactory serviceScopeFactory,
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions)
{
_localEventBus = localEventBus;
UnitOfWorkManager = unitOfWorkManager;
ServiceScopeFactory = serviceScopeFactory;
AbpDistributedEventBusOptions = distributedEventBusOptions.Value;
Subscribe(distributedEventBusOptions.Value.Handlers);
@ -122,25 +126,57 @@ namespace Volo.Abp.EventBus.Distributed
_localEventBus.UnsubscribeAll(eventType);
}
public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
public async Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
where TEvent : class
{
return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
await PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete);
}
public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
{
return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
{
AddToUnitOfWork(
UnitOfWorkManager.Current,
new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext())
);
return;
}
await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete: false);
}
public async Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
where TEvent : class
{
await PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete, useOutbox);
}
public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class
public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
{
return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
{
AddToUnitOfWork(
UnitOfWorkManager.Current,
new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext(), useOutbox)
);
return;
}
if (useOutbox && UnitOfWorkManager.Current != null)
{
UnitOfWorkManager.Current.OnCompleted(async() => {
await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete: false);
});
return;
}
await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete: false);
}
public Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
protected virtual void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
{
return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
}
}
}

72
framework/test/Volo.Abp.EventBus.Tests/Volo/Abp/EventBus/Distributed/LocalDistributedEventBus_Test.cs

@ -1,7 +1,9 @@
using System;
using System.Threading.Tasks;
using Shouldly;
using Volo.Abp.Domain.Entities.Events.Distributed;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Uow;
using Xunit;
namespace Volo.Abp.EventBus.Distributed
@ -44,12 +46,12 @@ namespace Volo.Abp.EventBus.Distributed
Assert.Equal(tenantId, MySimpleDistributedSingleInstanceEventHandler.TenantId);
}
[Fact]
public async Task Should_Get_TenantId_From_EventEto_Extra_Property()
{
var tenantId = Guid.NewGuid();
DistributedEventBus.Subscribe<MySimpleEto>(GetRequiredService<MySimpleDistributedSingleInstanceEventHandler>());
await DistributedEventBus.PublishAsync(new MySimpleEto
@ -59,8 +61,72 @@ namespace Volo.Abp.EventBus.Distributed
{"TenantId", tenantId.ToString()}
}
});
Assert.Equal(tenantId, MySimpleDistributedSingleInstanceEventHandler.TenantId);
}
[Fact]
public async Task Event_Should_Published_On_UnitOfWorkComplete()
{
var id = 0;
DistributedEventBus.Subscribe<MySimpleEventData>(data =>
{
id = data.Value;
return Task.CompletedTask;
});
var unitOfWorkManager = GetRequiredService<IUnitOfWorkManager>();
using (var uow = unitOfWorkManager.Begin())
{
await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: true, useOutbox: false);
}
id.ShouldBe(0);
using (var uow = unitOfWorkManager.Begin())
{
await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: true, useOutbox: false);
await uow.CompleteAsync();
}
id.ShouldBe(3);
id = 0;
using (var uow = unitOfWorkManager.Begin())
{
await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: false, useOutbox: false);
}
id.ShouldBe(3);
}
[Fact]
public async Task Event_Should_Published_On_UnitOfWorkComplete_UseOutbox()
{
var id = 0;
DistributedEventBus.Subscribe<MySimpleEventData>(data =>
{
id = data.Value;
return Task.CompletedTask;
});
var unitOfWorkManager = GetRequiredService<IUnitOfWorkManager>();
using (var uow = unitOfWorkManager.Begin())
{
await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: false, useOutbox: true);
}
id.ShouldBe(0);
using (var uow = unitOfWorkManager.Begin())
{
await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: false, useOutbox: true);
await uow.CompleteAsync();
}
id.ShouldBe(3);
id = 0;
using (var uow = unitOfWorkManager.Begin())
{
await DistributedEventBus.PublishAsync(new MySimpleEventData(3), onUnitOfWorkComplete: false, useOutbox: false);
}
id.ShouldBe(3);
}
}
}

Loading…
Cancel
Save