Browse Source

Introduced AddLocalEvent and AddDistributedEvent methods on IUnitOfWork

pull/9909/head
Halil İbrahim Kalkan 5 years ago
parent
commit
8217bf59f6
  1. 10
      framework/src/Volo.Abp.Uow/Volo/Abp/Uow/ChildUnitOfWork.cs
  2. 4
      framework/src/Volo.Abp.Uow/Volo/Abp/Uow/IUnitOfWork.cs
  3. 11
      framework/src/Volo.Abp.Uow/Volo/Abp/Uow/IUnitOfWorkEventPublisher.cs
  4. 19
      framework/src/Volo.Abp.Uow/Volo/Abp/Uow/NullUnitOfWorkEventPublisher.cs
  5. 63
      framework/src/Volo.Abp.Uow/Volo/Abp/Uow/UnitOfWork.cs
  6. 10
      framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/Mvc/Uow/TestUnitOfWork.cs
  7. 4
      test/DistEvents/DistDemoApp/TodoEventHandler.cs

10
framework/src/Volo.Abp.Uow/Volo/Abp/Uow/ChildUnitOfWork.cs

@ -76,6 +76,16 @@ namespace Volo.Abp.Uow
_parent.OnCompleted(handler);
}
public void AddLocalEvent(object eventData)
{
_parent.AddLocalEvent(eventData);
}
public void AddDistributedEvent(object eventData)
{
_parent.AddDistributedEvent(eventData);
}
public IDatabaseApi FindDatabaseApi(string key)
{
return _parent.FindDatabaseApi(key);

4
framework/src/Volo.Abp.Uow/Volo/Abp/Uow/IUnitOfWork.cs

@ -42,5 +42,9 @@ namespace Volo.Abp.Uow
Task RollbackAsync(CancellationToken cancellationToken = default);
void OnCompleted(Func<Task> handler);
void AddLocalEvent(object eventData);
void AddDistributedEvent(object eventData);
}
}

11
framework/src/Volo.Abp.Uow/Volo/Abp/Uow/IUnitOfWorkEventPublisher.cs

@ -0,0 +1,11 @@
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Volo.Abp.Uow
{
public interface IUnitOfWorkEventPublisher
{
Task PublishLocalEventsAsync(IEnumerable<object> localEvents);
Task PublishDistributedEventsAsync(IEnumerable<object> distributedEvents);
}
}

19
framework/src/Volo.Abp.Uow/Volo/Abp/Uow/NullUnitOfWorkEventPublisher.cs

@ -0,0 +1,19 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace Volo.Abp.Uow
{
public class NullUnitOfWorkEventPublisher : IUnitOfWorkEventPublisher, ISingletonDependency
{
public Task PublishLocalEventsAsync(IEnumerable<object> localEvents)
{
return Task.CompletedTask;
}
public Task PublishDistributedEventsAsync(IEnumerable<object> distributedEvents)
{
return Task.CompletedTask;
}
}
}

63
framework/src/Volo.Abp.Uow/Volo/Abp/Uow/UnitOfWork.cs

