Browse Source

Merge pull request #15134 from abpframework/EntitySynchronizer-revisions

Entity synchronizer revisions
pull/15139/head
Halil İbrahim Kalkan 3 years ago
committed by GitHub
parent
commit
a7c9157777
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 123
      docs/en/Distributed-Event-Bus.md
  2. 13
      docs/en/Entities.md
  3. 5
      framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/EntityEto.cs
  4. 105
      framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizer.cs
  5. 9
      framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/IEntityEto.cs
  6. 18
      framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/EntitySynchronizer_Tests.cs
  7. 5
      framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/WithEntityVersion/RemoteBookEto.cs
  8. 6
      framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/WithoutEntityVersion/RemoteAuthorEto.cs

123
docs/en/Distributed-Event-Bus.md

@ -296,6 +296,74 @@ This example;
> Distributed event system use the [object to object mapping](Object-To-Object-Mapping.md) system to map `Product` objects to `ProductEto` objects. So, you need to configure the object mapping (`Product` -> `ProductEto`) too. You can check the [object to object mapping document](Object-To-Object-Mapping.md) to learn how to do it.
## Entity Synchronizer
In a distributed (or microservice) system, it is typical to subscribe to change events for an [entity](Entities.md) type of another service, so you can get notifications when the subscribed entity changes. In that case, you can use ABP's Pre-Defined Events as explained in the previous section.
If your purpose is to store your local copies of a remote entity, you typically subscribe to create, update and delete events of the remote entity and update your local database in your event handler. ABP provides a pre-built `EntitySynchronizer` base class to make that operation easier for you.
Assume that there is a `Product` entity (probably an aggregate root entity) in a Catalog microservice, and you want to keep copies of products in your Ordering microservice, with a local `OrderProduct` entity. In practice, properties of the `OrderProduct` class will be a subset of the `Product` properties, because not all product data is needed in the Ordering microservice (however, you can make a full copy if you need). Also, the `OrderProduct` entity may have additional properties that are populated and used in the Ordering microservice.
The first step to establish the synchronization is to define an ETO (Event Transfer Object) class in the Catalog microservice that is used to transfer the event data. Assuming the `Product` entity has a `Guid` key, your ETO can be as shown below:
````
[EventName("product")]
public class ProductEto : EntityEto<Guid>
{
// Your Product properties here...
}
````
`ProductEto` can be put a shared project (DLL) that is referenced by the Catalog and the Ordering microservices. Alternatively, you can put a copy of the `ProductEto` class in the Ordering microservice if you don't want to introduce a common project dependency between the services. In this case, the `EventName` attribute becomes critical to map the `ProductEto` classes across two services (you should use the same event name).
Once you define an ETO class, you should configure the ABP Framework to publish auto (create, update and delete) events for the `Product` entity, as explained in the previous section:
````csharp
Configure<AbpDistributedEntityEventOptions>(options =>
{
options.AutoEventSelectors.Add<Product>();
options.EtoMappings.Add<Product, ProductEto>();
});
````
Finally, you should create class in the Ordering microservice, that is derived from the `EntitySynchronizer` class:
````csharp
public class ProductSynchronizer : EntitySynchronizer<OrderProduct, Guid, ProductEto>
{
public ProductSynchronizer(
IObjectMapper objectMapper,
IRepository<OrderProduct, Guid> repository
) : base(objectMapper, repository)
{
}
}
````
The main point of this class is it subscribes to the create, update and delete events of the source entity and updates the local entity in the database. It uses the [Object Mapper](Object-To-Object-Mapping.md) system to create or update the `OrderProduct` objects from `ProductEto` objects. So, you should also configure the object mapper to make it properly works. Otherwise, you should manually perform the object mapping by overriding the `MapToEntityAsync(TSourceEntityEto)` and `MapToEntityAsync(TSourceEntityEto,TEntity)` methods in your `ProductSynchronizer` class.
If your entity has a composite primary key (see the [Entities document](Entities.md)), then you should inherit from the `EntitySynchronizer<TEntity, TSourceEntityEto>` class (just don't use the `Guid` generic argument in the previous example) and implement the `FindLocalEntityAsync` to find the entity in your local database using the `Repository`.
`EntitySynchronizer` is compatible with the *Entity Versioning* system (see the [Entities document](Entities.md)). So, it works as expected even if the events are received as disordered. If the entity's version in your local database is newer than the entity in the received event, then the event is ignored. You should implement the `IHasEntityVersion` interface for the entity and ETO classes (for this example, you should implement for `Product`, `ProductEto` and `OrderProduct` classes).
If you want to ignore some type of change events, you can set `IgnoreEntityCreatedEvent`, `IgnoreEntityUpdatedEvent` and `IgnoreEntityDeletedEvent` in the constructor of your class. Example:
````csharp
public class ProductSynchronizer
: EntitySynchronizer<OrderProduct, Guid, ProductEto>
{
public ProductSynchronizer(
IObjectMapper objectMapper,
IRepository<OrderProduct, Guid> repository
) : base(objectMapper, repository)
{
IgnoreEntityDeletedEvent = true;
}
}
````
> Notice that the `EntitySynchronizer` can only create/update the entities after you use it. If you have an existing system with existing data, you should manually copy the data for one time, because the `EntitySynchronizer` starts to work.
## Transaction and Exception Handling
Distributed event bus works in-process (since default implementation is `LocalDistributedEventBus`) unless you configure an actual provider (e.g. [Kafka](Distributed-Event-Bus-Kafka-Integration.md) or [RabbitMQ](Distributed-Event-Bus-RabbitMQ-Integration.md)). In-process event bus always executes event handlers in the same [unit of work](Unit-Of-Work.md) scope that you publishes the events in. That means, if an event handler throws an exception, then the related unit of work (the database transaction) is rolled back. In this way, your application logic and event handling logic becomes transactional (atomic) and consistent. If you want to ignore errors in an event handler, you must use a `try-catch` block in your handler and shouldn't re-throw the exception.
@ -522,61 +590,6 @@ Configure<AbpDistributedEventBusOptions>(options =>
});
````
## Entity Synchronizer
Todo: introduction.
### Create a Synchronizer Class
Todo.
```csharp
public class BlogUserSynchronizer : EntitySynchronizer<BlogUser, Guid, UserEto>, ITransientDependency
{
public BlogUserSynchronizer(IObjectMapper objectMapper, IRepository<BlogUser, Guid> repository) :
base(objectMapper, repository)
{
}
}
```
### Advanced Usages
We may want to skip synchronizing the entity data on the external entity created, updated, or deleted. The `EntitySynchronizer` has three bool properties to control the handling behaviors.
```csharp
public class BlogUserSynchronizer : EntitySynchronizer<BlogUser, Guid, UserEto>, ITransientDependency
{
protected override bool IgnoreEntityCreatedEvent => true;
protected override bool IgnoreEntityUpdatedEvent => true;
protected override bool IgnoreEntityDeletedEvent => true;
// ctor ...
}
```
### Eventual Consistency Guarantee
Developers should always handle the distributed events disordering. ABP framework has an `EntityVersion` audit property to avoid an old version of entity data overriding a new one.
The only thing we need to do is make the entity class and the ETO class implement the `IHasEntityVersion` interface.
```csharp
public class User : Entity<Guid>, IHasEntityVersion
{
public int EntityVersion { get; set; }
}
public class UserEto : EntityEto, IHasEntityVersion
{
public int EntityVersion { get; set; }
}
```
After that, the entity synchronizer will know the entity version number and skip handling the stale events.
> See the community post [Notice and Solve ABP Distributed Events Disordering](https://community.abp.io/posts/notice-and-solve-abp-distributed-events-disordering-yi9vq3p4) for more if you are interested or worried.
## See Also
* [Local Event Bus](Local-Event-Bus.md)

13
docs/en/Entities.md

@ -324,6 +324,19 @@ It's designed as read-only and automatically invalidates a cached entity if the
> See the [Entity Cache](Entity-Cache.md) documentation for more information.
## Versioning Entities
ABP defines the `IHasEntityVersion` interface for automatic versioning of your entities. It only provides a single `EntityVersion` properties, as shown in the following code block:
````csharp
public interface IHasEntityVersion
{
int EntityVersion { get; }
}
````
If you implement the `IHasEntityVersion` interface, ABP automatically increases the `EntityVersion` value whenever you update your entity. The initial value will be `0`, when you first create the entity and save to the database.
## Extra Properties
ABP defines the `IHasExtraProperties` interface that can be implemented by an entity to be able to dynamically set and get properties for the entity. `AggregateRoot` base class already implements the `IHasExtraProperties` interface. If you've derived from this class (or one of the related audit class defined above), you can directly use the API.

5
framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/EntityEto.cs

@ -20,3 +20,8 @@ public class EntityEto : EtoBase
KeysAsString = keysAsString;
}
}
public abstract class EntityEto<TKey> : IEntityEto<TKey>
{
public TKey Id { get; set; }
}

