diff --git a/docs/en/Distributed-Event-Bus.md b/docs/en/Distributed-Event-Bus.md index e40568a9c0..c4e34b11db 100644 --- a/docs/en/Distributed-Event-Bus.md +++ b/docs/en/Distributed-Event-Bus.md @@ -296,6 +296,74 @@ This example; > Distributed event system use the [object to object mapping](Object-To-Object-Mapping.md) system to map `Product` objects to `ProductEto` objects. So, you need to configure the object mapping (`Product` -> `ProductEto`) too. You can check the [object to object mapping document](Object-To-Object-Mapping.md) to learn how to do it. +## Entity Synchronizer + +In a distributed (or microservice) system, it is typical to subscribe to change events for an [entity](Entities.md) type of another service, so you can get notifications when the subscribed entity changes. In that case, you can use ABP's Pre-Defined Events as explained in the previous section. + +If your purpose is to store your local copies of a remote entity, you typically subscribe to create, update and delete events of the remote entity and update your local database in your event handler. ABP provides a pre-built `EntitySynchronizer` base class to make that operation easier for you. + +Assume that there is a `Product` entity (probably an aggregate root entity) in a Catalog microservice, and you want to keep copies of products in your Ordering microservice, with a local `OrderProduct` entity. In practice, properties of the `OrderProduct` class will be a subset of the `Product` properties, because not all product data is needed in the Ordering microservice (however, you can make a full copy if you need). Also, the `OrderProduct` entity may have additional properties that are populated and used in the Ordering microservice. + +The first step to establish the synchronization is to define an ETO (Event Transfer Object) class in the Catalog microservice that is used to transfer the event data. Assuming the `Product` entity has a `Guid` key, your ETO can be as shown below: + +```` +[EventName("product")] +public class ProductEto : EntityEto +{ + // Your Product properties here... +} +```` + +`ProductEto` can be put a shared project (DLL) that is referenced by the Catalog and the Ordering microservices. Alternatively, you can put a copy of the `ProductEto` class in the Ordering microservice if you don't want to introduce a common project dependency between the services. In this case, the `EventName` attribute becomes critical to map the `ProductEto` classes across two services (you should use the same event name). + +Once you define an ETO class, you should configure the ABP Framework to publish auto (create, update and delete) events for the `Product` entity, as explained in the previous section: + +````csharp +Configure(options => +{ + options.AutoEventSelectors.Add(); + options.EtoMappings.Add(); +}); +```` + +Finally, you should create class in the Ordering microservice, that is derived from the `EntitySynchronizer` class: + +````csharp +public class ProductSynchronizer : EntitySynchronizer +{ + public ProductSynchronizer( + IObjectMapper objectMapper, + IRepository repository + ) : base(objectMapper, repository) + { + } +} +```` + +The main point of this class is it subscribes to the create, update and delete events of the source entity and updates the local entity in the database. It uses the [Object Mapper](Object-To-Object-Mapping.md) system to create or update the `OrderProduct` objects from `ProductEto` objects. So, you should also configure the object mapper to make it properly works. Otherwise, you should manually perform the object mapping by overriding the `MapToEntityAsync(TSourceEntityEto)` and `MapToEntityAsync(TSourceEntityEto,TEntity)` methods in your `ProductSynchronizer` class. + +If your entity has a composite primary key (see the [Entities document](Entities.md)), then you should inherit from the `EntitySynchronizer` class (just don't use the `Guid` generic argument in the previous example) and implement the `FindLocalEntityAsync` to find the entity in your local database using the `Repository`. + +`EntitySynchronizer` is compatible with the *Entity Versioning* system (see the [Entities document](Entities.md)). So, it works as expected even if the events are received as disordered. If the entity's version in your local database is newer than the entity in the received event, then the event is ignored. You should implement the `IHasEntityVersion` interface for the entity and ETO classes (for this example, you should implement for `Product`, `ProductEto` and `OrderProduct` classes). + +If you want to ignore some type of change events, you can set `IgnoreEntityCreatedEvent`, `IgnoreEntityUpdatedEvent` and `IgnoreEntityDeletedEvent` in the constructor of your class. Example: + +````csharp +public class ProductSynchronizer + : EntitySynchronizer +{ + public ProductSynchronizer( + IObjectMapper objectMapper, + IRepository repository + ) : base(objectMapper, repository) + { + IgnoreEntityDeletedEvent = true; + } +} +```` + +> Notice that the `EntitySynchronizer` can only create/update the entities after you use it. If you have an existing system with existing data, you should manually copy the data for one time, because the `EntitySynchronizer` starts to work. + ## Transaction and Exception Handling Distributed event bus works in-process (since default implementation is `LocalDistributedEventBus`) unless you configure an actual provider (e.g. [Kafka](Distributed-Event-Bus-Kafka-Integration.md) or [RabbitMQ](Distributed-Event-Bus-RabbitMQ-Integration.md)). In-process event bus always executes event handlers in the same [unit of work](Unit-Of-Work.md) scope that you publishes the events in. That means, if an event handler throws an exception, then the related unit of work (the database transaction) is rolled back. In this way, your application logic and event handling logic becomes transactional (atomic) and consistent. If you want to ignore errors in an event handler, you must use a `try-catch` block in your handler and shouldn't re-throw the exception. @@ -522,61 +590,6 @@ Configure(options => }); ```` -## Entity Synchronizer - -Todo: introduction. - -### Create a Synchronizer Class - -Todo. - -```csharp -public class BlogUserSynchronizer : EntitySynchronizer, ITransientDependency -{ - public BlogUserSynchronizer(IObjectMapper objectMapper, IRepository repository) : - base(objectMapper, repository) - { - } -} -``` - -### Advanced Usages - -We may want to skip synchronizing the entity data on the external entity created, updated, or deleted. The `EntitySynchronizer` has three bool properties to control the handling behaviors. - -```csharp -public class BlogUserSynchronizer : EntitySynchronizer, ITransientDependency -{ - protected override bool IgnoreEntityCreatedEvent => true; - protected override bool IgnoreEntityUpdatedEvent => true; - protected override bool IgnoreEntityDeletedEvent => true; - - // ctor ... -} -``` - -### Eventual Consistency Guarantee - -Developers should always handle the distributed events disordering. ABP framework has an `EntityVersion` audit property to avoid an old version of entity data overriding a new one. - -The only thing we need to do is make the entity class and the ETO class implement the `IHasEntityVersion` interface. - -```csharp -public class User : Entity, IHasEntityVersion -{ - public int EntityVersion { get; set; } -} - -public class UserEto : EntityEto, IHasEntityVersion -{ - public int EntityVersion { get; set; } -} -``` - -After that, the entity synchronizer will know the entity version number and skip handling the stale events. - -> See the community post [Notice and Solve ABP Distributed Events Disordering](https://community.abp.io/posts/notice-and-solve-abp-distributed-events-disordering-yi9vq3p4) for more if you are interested or worried. - ## See Also * [Local Event Bus](Local-Event-Bus.md) diff --git a/docs/en/Entities.md b/docs/en/Entities.md index a1558a73ae..e13b297f4f 100644 --- a/docs/en/Entities.md +++ b/docs/en/Entities.md @@ -324,6 +324,19 @@ It's designed as read-only and automatically invalidates a cached entity if the > See the [Entity Cache](Entity-Cache.md) documentation for more information. +## Versioning Entities + +ABP defines the `IHasEntityVersion` interface for automatic versioning of your entities. It only provides a single `EntityVersion` properties, as shown in the following code block: + +````csharp +public interface IHasEntityVersion +{ + int EntityVersion { get; } +} +```` + +If you implement the `IHasEntityVersion` interface, ABP automatically increases the `EntityVersion` value whenever you update your entity. The initial value will be `0`, when you first create the entity and save to the database. + ## Extra Properties ABP defines the `IHasExtraProperties` interface that can be implemented by an entity to be able to dynamically set and get properties for the entity. `AggregateRoot` base class already implements the `IHasExtraProperties` interface. If you've derived from this class (or one of the related audit class defined above), you can directly use the API. diff --git a/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/EntityEto.cs b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/EntityEto.cs index 4c1c068a81..8a2983b076 100644 --- a/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/EntityEto.cs +++ b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/EntityEto.cs @@ -20,3 +20,8 @@ public class EntityEto : EtoBase KeysAsString = keysAsString; } } + +public abstract class EntityEto : IEntityEto +{ + public TKey Id { get; set; } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizer.cs b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizer.cs index 91ec77583f..b971acc25f 100644 --- a/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizer.cs +++ b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizer.cs @@ -1,9 +1,7 @@ -using System; -using System.ComponentModel; -using System.Globalization; -using System.Threading.Tasks; +using System.Threading.Tasks; using JetBrains.Annotations; using Volo.Abp.Auditing; +using Volo.Abp.DependencyInjection; using Volo.Abp.Domain.Repositories; using Volo.Abp.EventBus.Distributed; using Volo.Abp.ObjectMapping; @@ -11,62 +9,48 @@ using Volo.Abp.Uow; namespace Volo.Abp.Domain.Entities.Events.Distributed; -public abstract class EntitySynchronizer : - EntitySynchronizer +public abstract class EntitySynchronizer : + EntitySynchronizer where TEntity : class, IEntity - where TExternalEntityEto : EntityEto + where TSourceEntityEto : IEntityEto { - private readonly IRepository _repository; + protected new IRepository Repository { get; } protected EntitySynchronizer(IObjectMapper objectMapper, IRepository repository) : base(objectMapper, repository) { - _repository = repository; + Repository = repository; } - protected override Task FindLocalEntityAsync(TExternalEntityEto eto) + protected override Task FindLocalEntityAsync(TSourceEntityEto eto) { - return _repository.FindAsync(GetExternalEntityId(eto)); - } - - protected virtual TKey GetExternalEntityId(TExternalEntityEto eto) - { - var keyType = typeof(TKey); - var keyValue = Check.NotNullOrEmpty(eto.KeysAsString, nameof(eto.KeysAsString)); - - if (keyType == typeof(Guid)) - { - return (TKey)TypeDescriptor.GetConverter(keyType).ConvertFromInvariantString(keyValue); - } - - return (TKey)Convert.ChangeType(keyValue, keyType, CultureInfo.InvariantCulture); + return Repository.FindAsync(eto.Id); } } -public abstract class EntitySynchronizer : - IDistributedEventHandler>, - IDistributedEventHandler>, - IDistributedEventHandler>, - IUnitOfWorkEnabled +public abstract class EntitySynchronizer : + IDistributedEventHandler>, + IDistributedEventHandler>, + IDistributedEventHandler>, + ITransientDependency where TEntity : class, IEntity - where TExternalEntityEto : EntityEto { protected IObjectMapper ObjectMapper { get; } - private readonly IRepository _repository; + protected IRepository Repository { get; } - protected virtual bool IgnoreEntityCreatedEvent { get; set; } - protected virtual bool IgnoreEntityUpdatedEvent { get; set; } - protected virtual bool IgnoreEntityDeletedEvent { get; set; } + protected bool IgnoreEntityCreatedEvent { get; set; } + protected bool IgnoreEntityUpdatedEvent { get; set; } + protected bool IgnoreEntityDeletedEvent { get; set; } public EntitySynchronizer( IObjectMapper objectMapper, IRepository repository) { ObjectMapper = objectMapper; - _repository = repository; + Repository = repository; } - public virtual async Task HandleEventAsync(EntityCreatedEto eventData) + public virtual async Task HandleEventAsync(EntityCreatedEto eventData) { if (IgnoreEntityCreatedEvent) { @@ -76,7 +60,7 @@ public abstract class EntitySynchronizer : await TryCreateOrUpdateEntityAsync(eventData.Entity); } - public virtual async Task HandleEventAsync(EntityUpdatedEto eventData) + public virtual async Task HandleEventAsync(EntityUpdatedEto eventData) { if (IgnoreEntityUpdatedEvent) { @@ -86,7 +70,7 @@ public abstract class EntitySynchronizer : await TryCreateOrUpdateEntityAsync(eventData.Entity); } - public virtual async Task HandleEventAsync(EntityDeletedEto eventData) + public virtual async Task HandleEventAsync(EntityDeletedEto eventData) { if (IgnoreEntityDeletedEvent) { @@ -96,7 +80,8 @@ public abstract class EntitySynchronizer : await TryDeleteEntityAsync(eventData.Entity); } - protected virtual async Task TryCreateOrUpdateEntityAsync(TExternalEntityEto eto) + [UnitOfWork] + protected virtual async Task TryCreateOrUpdateEntityAsync(TSourceEntityEto eto) { var localEntity = await FindLocalEntityAsync(eto); @@ -111,11 +96,14 @@ public abstract class EntitySynchronizer : if (localEntity is IHasEntityVersion versionedLocalEntity && eto is IHasEntityVersion versionedEto) { - ObjectHelper.TrySetProperty(versionedLocalEntity, x => x.EntityVersion, - () => versionedEto.EntityVersion); + ObjectHelper.TrySetProperty( + versionedLocalEntity, + x => x.EntityVersion, + () => versionedEto.EntityVersion + ); } - await _repository.InsertAsync(localEntity, true); + await Repository.InsertAsync(localEntity); } else { @@ -123,30 +111,36 @@ public abstract class EntitySynchronizer : if (localEntity is IHasEntityVersion versionedLocalEntity && eto is IHasEntityVersion versionedEto) { - // The version will auto-increment by one when the repository updates the entity. + /* The version will auto-increment by one when the repository updates the entity. + * So, we are decreasing it as a workaround here. + */ var entityVersion = versionedEto.EntityVersion - 1; - - ObjectHelper.TrySetProperty(versionedLocalEntity, x => x.EntityVersion, () => entityVersion); + ObjectHelper.TrySetProperty( + versionedLocalEntity, + x => x.EntityVersion, + () => entityVersion + ); } - await _repository.UpdateAsync(localEntity, true); + await Repository.UpdateAsync(localEntity); } return true; } - protected virtual Task MapToEntityAsync(TExternalEntityEto eto) + protected virtual Task MapToEntityAsync(TSourceEntityEto eto) { - return Task.FromResult(ObjectMapper.Map(eto)); + return Task.FromResult(ObjectMapper.Map(eto)); } - protected virtual Task MapToEntityAsync(TExternalEntityEto eto, TEntity localEntity) + protected virtual Task MapToEntityAsync(TSourceEntityEto eto, TEntity localEntity) { ObjectMapper.Map(eto, localEntity); return Task.CompletedTask; } - protected virtual async Task TryDeleteEntityAsync(TExternalEntityEto eto) + [UnitOfWork] + protected virtual async Task TryDeleteEntityAsync(TSourceEntityEto eto) { var localEntity = await FindLocalEntityAsync(eto); @@ -155,19 +149,22 @@ public abstract class EntitySynchronizer : return false; } - await _repository.DeleteAsync(localEntity, true); + await Repository.DeleteAsync(localEntity, true); return true; } [ItemCanBeNull] - protected abstract Task FindLocalEntityAsync(TExternalEntityEto eto); + protected abstract Task FindLocalEntityAsync(TSourceEntityEto eto); - protected virtual Task IsEtoNewerAsync(TExternalEntityEto eto, [CanBeNull] TEntity localEntity) + protected virtual Task IsEtoNewerAsync(TSourceEntityEto eto, [CanBeNull] TEntity localEntity) { if (localEntity is IHasEntityVersion versionedLocalEntity && eto is IHasEntityVersion versionedEto) { - return Task.FromResult(versionedEto.EntityVersion > versionedLocalEntity.EntityVersion); + /* We are also accepting the same version because + * the entity may be updated, but the version might not be changed. + */ + return Task.FromResult(versionedEto.EntityVersion >= versionedLocalEntity.EntityVersion); } return Task.FromResult(true); diff --git a/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/IEntityEto.cs b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/IEntityEto.cs new file mode 100644 index 0000000000..289f5354fd --- /dev/null +++ b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/Entities/Events/Distributed/IEntityEto.cs @@ -0,0 +1,9 @@ +namespace Volo.Abp.Domain.Entities.Events.Distributed; + +public interface IEntityEto +{ + /// + /// Unique identifier for this entity. + /// + TKey Id { get; set; } +} \ No newline at end of file diff --git a/framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/EntitySynchronizer_Tests.cs b/framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/EntitySynchronizer_Tests.cs index 77c50d2aca..3ad8778913 100644 --- a/framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/EntitySynchronizer_Tests.cs +++ b/framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/EntitySynchronizer_Tests.cs @@ -33,7 +33,7 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest(remoteAuthorEto)); @@ -60,7 +60,7 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest(remoteAuthorEto)); @@ -88,7 +88,7 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest(remoteAuthorEto)); @@ -111,7 +111,7 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest(remoteBookEto)); @@ -139,7 +139,7 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest(remoteBookEto)); @@ -188,7 +188,7 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest(remoteBookEto)); @@ -245,10 +245,8 @@ public class EntitySynchronizer_Tests : AbpIntegratedTest(MemberList.None) - .ForMember(x => x.Id, options => options.MapFrom(x => Guid.Parse(x.KeysAsString))); - CreateMap(MemberList.None) - .ForMember(x => x.Id, options => options.MapFrom(x => Guid.Parse(x.KeysAsString))); + CreateMap(MemberList.None); + CreateMap(MemberList.None); } } } \ No newline at end of file diff --git a/framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/WithEntityVersion/RemoteBookEto.cs b/framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/WithEntityVersion/RemoteBookEto.cs index 48e532222d..3b5aea031d 100644 --- a/framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/WithEntityVersion/RemoteBookEto.cs +++ b/framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/WithEntityVersion/RemoteBookEto.cs @@ -1,8 +1,9 @@ -using Volo.Abp.Auditing; +using System; +using Volo.Abp.Auditing; namespace Volo.Abp.Domain.Entities.Events.Distributed.EntitySynchronizers.WithEntityVersion; -public class RemoteBookEto : EntityEto, IHasEntityVersion +public class RemoteBookEto : EntityEto, IHasEntityVersion { public int EntityVersion { get; set; } diff --git a/framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/WithoutEntityVersion/RemoteAuthorEto.cs b/framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/WithoutEntityVersion/RemoteAuthorEto.cs index 3923b2eeff..2cf44f8691 100644 --- a/framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/WithoutEntityVersion/RemoteAuthorEto.cs +++ b/framework/test/Volo.Abp.Ddd.Tests/Volo/Abp/Domain/Entities/Events/Distributed/EntitySynchronizers/WithoutEntityVersion/RemoteAuthorEto.cs @@ -1,6 +1,8 @@ -namespace Volo.Abp.Domain.Entities.Events.Distributed.EntitySynchronizers.WithoutEntityVersion; +using System; -public class RemoteAuthorEto : EntityEto +namespace Volo.Abp.Domain.Entities.Events.Distributed.EntitySynchronizers.WithoutEntityVersion; + +public class RemoteAuthorEto : EntityEto { public string Name { get; set; } } \ No newline at end of file