@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
@ -33,11 +35,14 @@ namespace Volo.Abp.Uow
public string ReservationName { get; set; }
protected List<Func<Task>> CompletedHandlers { get; } = new List<Func<Task>>();
protected ICollection<object> DistributedEvents { get; } = new Collection<object>();
protected ICollection<object> LocalEvents { get; } = new Collection<object>();
public event EventHandler<UnitOfWorkFailedEventArgs> Failed;
public event EventHandler<UnitOfWorkEventArgs> Disposed;
public IServiceProvider ServiceProvider { get; }
protected IUnitOfWorkEventPublisher UnitOfWorkEventPublisher { get; }
[NotNull]
public Dictionary<string, object> Items { get; }
@ -50,9 +55,13 @@ namespace Volo.Abp.Uow
private bool _isCompleting;
private bool _isRolledback;
public UnitOfWork(IServiceProvider serviceProvider, IOptions<AbpUnitOfWorkDefaultOptions> options)
public UnitOfWork(
IServiceProvider serviceProvider,
IUnitOfWorkEventPublisher unitOfWorkEventPublisher,
IOptions<AbpUnitOfWorkDefaultOptions> options)
{
ServiceProvider = serviceProvider;
UnitOfWorkEventPublisher = unitOfWorkEventPublisher;
_defaultOptions = options.Value;
_databaseApis = new Dictionary<string, IDatabaseApi>();
@ -103,12 +112,12 @@ namespace Volo.Abp.Uow
}
}
public IReadOnlyList<IDatabaseApi> GetAllActiveDatabaseApis()
public virtual IReadOnlyList<IDatabaseApi> GetAllActiveDatabaseApis()
{
return _databaseApis.Values.ToImmutableList();
}
public IReadOnlyList<ITransactionApi> GetAllActiveTransactionApis()
public virtual IReadOnlyList<ITransactionApi> GetAllActiveTransactionApis()
{
return _transactionApis.Values.ToImmutableList();
}
@ -126,6 +135,30 @@ namespace Volo.Abp.Uow
{
_isCompleting = true;
await SaveChangesAsync(cancellationToken);
while (LocalEvents.Any() || DistributedEvents.Any())
{
if (LocalEvents.Any())
{
var localEventsToBePublished = LocalEvents.ToArray();
LocalEvents.Clear();
await UnitOfWorkEventPublisher.PublishLocalEventsAsync(
localEventsToBePublished
);
}
if (DistributedEvents.Any())
{
var distributedEventsToBePublished = DistributedEvents.ToArray();
DistributedEvents.Clear();
await UnitOfWorkEventPublisher.PublishDistributedEventsAsync(
distributedEventsToBePublished
);
}
await SaveChangesAsync(cancellationToken);
}
await CommitTransactionsAsync();
IsCompleted = true;
await OnCompletedAsync();
@ -149,12 +182,12 @@ namespace Volo.Abp.Uow
await RollbackAllAsync(cancellationToken);
}
public IDatabaseApi FindDatabaseApi(string key)
public virtual IDatabaseApi FindDatabaseApi(string key)
{
return _databaseApis.GetOrDefault(key);
}
public void AddDatabaseApi(string key, IDatabaseApi api)
public virtual void AddDatabaseApi(string key, IDatabaseApi api)
{
Check.NotNull(key, nameof(key));
Check.NotNull(api, nameof(api));
@ -167,7 +200,7 @@ namespace Volo.Abp.Uow
_databaseApis.Add(key, api);
}
public IDatabaseApi GetOrAddDatabaseApi(string key, Func<IDatabaseApi> factory)
public virtual IDatabaseApi GetOrAddDatabaseApi(string key, Func<IDatabaseApi> factory)
{
Check.NotNull(key, nameof(key));
Check.NotNull(factory, nameof(factory));
@ -175,14 +208,14 @@ namespace Volo.Abp.Uow
return _databaseApis.GetOrAdd(key, factory);
}
public ITransactionApi FindTransactionApi(string key)
public virtual ITransactionApi FindTransactionApi(string key)
{
Check.NotNull(key, nameof(key));
return _transactionApis.GetOrDefault(key);
}
public void AddTransactionApi(string key, ITransactionApi api)
public virtual void AddTransactionApi(string key, ITransactionApi api)
{
Check.NotNull(key, nameof(key));
Check.NotNull(api, nameof(api));
@ -195,7 +228,7 @@ namespace Volo.Abp.Uow
_transactionApis.Add(key, api);
}
public ITransactionApi GetOrAddTransactionApi(string key, Func<ITransactionApi> factory)
public virtual ITransactionApi GetOrAddTransactionApi(string key, Func<ITransactionApi> factory)
{
Check.NotNull(key, nameof(key));
Check.NotNull(factory, nameof(factory));
@ -203,11 +236,21 @@ namespace Volo.Abp.Uow
return _transactionApis.GetOrAdd(key, factory);
}
public void OnCompleted(Func<Task> handler)
public virtual void OnCompleted(Func<Task> handler)
{
CompletedHandlers.Add(handler);
}
public virtual void AddLocalEvent(object eventData)
{
LocalEvents.Add(eventData);
}
public virtual void AddDistributedEvent(object eventData)
{
DistributedEvents.Add(eventData);
}
protected virtual async Task OnCompletedAsync()
{
foreach (var handler in CompletedHandlers)

10
framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/Mvc/Uow/TestUnitOfWork.cs

@ -13,8 +13,14 @@ namespace Volo.Abp.AspNetCore.Mvc.Uow
{
private readonly TestUnitOfWorkConfig _config;
public TestUnitOfWork(IServiceProvider serviceProvider, IOptions<AbpUnitOfWorkDefaultOptions> options, TestUnitOfWorkConfig config)
: base(serviceProvider, options)
public TestUnitOfWork(
IServiceProvider serviceProvider,
IUnitOfWorkEventPublisher unitOfWorkEventPublisher,
IOptions<AbpUnitOfWorkDefaultOptions> options, TestUnitOfWorkConfig config)
: base(
serviceProvider,
unitOfWorkEventPublisher,
options)
{
_config = config;
}

4
test/DistEvents/DistDemoApp/TodoEventHandler.cs

@ -39,8 +39,10 @@ namespace DistDemoApp
todoSummary.Increase();
await _todoSummaryRepository.UpdateAsync(todoSummary);
}
Console.WriteLine("Increased total count: " + todoSummary);
throw new ApplicationException("Thrown to rollback the UOW!");
}
public async Task HandleEventAsync(EntityDeletedEto<TodoItemEto> eventData)

Loading…
Cancel
Save