105
framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizer.cs

@ -1,9 +1,7 @@
using System;
using System.ComponentModel;
using System.Globalization;
using System.Threading.Tasks;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Volo.Abp.Auditing;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Repositories;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.ObjectMapping;
@ -11,62 +9,48 @@ using Volo.Abp.Uow;
namespace Volo.Abp.Domain.Entities.Events.Distributed;
public abstract class EntitySynchronizer<TEntity, TKey, TExternalEntityEto> :
EntitySynchronizer<TEntity, TExternalEntityEto>
public abstract class EntitySynchronizer<TEntity, TKey, TSourceEntityEto> :
EntitySynchronizer<TEntity, TSourceEntityEto>
where TEntity : class, IEntity<TKey>
where TExternalEntityEto : EntityEto
where TSourceEntityEto : IEntityEto<TKey>
{
private readonly IRepository<TEntity, TKey> _repository;
protected new IRepository<TEntity, TKey> Repository { get; }
protected EntitySynchronizer(IObjectMapper objectMapper, IRepository<TEntity, TKey> repository) :
base(objectMapper, repository)
{
_repository = repository;
Repository = repository;
}
protected override Task<TEntity> FindLocalEntityAsync(TExternalEntityEto eto)
protected override Task<TEntity> FindLocalEntityAsync(TSourceEntityEto eto)
{
return _repository.FindAsync(GetExternalEntityId(eto));
}
protected virtual TKey GetExternalEntityId(TExternalEntityEto eto)
{
var keyType = typeof(TKey);
var keyValue = Check.NotNullOrEmpty(eto.KeysAsString, nameof(eto.KeysAsString));
if (keyType == typeof(Guid))
{
return (TKey)TypeDescriptor.GetConverter(keyType).ConvertFromInvariantString(keyValue);
}
return (TKey)Convert.ChangeType(keyValue, keyType, CultureInfo.InvariantCulture);
return Repository.FindAsync(eto.Id);
}
}
public abstract class EntitySynchronizer<TEntity, TExternalEntityEto> :
IDistributedEventHandler<EntityCreatedEto<TExternalEntityEto>>,
IDistributedEventHandler<EntityUpdatedEto<TExternalEntityEto>>,
IDistributedEventHandler<EntityDeletedEto<TExternalEntityEto>>,
IUnitOfWorkEnabled
public abstract class EntitySynchronizer<TEntity, TSourceEntityEto> :
IDistributedEventHandler<EntityCreatedEto<TSourceEntityEto>>,
IDistributedEventHandler<EntityUpdatedEto<TSourceEntityEto>>,
IDistributedEventHandler<EntityDeletedEto<TSourceEntityEto>>,
ITransientDependency
where TEntity : class, IEntity
where TExternalEntityEto : EntityEto
{
protected IObjectMapper ObjectMapper { get; }
private readonly IRepository<TEntity> _repository;
protected IRepository<TEntity> Repository { get; }
protected virtual bool IgnoreEntityCreatedEvent { get; set; }
protected virtual bool IgnoreEntityUpdatedEvent { get; set; }
protected virtual bool IgnoreEntityDeletedEvent { get; set; }
protected bool IgnoreEntityCreatedEvent { get; set; }
protected bool IgnoreEntityUpdatedEvent { get; set; }
protected bool IgnoreEntityDeletedEvent { get; set; }
public EntitySynchronizer(
IObjectMapper objectMapper,
IRepository<TEntity> repository)
{
ObjectMapper = objectMapper;
_repository = repository;
Repository = repository;
}
public virtual async Task HandleEventAsync(EntityCreatedEto<TExternalEntityEto> eventData)
public virtual async Task HandleEventAsync(EntityCreatedEto<TSourceEntityEto> eventData)
{
if (IgnoreEntityCreatedEvent)
{
@ -76,7 +60,7 @@ public abstract class EntitySynchronizer<TEntity, TExternalEntityEto> :
await TryCreateOrUpdateEntityAsync(eventData.Entity);
}
public virtual async Task HandleEventAsync(EntityUpdatedEto<TExternalEntityEto> eventData)
public virtual async Task HandleEventAsync(EntityUpdatedEto<TSourceEntityEto> eventData)
{
if (IgnoreEntityUpdatedEvent)
{
@ -86,7 +70,7 @@ public abstract class EntitySynchronizer<TEntity, TExternalEntityEto> :
await TryCreateOrUpdateEntityAsync(eventData.Entity);
}
public virtual async Task HandleEventAsync(EntityDeletedEto<TExternalEntityEto> eventData)
public virtual async Task HandleEventAsync(EntityDeletedEto<TSourceEntityEto> eventData)
{
if (IgnoreEntityDeletedEvent)
{
@ -96,7 +80,8 @@ public abstract class EntitySynchronizer<TEntity, TExternalEntityEto> :
await TryDeleteEntityAsync(eventData.Entity);
}
protected virtual async Task<bool> TryCreateOrUpdateEntityAsync(TExternalEntityEto eto)
[UnitOfWork]
protected virtual async Task<bool> TryCreateOrUpdateEntityAsync(TSourceEntityEto eto)
{
var localEntity = await FindLocalEntityAsync(eto);
@ -111,11 +96,14 @@ public abstract class EntitySynchronizer<TEntity, TExternalEntityEto> :
if (localEntity is IHasEntityVersion versionedLocalEntity && eto is IHasEntityVersion versionedEto)
{
ObjectHelper.TrySetProperty(versionedLocalEntity, x => x.EntityVersion,
() => versionedEto.EntityVersion);
ObjectHelper.TrySetProperty(
versionedLocalEntity,
x => x.EntityVersion,
() => versionedEto.EntityVersion
);
}
await _repository.InsertAsync(localEntity, true);
await Repository.InsertAsync(localEntity);
}
else
{
@ -123,30 +111,36 @@ public abstract class EntitySynchronizer<TEntity, TExternalEntityEto> :
if (localEntity is IHasEntityVersion versionedLocalEntity && eto is IHasEntityVersion versionedEto)
{
// The version will auto-increment by one when the repository updates the entity.
/* The version will auto-increment by one when the repository updates the entity.
* So, we are decreasing it as a workaround here.
*/
var entityVersion = versionedEto.EntityVersion - 1;
ObjectHelper.TrySetProperty(versionedLocalEntity, x => x.EntityVersion, () => entityVersion);
ObjectHelper.TrySetProperty(
versionedLocalEntity,
x => x.EntityVersion,
() => entityVersion
);
}
await _repository.UpdateAsync(localEntity, true);
await Repository.UpdateAsync(localEntity);
}
return true;
}
protected virtual Task<TEntity> MapToEntityAsync(TExternalEntityEto eto)
protected virtual Task<TEntity> MapToEntityAsync(TSourceEntityEto eto)
{
return Task.FromResult(ObjectMapper.Map<TExternalEntityEto, TEntity>(eto));
return Task.FromResult(ObjectMapper.Map<TSourceEntityEto, TEntity>(eto));
}
protected virtual Task MapToEntityAsync(TExternalEntityEto eto, TEntity localEntity)
protected virtual Task MapToEntityAsync(TSourceEntityEto eto, TEntity localEntity)
{
ObjectMapper.Map(eto, localEntity);
return Task.CompletedTask;
}
protected virtual async Task<bool> TryDeleteEntityAsync(TExternalEntityEto eto)
[UnitOfWork]
protected virtual async Task<bool> TryDeleteEntityAsync(TSourceEntityEto eto)
{
var localEntity = await FindLocalEntityAsync(eto);
@ -155,19 +149,22 @@ public abstract class EntitySynchronizer<TEntity, TExternalEntityEto> :
return false;
}
await _repository.DeleteAsync(localEntity, true);
await Repository.DeleteAsync(localEntity, true);
return true;
}
[ItemCanBeNull]
protected abstract Task<TEntity> FindLocalEntityAsync(TExternalEntityEto eto);
protected abstract Task<TEntity> FindLocalEntityAsync(TSourceEntityEto eto);
protected virtual Task<bool> IsEtoNewerAsync(TExternalEntityEto eto, [CanBeNull] TEntity localEntity)
protected virtual Task<bool> IsEtoNewerAsync(TSourceEntityEto eto, [CanBeNull] TEntity localEntity)
{
if (localEntity is IHasEntityVersion versionedLocalEntity && eto is IHasEntityVersion versionedEto)
{
return Task.FromResult(versionedEto.EntityVersion > versionedLocalEntity.EntityVersion);
/* We are also accepting the same version because
* the entity may be updated, but the version might not be changed.
*/
return Task.FromResult(versionedEto.EntityVersion >= versionedLocalEntity.EntityVersion);
}
return Task.FromResult(true);

9
framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/IEntityEto.cs

@ -0,0 +1,9 @@
namespace Volo.Abp.Domain.Entities.Events.Distributed;
public interface IEntityEto<TKey>
{
/// <summary>
/// Unique identifier for this entity.
/// </summary>
TKey Id { get; set; }
}

18
framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/EntitySynchronizer_Tests.cs

@ -33,7 +33,7 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest<EntitySynchronizer_Tes
(await repository.FindAsync(authorId)).ShouldBeNull();
var remoteAuthorEto = new RemoteAuthorEto { KeysAsString = authorId.ToString(), Name = "New" };
var remoteAuthorEto = new RemoteAuthorEto { Id = authorId, Name = "New" };
await authorSynchronizer.HandleEventAsync(new EntityCreatedEto<RemoteAuthorEto>(remoteAuthorEto));
@ -60,7 +60,7 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest<EntitySynchronizer_Tes
author.Id.ShouldBe(authorId);
author.Name.ShouldBe("Old");
var remoteAuthorEto = new RemoteAuthorEto { KeysAsString = authorId.ToString(), Name = "New" };
var remoteAuthorEto = new RemoteAuthorEto { Id = authorId, Name = "New" };
await authorSynchronizer.HandleEventAsync(new EntityUpdatedEto<RemoteAuthorEto>(remoteAuthorEto));
@ -88,7 +88,7 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest<EntitySynchronizer_Tes
author.Id.ShouldBe(authorId);
author.Name.ShouldBe("Old");
var remoteAuthorEto = new RemoteAuthorEto { KeysAsString = authorId.ToString(), Name = "New" };
var remoteAuthorEto = new RemoteAuthorEto { Id = authorId, Name = "New" };
await authorSynchronizer.HandleEventAsync(new EntityDeletedEto<RemoteAuthorEto>(remoteAuthorEto));
@ -111,7 +111,7 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest<EntitySynchronizer_Tes
(await repository.FindAsync(bookId)).ShouldBeNull();
var remoteBookEto = new RemoteBookEto { KeysAsString = bookId.ToString(), EntityVersion = 0, Sold = 1 };
var remoteBookEto = new RemoteBookEto { Id = bookId, EntityVersion = 0, Sold = 1 };
await bookSynchronizer.HandleEventAsync(new EntityCreatedEto<RemoteBookEto>(remoteBookEto));
@ -139,7 +139,7 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest<EntitySynchronizer_Tes
book.Id.ShouldBe(bookId);
book.EntityVersion.ShouldBe(0);
var remoteBookEto = new RemoteBookEto { KeysAsString = bookId.ToString(), EntityVersion = 0, Sold = 10 };
var remoteBookEto = new RemoteBookEto { Id = bookId, EntityVersion = 0, Sold = 10 };
await bookSynchronizer.HandleEventAsync(new EntityUpdatedEto<RemoteBookEto>(remoteBookEto));
@ -188,7 +188,7 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest<EntitySynchronizer_Tes
book.Id.ShouldBe(bookId);
book.EntityVersion.ShouldBe(0);
var remoteBookEto = new RemoteBookEto { KeysAsString = bookId.ToString(), EntityVersion = 0, Sold = 1 };
var remoteBookEto = new RemoteBookEto { Id = bookId, EntityVersion = 0, Sold = 1 };
await bookSynchronizer.HandleEventAsync(new EntityDeletedEto<RemoteBookEto>(remoteBookEto));
@ -245,10 +245,8 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest<EntitySynchronizer_Tes
{
public MyAutoMapperProfile()
{
CreateMap<RemoteBookEto, Book>(MemberList.None)
.ForMember(x => x.Id, options => options.MapFrom(x => Guid.Parse(x.KeysAsString)));
CreateMap<RemoteAuthorEto, Author>(MemberList.None)
.ForMember(x => x.Id, options => options.MapFrom(x => Guid.Parse(x.KeysAsString)));
CreateMap<RemoteBookEto, Book>(MemberList.None);
CreateMap<RemoteAuthorEto, Author>(MemberList.None);
}
}
}

5
framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/WithEntityVersion/RemoteBookEto.cs

@ -1,8 +1,9 @@
using Volo.Abp.Auditing;
using System;
using Volo.Abp.Auditing;
namespace Volo.Abp.Domain.Entities.Events.Distributed.EntitySynchronizers.WithEntityVersion;
public class RemoteBookEto : EntityEto, IHasEntityVersion
public class RemoteBookEto : EntityEto<Guid>, IHasEntityVersion
{
public int EntityVersion { get; set; }

6
framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/WithoutEntityVersion/RemoteAuthorEto.cs

@ -1,6 +1,8 @@
namespace Volo.Abp.Domain.Entities.Events.Distributed.EntitySynchronizers.WithoutEntityVersion;
using System;
public class RemoteAuthorEto : EntityEto
namespace Volo.Abp.Domain.Entities.Events.Distributed.EntitySynchronizers.WithoutEntityVersion;
public class RemoteAuthorEto : EntityEto<Guid>
{
public string Name { get; set; }
}
Loading…
Cancel
Save