mirror of https://github.com/abpframework/abp.git
251 changed files with 4190 additions and 606 deletions
@ -0,0 +1,101 @@ |
|||
# Angular UI v4.x to v5.0 Migration Guide |
|||
|
|||
## Breaking Changes |
|||
|
|||
### Overall |
|||
|
|||
See the overall list of breaking changes: |
|||
|
|||
- Bootstrap 5 implementation [#10067](https://github.com/abpframework/abp/issues/10067) |
|||
- Remove NGXS dependency & states [#9952](https://github.com/abpframework/abp/issues/9952) |
|||
- Install @angular/localize package to startup templates [#10099](https://github.com/abpframework/abp/issues/10099) |
|||
- Create new secondary entrypoints and move the related proxies to there [#10060](https://github.com/abpframework/abp/issues/10060) |
|||
- Move SettingTabsService to @abp/ng.setting-management/config package from @abp/ng.core [#10061](https://github.com/abpframework/abp/issues/10061) |
|||
- Make the @abp/ng.account dependent on @abp/ng.identity [#10059](https://github.com/abpframework/abp/issues/10059) |
|||
- Set default abp-modal size medium [#10118](https://github.com/abpframework/abp/issues/10118) |
|||
- Update all dependency versions to the latest [#9806](https://github.com/abpframework/abp/issues/9806) |
|||
- Chart.js big include with CommonJS warning [#7472](https://github.com/abpframework/abp/issues/7472) |
|||
|
|||
### Angular v12 |
|||
|
|||
The new ABP Angular UI is based on Angular v12. We started to compile Angular UI packages with the Ivy compilation. Therefore, **new packages only work with Angular v12**. If you are still on the older version of Angular v12, you have to update to Angular v12. The update is usually very easy. See [Angular Update Guide](https://update.angular.io/?l=2&v=11.0-12.0) for further information. |
|||
|
|||
### Bootstrap 5 |
|||
|
|||
TODO |
|||
|
|||
### NGXS has been removed |
|||
|
|||
We aim to make the ABP Framework free of any state-management solutions. ABP developers should be able to use the ABP Framework with any library/framework of their choice. So, we decided to remove NGXS from ABP packages. |
|||
|
|||
If you'd like to use NGXS after upgrading to v5.0, you have to install the NGXS to your project. The package can be installed with the following command: |
|||
|
|||
```bash |
|||
npm install @ngxs/store |
|||
|
|||
# or |
|||
|
|||
yarn add @ngxs/store |
|||
``` |
|||
|
|||
NGXS states and actions, some namespaces have been removed. See [this issue](https://github.com/abpframework/abp/issues/9952) for the details. |
|||
|
|||
If you don't want to use the NGXS, you should remove all NGXS related imports, injections, etc., from your project. |
|||
|
|||
### @angular/localize package |
|||
|
|||
[`@angular/localize`](https://angular.io/api/localize) dependency has been removed from `@abp/ng.core` package. The package must be installed in your app. Run the following command to install: |
|||
|
|||
```bash |
|||
npm install @angular/localize |
|||
|
|||
# or |
|||
|
|||
yarn add @angular/localize |
|||
``` |
|||
|
|||
> ABP Angular UI packages are not dependent on the `@angular/localize` package. However, some packages (like `@ng-bootstrap/ng-bootstrap`) depend on the package. Thus, this package needs to be installed in your project. |
|||
|
|||
### Proxy endpoints |
|||
|
|||
New endpoints named proxy have been created, related proxies have moved. |
|||
For example; before v5.0, `IdentityUserService` could be imported from `@abp/ng.identity`. As of v5.0, the service can be imported from `@abp/ng.identity/proxy`. See an example: |
|||
|
|||
```ts |
|||
import { IdentityUserService } from '@abp/ng.identity/proxy'; |
|||
|
|||
@Component({}) |
|||
export class YourComponent { |
|||
constructor(private identityUserService: IdentityUserService) {} |
|||
} |
|||
``` |
|||
|
|||
Following proxies have been affected: |
|||
|
|||
- `@abp/ng.account` to `@abp/ng.account.core/proxy` |
|||
- `@abp/ng.feature-management` to `@abp/ng.feature-management/proxy` |
|||
- `@abp/ng.identity` to `@abp/ng.identity/proxy` |
|||
- `@abp/ng.permission-management` to `@abp/ng.permission-management/proxy` |
|||
- `@abp/ng.tenant-management` to `@abp/ng.tenant-management/proxy` |
|||
- **ProfileService** is deleted from `@abp/ng.core`. Instead, you can import it from `@abp/ng.identity/proxy` |
|||
|
|||
### SettingTabsService |
|||
|
|||
**SettingTabsService** has moved from `@abp/ng.core` to `@abp/ng.setting-management/config`. |
|||
|
|||
### ChartComponent |
|||
|
|||
`ChartComponent` has moved from `@abp/ng.theme.shared` to `@abp/ng.components/chart.js`. To use the component, you need to import the `ChartModule` to your module as follows: |
|||
|
|||
```ts |
|||
import { ChartModule } from '@abp/ng.components/chart.js'; |
|||
|
|||
@NgModule({ |
|||
imports: [ |
|||
ChartModule, |
|||
// ... |
|||
], |
|||
// ... |
|||
}) |
|||
export class YourFeatureModule {} |
|||
``` |
|||
@ -0,0 +1,14 @@ |
|||
# ABP Framework v4.x to v5.0 Migration Guide |
|||
|
|||
## MongoDB |
|||
|
|||
ABP Framework will serialize the datetime based on [AbpClockOptions](https://docs.abp.io/en/abp/latest/Timing#clock-options) start from 5.0, before `DateTime` values in MongoDB are [always saved as UTC](https://mongodb.github.io/mongo-csharp-driver/2.13/reference/bson/mapping/#datetime-serialization-options). |
|||
|
|||
You can disable this behavior by configure `AbpMongoDbOptions`. |
|||
```cs |
|||
services.Configure<AbpMongoDbOptions>(x => x.UseAbpClockHandleDateTime = false); |
|||
``` |
|||
|
|||
## Angular UI |
|||
|
|||
See the [Angular UI Migration Guide](Abp-5_0-Angular.md). |
|||
@ -1,7 +1,7 @@ |
|||
# ABP Framework Migration Guides |
|||
|
|||
* [4.2 to 4.3](Abp-4_3.md) |
|||
* [4.x to 4.2](Abp-4_2.md) |
|||
* [3.3.x to 4.0](Abp-4_0.md) |
|||
* [2.9.x to 3.0](../UI/Angular/Migration-Guide-v3.md) |
|||
|
|||
- [4.x to 5.0](Abp-5_0.md) |
|||
- [4.2 to 4.3](Abp-4_3.md) |
|||
- [4.x to 4.2](Abp-4_2.md) |
|||
- [3.3.x to 4.0](Abp-4_0.md) |
|||
- [2.9.x to 3.0](../UI/Angular/Migration-Guide-v3.md) |
|||
|
|||
@ -0,0 +1,3 @@ |
|||
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd"> |
|||
<ConfigureAwait ContinueOnCapturedContext="false" /> |
|||
</Weavers> |
|||
@ -0,0 +1,30 @@ |
|||
<?xml version="1.0" encoding="utf-8"?> |
|||
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"> |
|||
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. --> |
|||
<xs:element name="Weavers"> |
|||
<xs:complexType> |
|||
<xs:all> |
|||
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1"> |
|||
<xs:complexType> |
|||
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" /> |
|||
</xs:complexType> |
|||
</xs:element> |
|||
</xs:all> |
|||
<xs:attribute name="VerifyAssembly" type="xs:boolean"> |
|||
<xs:annotation> |
|||
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation> |
|||
</xs:annotation> |
|||
</xs:attribute> |
|||
<xs:attribute name="VerifyIgnoreCodes" type="xs:string"> |
|||
<xs:annotation> |
|||
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation> |
|||
</xs:annotation> |
|||
</xs:attribute> |
|||
<xs:attribute name="GenerateXsd" type="xs:boolean"> |
|||
<xs:annotation> |
|||
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation> |
|||
</xs:annotation> |
|||
</xs:attribute> |
|||
</xs:complexType> |
|||
</xs:element> |
|||
</xs:schema> |
|||
@ -0,0 +1,25 @@ |
|||
<Project Sdk="Microsoft.NET.Sdk"> |
|||
|
|||
<Import Project="..\..\..\configureawait.props" /> |
|||
<Import Project="..\..\..\common.props" /> |
|||
|
|||
<PropertyGroup> |
|||
<TargetFramework>netstandard2.0</TargetFramework> |
|||
<AssemblyName>Volo.Abp.DistributedLocking</AssemblyName> |
|||
<PackageId>Volo.Abp.DistributedLocking</PackageId> |
|||
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback> |
|||
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute> |
|||
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute> |
|||
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute> |
|||
<RootNamespace /> |
|||
</PropertyGroup> |
|||
|
|||
<ItemGroup> |
|||
<ProjectReference Include="..\Volo.Abp.Core\Volo.Abp.Core.csproj" /> |
|||
</ItemGroup> |
|||
|
|||
<ItemGroup> |
|||
<PackageReference Include="DistributedLock.Core" Version="1.0.2" /> |
|||
</ItemGroup> |
|||
|
|||
</Project> |
|||
@ -0,0 +1,9 @@ |
|||
using Volo.Abp.Modularity; |
|||
|
|||
namespace Volo.Abp.DistributedLocking |
|||
{ |
|||
public class AbpDistributedLockingModule : AbpModule |
|||
{ |
|||
|
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class MySQLInboxConfigExtensions |
|||
{ |
|||
public static void UseMySQL<TDbContext>(this InboxConfig outboxConfig) |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class MySQLOutboxConfigExtensions |
|||
{ |
|||
public static void UseMySQL<TDbContext>(this OutboxConfig outboxConfig) |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,8 @@ |
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface IOracleDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext> |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
|
|||
} |
|||
} |
|||
@ -0,0 +1,7 @@ |
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface IOracleDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext> |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
} |
|||
} |
|||
@ -0,0 +1,49 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.EventBus.Boxes; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
using Volo.Abp.Timing; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class OracleDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext> , IOracleDbContextEventInbox<TDbContext> |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
public OracleDbContextEventInbox( |
|||
IDbContextProvider<TDbContext> dbContextProvider, |
|||
IClock clock, |
|||
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions) |
|||
: base(dbContextProvider, clock, eventBusBoxesOptions) |
|||
{ |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public override async Task MarkAsProcessedAsync(Guid id) |
|||
{ |
|||
var dbContext = await DbContextProvider.GetDbContextAsync(); |
|||
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); |
|||
|
|||
var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; |
|||
await dbContext.Database.ExecuteSqlRawAsync(sql); |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public override async Task DeleteOldEventsAsync() |
|||
{ |
|||
var dbContext = await DbContextProvider.GetDbContextAsync(); |
|||
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); |
|||
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; |
|||
|
|||
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')"; |
|||
await dbContext.Database.ExecuteSqlRawAsync(sql); |
|||
} |
|||
|
|||
protected virtual string GuidToOracleType(Guid id) |
|||
{ |
|||
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,31 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class OracleDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , IOracleDbContextEventOutbox<TDbContext> |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
public OracleDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider) |
|||
: base(dbContextProvider) |
|||
{ |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public override async Task DeleteAsync(Guid id) |
|||
{ |
|||
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); |
|||
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); |
|||
|
|||
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; |
|||
await dbContext.Database.ExecuteSqlRawAsync(sql); |
|||
} |
|||
|
|||
protected virtual string GuidToOracleType(Guid id) |
|||
{ |
|||
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class OracleInboxConfigExtensions |
|||
{ |
|||
public static void UseOracle<TDbContext>(this InboxConfig outboxConfig) |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class OracleOutboxConfigExtensions |
|||
{ |
|||
public static void UseOracle<TDbContext>(this OutboxConfig outboxConfig) |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,8 @@ |
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface IOracleDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext> |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
|
|||
} |
|||
} |
|||
@ -0,0 +1,7 @@ |
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface IOracleDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext> |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
} |
|||
} |
|||
@ -0,0 +1,48 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.EventBus.Boxes; |
|||
using Volo.Abp.Timing; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class OracleDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext> , IOracleDbContextEventInbox<TDbContext> |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
public OracleDbContextEventInbox( |
|||
IDbContextProvider<TDbContext> dbContextProvider, |
|||
IClock clock, |
|||
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions) |
|||
: base(dbContextProvider, clock, eventBusBoxesOptions) |
|||
{ |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public override async Task MarkAsProcessedAsync(Guid id) |
|||
{ |
|||
var dbContext = await DbContextProvider.GetDbContextAsync(); |
|||
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); |
|||
|
|||
var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; |
|||
await dbContext.Database.ExecuteSqlRawAsync(sql); |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public override async Task DeleteOldEventsAsync() |
|||
{ |
|||
var dbContext = await DbContextProvider.GetDbContextAsync(); |
|||
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); |
|||
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; |
|||
|
|||
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')"; |
|||
await dbContext.Database.ExecuteSqlRawAsync(sql); |
|||
} |
|||
|
|||
protected virtual string GuidToOracleType(Guid id) |
|||
{ |
|||
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,31 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class OracleDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , IOracleDbContextEventOutbox<TDbContext> |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
public OracleDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider) |
|||
: base(dbContextProvider) |
|||
{ |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public override async Task DeleteAsync(Guid id) |
|||
{ |
|||
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); |
|||
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); |
|||
|
|||
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; |
|||
await dbContext.Database.ExecuteSqlRawAsync(sql); |
|||
} |
|||
|
|||
protected virtual string GuidToOracleType(Guid id) |
|||
{ |
|||
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class OracleInboxConfigExtensions |
|||
{ |
|||
public static void UseOracle<TDbContext>(this InboxConfig outboxConfig) |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class OracleOutboxConfigExtensions |
|||
{ |
|||
public static void UseOracle<TDbContext>(this OutboxConfig outboxConfig) |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,8 @@ |
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface IPostgreSqlDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext> |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
|
|||
} |
|||
} |
|||
@ -0,0 +1,7 @@ |
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface IPostgreSqlDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext> |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
} |
|||
} |
|||
@ -0,0 +1,44 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.EventBus.Boxes; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
using Volo.Abp.Timing; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class PostgreSqlDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext>, IPostgreSqlDbContextEventInbox<TDbContext> |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
public PostgreSqlDbContextEventInbox( |
|||
IDbContextProvider<TDbContext> dbContextProvider, |
|||
IClock clock, |
|||
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions) |
|||
: base(dbContextProvider, clock, eventBusBoxesOptions) |
|||
{ |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public override async Task MarkAsProcessedAsync(Guid id) |
|||
{ |
|||
var dbContext = await DbContextProvider.GetDbContextAsync(); |
|||
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); |
|||
|
|||
var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = '{Clock.Now}' WHERE \"Id\" = '{id}'"; |
|||
await dbContext.Database.ExecuteSqlRawAsync(sql); |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public override async Task DeleteOldEventsAsync() |
|||
{ |
|||
var dbContext = await DbContextProvider.GetDbContextAsync(); |
|||
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); |
|||
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; |
|||
|
|||
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < '{timeToKeepEvents}'"; |
|||
await dbContext.Database.ExecuteSqlRawAsync(sql); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,25 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class PostgreSqlDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , IPostgreSqlDbContextEventOutbox<TDbContext> |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
public PostgreSqlDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider) : base(dbContextProvider) |
|||
{ |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public override async Task DeleteAsync(Guid id) |
|||
{ |
|||
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); |
|||
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); |
|||
|
|||
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = '{id}'"; |
|||
await dbContext.Database.ExecuteSqlRawAsync(sql); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class PostgreSqlInboxConfigExtensions |
|||
{ |
|||
public static void UseNpgsql<TDbContext>(this InboxConfig outboxConfig) |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventInbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class PostgreSqlOutboxConfigExtensions |
|||
{ |
|||
public static void UseNpgsql<TDbContext>(this OutboxConfig outboxConfig) |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventOutbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class SqlServerInboxConfigExtensions |
|||
{ |
|||
public static void UseSqlServer<TDbContext>(this InboxConfig outboxConfig) |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class SqlServerOutboxConfigExtensions |
|||
{ |
|||
public static void UseSqlServer<TDbContext>(this OutboxConfig outboxConfig) |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class SqliteInboxConfigExtensions |
|||
{ |
|||
public static void UseSqlite<TDbContext>(this InboxConfig outboxConfig) |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class SqliteOutboxConfigExtensions |
|||
{ |
|||
public static void UseSqlite<TDbContext>(this OutboxConfig outboxConfig) |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,89 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.EventBus.Boxes; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
using Volo.Abp.Timing; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class DbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext> |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
protected IDbContextProvider<TDbContext> DbContextProvider { get; } |
|||
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } |
|||
protected IClock Clock { get; } |
|||
|
|||
public DbContextEventInbox( |
|||
IDbContextProvider<TDbContext> dbContextProvider, |
|||
IClock clock, |
|||
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions) |
|||
{ |
|||
DbContextProvider = dbContextProvider; |
|||
Clock = clock; |
|||
EventBusBoxesOptions = eventBusBoxesOptions.Value; |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public virtual async Task EnqueueAsync(IncomingEventInfo incomingEvent) |
|||
{ |
|||
var dbContext = await DbContextProvider.GetDbContextAsync(); |
|||
|
|||
dbContext.IncomingEvents.Add( |
|||
new IncomingEventRecord(incomingEvent) |
|||
); |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) |
|||
{ |
|||
var dbContext = await DbContextProvider.GetDbContextAsync(); |
|||
|
|||
var outgoingEventRecords = await dbContext |
|||
.IncomingEvents |
|||
.AsNoTracking() |
|||
.Where(x => !x.Processed) |
|||
.OrderBy(x => x.CreationTime) |
|||
.Take(maxCount) |
|||
.ToListAsync(cancellationToken: cancellationToken); |
|||
|
|||
return outgoingEventRecords |
|||
.Select(x => x.ToIncomingEventInfo()) |
|||
.ToList(); |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public virtual async Task MarkAsProcessedAsync(Guid id) |
|||
{ |
|||
var dbContext = await DbContextProvider.GetDbContextAsync(); |
|||
var incomingEvent = await dbContext.IncomingEvents.FindAsync(id); |
|||
if (incomingEvent != null) |
|||
{ |
|||
incomingEvent.MarkAsProcessed(Clock.Now); |
|||
} |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public virtual async Task<bool> ExistsByMessageIdAsync(string messageId) |
|||
{ |
|||
var dbContext = await DbContextProvider.GetDbContextAsync(); |
|||
return await dbContext.IncomingEvents.AnyAsync(x => x.MessageId == messageId); |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public virtual async Task DeleteOldEventsAsync() |
|||
{ |
|||
var dbContext = await DbContextProvider.GetDbContextAsync(); |
|||
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; |
|||
var oldEvents = await dbContext.IncomingEvents |
|||
.Where(x => x.Processed && x.CreationTime < timeToKeepEvents) |
|||
.ToListAsync(); |
|||
dbContext.IncomingEvents.RemoveRange(oldEvents); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,60 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class DbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext> |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
protected IDbContextProvider<TDbContext> DbContextProvider { get; } |
|||
|
|||
public DbContextEventOutbox( |
|||
IDbContextProvider<TDbContext> dbContextProvider) |
|||
{ |
|||
DbContextProvider = dbContextProvider; |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public virtual async Task EnqueueAsync(OutgoingEventInfo outgoingEvent) |
|||
{ |
|||
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); |
|||
dbContext.OutgoingEvents.Add( |
|||
new OutgoingEventRecord(outgoingEvent) |
|||
); |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) |
|||
{ |
|||
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); |
|||
|
|||
var outgoingEventRecords = await dbContext |
|||
.OutgoingEvents |
|||
.AsNoTracking() |
|||
.OrderBy(x => x.CreationTime) |
|||
.Take(maxCount) |
|||
.ToListAsync(cancellationToken: cancellationToken); |
|||
|
|||
return outgoingEventRecords |
|||
.Select(x => x.ToOutgoingEventInfo()) |
|||
.ToList(); |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public virtual async Task DeleteAsync(Guid id) |
|||
{ |
|||
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); |
|||
var outgoingEvent = await dbContext.OutgoingEvents.FindAsync(id); |
|||
if (outgoingEvent != null) |
|||
{ |
|||
dbContext.Remove(outgoingEvent); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class EfCoreInboxConfigExtensions |
|||
{ |
|||
public static void UseDbContext<TDbContext>(this InboxConfig outboxConfig) |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(IDbContextEventInbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class EfCoreOutboxConfigExtensions |
|||
{ |
|||
public static void UseDbContext<TDbContext>(this OutboxConfig outboxConfig) |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(IDbContextEventOutbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,24 @@ |
|||
using JetBrains.Annotations; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Volo.Abp.Data; |
|||
using Volo.Abp.EntityFrameworkCore.Modeling; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class EventInboxDbContextModelBuilderExtensions |
|||
{ |
|||
public static void ConfigureEventInbox([NotNull] this ModelBuilder builder) |
|||
{ |
|||
builder.Entity<IncomingEventRecord>(b => |
|||
{ |
|||
b.ToTable(AbpCommonDbProperties.DbTablePrefix + "EventInbox", AbpCommonDbProperties.DbSchema); |
|||
b.ConfigureByConvention(); |
|||
b.Property(x => x.EventName).IsRequired().HasMaxLength(IncomingEventRecord.MaxEventNameLength); |
|||
b.Property(x => x.EventData).IsRequired(); |
|||
|
|||
b.HasIndex(x => new { x.Processed, x.CreationTime }); |
|||
b.HasIndex(x => x.MessageId); |
|||
}); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,23 @@ |
|||
using JetBrains.Annotations; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Volo.Abp.Data; |
|||
using Volo.Abp.EntityFrameworkCore.Modeling; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class EventOutboxDbContextModelBuilderExtensions |
|||
{ |
|||
public static void ConfigureEventOutbox([NotNull] this ModelBuilder builder) |
|||
{ |
|||
builder.Entity<OutgoingEventRecord>(b => |
|||
{ |
|||
b.ToTable(AbpCommonDbProperties.DbTablePrefix + "EventOutbox", AbpCommonDbProperties.DbSchema); |
|||
b.ConfigureByConvention(); |
|||
b.Property(x => x.EventName).IsRequired().HasMaxLength(OutgoingEventRecord.MaxEventNameLength); |
|||
b.Property(x => x.EventData).IsRequired(); |
|||
|
|||
b.HasIndex(x => x.CreationTime); |
|||
}); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,10 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface IDbContextEventInbox<TDbContext> : IEventInbox |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
|
|||
} |
|||
} |
|||
@ -0,0 +1,10 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface IDbContextEventOutbox<TDbContext> : IEventOutbox |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
|
|||
} |
|||
} |
|||
@ -0,0 +1,9 @@ |
|||
using Microsoft.EntityFrameworkCore; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface IHasEventInbox : IEfCoreDbContext |
|||
{ |
|||
DbSet<IncomingEventRecord> IncomingEvents { get; set; } |
|||
} |
|||
} |
|||
@ -0,0 +1,9 @@ |
|||
using Microsoft.EntityFrameworkCore; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface IHasEventOutbox : IEfCoreDbContext |
|||
{ |
|||
DbSet<OutgoingEventRecord> OutgoingEvents { get; set; } |
|||
} |
|||
} |
|||
@ -0,0 +1,7 @@ |
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface ISqlRawDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext> |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
} |
|||
} |
|||
@ -0,0 +1,7 @@ |
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface ISqlRawDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext> |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
} |
|||
} |
|||
@ -0,0 +1,66 @@ |
|||
using System; |
|||
using Volo.Abp.Auditing; |
|||
using Volo.Abp.Data; |
|||
using Volo.Abp.Domain.Entities; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class IncomingEventRecord : |
|||
BasicAggregateRoot<Guid>, |
|||
IHasExtraProperties, |
|||
IHasCreationTime |
|||
{ |
|||
public static int MaxEventNameLength { get; set; } = 256; |
|||
|
|||
public ExtraPropertyDictionary ExtraProperties { get; private set; } |
|||
|
|||
public string MessageId { get; private set; } |
|||
|
|||
public string EventName { get; private set; } |
|||
|
|||
public byte[] EventData { get; private set; } |
|||
|
|||
public DateTime CreationTime { get; private set; } |
|||
|
|||
public bool Processed { get; set; } |
|||
|
|||
public DateTime? ProcessedTime { get; set; } |
|||
|
|||
protected IncomingEventRecord() |
|||
{ |
|||
ExtraProperties = new ExtraPropertyDictionary(); |
|||
this.SetDefaultsForExtraProperties(); |
|||
} |
|||
|
|||
public IncomingEventRecord( |
|||
IncomingEventInfo eventInfo) |
|||
: base(eventInfo.Id) |
|||
{ |
|||
MessageId = eventInfo.MessageId; |
|||
EventName = eventInfo.EventName; |
|||
EventData = eventInfo.EventData; |
|||
CreationTime = eventInfo.CreationTime; |
|||
|
|||
ExtraProperties = new ExtraPropertyDictionary(); |
|||
this.SetDefaultsForExtraProperties(); |
|||
} |
|||
|
|||
public IncomingEventInfo ToIncomingEventInfo() |
|||
{ |
|||
return new IncomingEventInfo( |
|||
Id, |
|||
MessageId, |
|||
EventName, |
|||
EventData, |
|||
CreationTime |
|||
); |
|||
} |
|||
|
|||
public void MarkAsProcessed(DateTime processedTime) |
|||
{ |
|||
Processed = true; |
|||
ProcessedTime = processedTime; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,52 @@ |
|||
using System; |
|||
using Volo.Abp.Auditing; |
|||
using Volo.Abp.Data; |
|||
using Volo.Abp.Domain.Entities; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class OutgoingEventRecord : |
|||
BasicAggregateRoot<Guid>, |
|||
IHasExtraProperties, |
|||
IHasCreationTime |
|||
{ |
|||
public static int MaxEventNameLength { get; set; } = 256; |
|||
|
|||
public ExtraPropertyDictionary ExtraProperties { get; private set; } |
|||
|
|||
public string EventName { get; private set; } |
|||
|
|||
public byte[] EventData { get; private set; } |
|||
|
|||
public DateTime CreationTime { get; private set; } |
|||
|
|||
protected OutgoingEventRecord() |
|||
{ |
|||
ExtraProperties = new ExtraPropertyDictionary(); |
|||
this.SetDefaultsForExtraProperties(); |
|||
} |
|||
|
|||
public OutgoingEventRecord( |
|||
OutgoingEventInfo eventInfo) |
|||
: base(eventInfo.Id) |
|||
{ |
|||
EventName = eventInfo.EventName; |
|||
EventData = eventInfo.EventData; |
|||
CreationTime = eventInfo.CreationTime; |
|||
|
|||
ExtraProperties = new ExtraPropertyDictionary(); |
|||
this.SetDefaultsForExtraProperties(); |
|||
} |
|||
|
|||
public OutgoingEventInfo ToOutgoingEventInfo() |
|||
{ |
|||
return new OutgoingEventInfo( |
|||
Id, |
|||
EventName, |
|||
EventData, |
|||
CreationTime |
|||
); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.EventBus.Boxes; |
|||
using Volo.Abp.Timing; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class SqlRawDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext> , ISqlRawDbContextEventInbox<TDbContext> |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
public SqlRawDbContextEventInbox( |
|||
IDbContextProvider<TDbContext> dbContextProvider, |
|||
IClock clock, |
|||
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions) |
|||
: base(dbContextProvider, clock, eventBusBoxesOptions) |
|||
{ |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public override async Task MarkAsProcessedAsync(Guid id) |
|||
{ |
|||
var dbContext = await DbContextProvider.GetDbContextAsync(); |
|||
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); |
|||
|
|||
var sql = $"UPDATE {tableName} SET Processed = '1', ProcessedTime = '{Clock.Now}' WHERE Id = '{id.ToString().ToUpper()}'"; |
|||
await dbContext.Database.ExecuteSqlRawAsync(sql); |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public override async Task DeleteOldEventsAsync() |
|||
{ |
|||
var dbContext = await DbContextProvider.GetDbContextAsync(); |
|||
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); |
|||
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; |
|||
|
|||
var sql = $"DELETE FROM {tableName} WHERE Processed = '1' AND CreationTime < '{timeToKeepEvents}'"; |
|||
await dbContext.Database.ExecuteSqlRawAsync(sql); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class SqlRawDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , ISqlRawDbContextEventOutbox<TDbContext> |
|||
where TDbContext : IHasEventOutbox |
|||
{ |
|||
public SqlRawDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider) |
|||
: base(dbContextProvider) |
|||
{ |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public override async Task DeleteAsync(Guid id) |
|||
{ |
|||
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); |
|||
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); |
|||
|
|||
var sql = $"DELETE FROM {tableName} WHERE Id = '{id.ToString().ToUpper()}'"; |
|||
await dbContext.Database.ExecuteSqlRawAsync(sql); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,3 @@ |
|||
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd"> |
|||
<ConfigureAwait ContinueOnCapturedContext="false" /> |
|||
</Weavers> |
|||
@ -0,0 +1,30 @@ |
|||
<?xml version="1.0" encoding="utf-8"?> |
|||
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"> |
|||
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. --> |
|||
<xs:element name="Weavers"> |
|||
<xs:complexType> |
|||
<xs:all> |
|||
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1"> |
|||
<xs:complexType> |
|||
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" /> |
|||
</xs:complexType> |
|||
</xs:element> |
|||
</xs:all> |
|||
<xs:attribute name="VerifyAssembly" type="xs:boolean"> |
|||
<xs:annotation> |
|||
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation> |
|||
</xs:annotation> |
|||
</xs:attribute> |
|||
<xs:attribute name="VerifyIgnoreCodes" type="xs:string"> |
|||
<xs:annotation> |
|||
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation> |
|||
</xs:annotation> |
|||
</xs:attribute> |
|||
<xs:attribute name="GenerateXsd" type="xs:boolean"> |
|||
<xs:annotation> |
|||
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation> |
|||
</xs:annotation> |
|||
</xs:attribute> |
|||
</xs:complexType> |
|||
</xs:element> |
|||
</xs:schema> |
|||
@ -0,0 +1,23 @@ |
|||
<Project Sdk="Microsoft.NET.Sdk"> |
|||
|
|||
<Import Project="..\..\..\configureawait.props" /> |
|||
<Import Project="..\..\..\common.props" /> |
|||
|
|||
<PropertyGroup> |
|||
<TargetFramework>netstandard2.0</TargetFramework> |
|||
<AssemblyName>Volo.Abp.EventBus.Boxes</AssemblyName> |
|||
<PackageId>Volo.Abp.EventBus.Boxes</PackageId> |
|||
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback> |
|||
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute> |
|||
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute> |
|||
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute> |
|||
<RootNamespace /> |
|||
</PropertyGroup> |
|||
|
|||
<ItemGroup> |
|||
<ProjectReference Include="..\Volo.Abp.BackgroundWorkers\Volo.Abp.BackgroundWorkers.csproj" /> |
|||
<ProjectReference Include="..\Volo.Abp.DistributedLocking\Volo.Abp.DistributedLocking.csproj" /> |
|||
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" /> |
|||
</ItemGroup> |
|||
|
|||
</Project> |
|||
@ -0,0 +1,18 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EventBus.Boxes |
|||
{ |
|||
public static class AbpDistributedEventBusExtensions |
|||
{ |
|||
public static ISupportsEventBoxes AsSupportsEventBoxes(this IDistributedEventBus eventBus) |
|||
{ |
|||
var supportsEventBoxes = eventBus as ISupportsEventBoxes; |
|||
if (supportsEventBoxes == null) |
|||
{ |
|||
throw new AbpException($"Given type ({eventBus.GetType().AssemblyQualifiedName}) should implement {nameof(ISupportsEventBoxes)}!"); |
|||
} |
|||
|
|||
return supportsEventBoxes; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,18 @@ |
|||
using Volo.Abp.BackgroundWorkers; |
|||
using Volo.Abp.Modularity; |
|||
|
|||
namespace Volo.Abp.EventBus.Boxes |
|||
{ |
|||
[DependsOn( |
|||
typeof(AbpEventBusModule), |
|||
typeof(AbpBackgroundWorkersModule) |
|||
)] |
|||
public class AbpEventBusBoxesModule : AbpModule |
|||
{ |
|||
public override void OnApplicationInitialization(ApplicationInitializationContext context) |
|||
{ |
|||
context.AddBackgroundWorker<OutboxSenderManager>(); |
|||
context.AddBackgroundWorker<InboxProcessManager>(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,48 @@ |
|||
using System; |
|||
|
|||
namespace Volo.Abp.EventBus.Boxes |
|||
{ |
|||
public class AbpEventBusBoxesOptions |
|||
{ |
|||
/// <summary>
|
|||
/// Default: 6 hours
|
|||
/// </summary>
|
|||
public TimeSpan CleanOldEventTimeIntervalSpan { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// Default: 1000
|
|||
/// </summary>
|
|||
public int InboxWaitingEventMaxCount { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// Default: 1000
|
|||
/// </summary>
|
|||
public int OutboxWaitingEventMaxCount { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// Period time of <see cref="InboxProcessor"/> and <see cref="OutboxSender"/>
|
|||
/// Default: 2 seconds
|
|||
/// </summary>
|
|||
public TimeSpan PeriodTimeSpan { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// Default: 15 seconds
|
|||
/// </summary>
|
|||
public TimeSpan DistributedLockWaitDuration { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// Default: 2 hours
|
|||
/// </summary>
|
|||
public TimeSpan WaitTimeToDeleteProcessedInboxEvents { get; set; } |
|||
|
|||
public AbpEventBusBoxesOptions() |
|||
{ |
|||
CleanOldEventTimeIntervalSpan = TimeSpan.FromHours(6); |
|||
InboxWaitingEventMaxCount = 1000; |
|||
OutboxWaitingEventMaxCount = 1000; |
|||
PeriodTimeSpan = TimeSpan.FromSeconds(2); |
|||
DistributedLockWaitDuration = TimeSpan.FromSeconds(15); |
|||
WaitTimeToDeleteProcessedInboxEvents = TimeSpan.FromHours(2); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EventBus.Boxes |
|||
{ |
|||
public interface IInboxProcessor |
|||
{ |
|||
Task StartAsync(InboxConfig inboxConfig, CancellationToken cancellationToken = default); |
|||
|
|||
Task StopAsync(CancellationToken cancellationToken = default); |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EventBus.Boxes |
|||
{ |
|||
public interface IOutboxSender |
|||
{ |
|||
Task StartAsync(OutboxConfig outboxConfig, CancellationToken cancellationToken = default); |
|||
|
|||
Task StopAsync(CancellationToken cancellationToken = default); |
|||
} |
|||
} |
|||
@ -0,0 +1,48 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.BackgroundWorkers; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EventBus.Boxes |
|||
{ |
|||
public class InboxProcessManager : IBackgroundWorker |
|||
{ |
|||
protected AbpDistributedEventBusOptions Options { get; } |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
protected List<IInboxProcessor> Processors { get; } |
|||
|
|||
public InboxProcessManager( |
|||
IOptions<AbpDistributedEventBusOptions> options, |
|||
IServiceProvider serviceProvider) |
|||
{ |
|||
ServiceProvider = serviceProvider; |
|||
Options = options.Value; |
|||
Processors = new List<IInboxProcessor>(); |
|||
} |
|||
|
|||
public async Task StartAsync(CancellationToken cancellationToken = default) |
|||
{ |
|||
foreach (var inboxConfig in Options.Inboxes.Values) |
|||
{ |
|||
if (inboxConfig.IsProcessingEnabled) |
|||
{ |
|||
var processor = ServiceProvider.GetRequiredService<IInboxProcessor>(); |
|||
await processor.StartAsync(inboxConfig, cancellationToken); |
|||
Processors.Add(processor); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public async Task StopAsync(CancellationToken cancellationToken = default) |
|||
{ |
|||
foreach (var processor in Processors) |
|||
{ |
|||
await processor.StopAsync(cancellationToken); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,140 @@ |
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Medallion.Threading; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
using Volo.Abp.Threading; |
|||
using Volo.Abp.Timing; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EventBus.Boxes |
|||
{ |
|||
public class InboxProcessor : IInboxProcessor, ITransientDependency |
|||
{ |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
protected AbpAsyncTimer Timer { get; } |
|||
protected IDistributedEventBus DistributedEventBus { get; } |
|||
protected IDistributedLockProvider DistributedLockProvider { get; } |
|||
protected IUnitOfWorkManager UnitOfWorkManager { get; } |
|||
protected IClock Clock { get; } |
|||
protected IEventInbox Inbox { get; private set; } |
|||
protected InboxConfig InboxConfig { get; private set; } |
|||
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } |
|||
|
|||
protected DateTime? LastCleanTime { get; set; } |
|||
|
|||
protected string DistributedLockName => "Inbox_" + InboxConfig.Name; |
|||
public ILogger<InboxProcessor> Logger { get; set; } |
|||
protected CancellationTokenSource StoppingTokenSource { get; } |
|||
protected CancellationToken StoppingToken { get; } |
|||
|
|||
public InboxProcessor( |
|||
IServiceProvider serviceProvider, |
|||
AbpAsyncTimer timer, |
|||
IDistributedEventBus distributedEventBus, |
|||
IDistributedLockProvider distributedLockProvider, |
|||
IUnitOfWorkManager unitOfWorkManager, |
|||
IClock clock, |
|||
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions) |
|||
{ |
|||
ServiceProvider = serviceProvider; |
|||
Timer = timer; |
|||
DistributedEventBus = distributedEventBus; |
|||
DistributedLockProvider = distributedLockProvider; |
|||
UnitOfWorkManager = unitOfWorkManager; |
|||
Clock = clock; |
|||
EventBusBoxesOptions = eventBusBoxesOptions.Value; |
|||
Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds; |
|||
Timer.Elapsed += TimerOnElapsed; |
|||
Logger = NullLogger<InboxProcessor>.Instance; |
|||
StoppingTokenSource = new CancellationTokenSource(); |
|||
StoppingToken = StoppingTokenSource.Token; |
|||
} |
|||
|
|||
private async Task TimerOnElapsed(AbpAsyncTimer arg) |
|||
{ |
|||
await RunAsync(); |
|||
} |
|||
|
|||
public Task StartAsync(InboxConfig inboxConfig, CancellationToken cancellationToken = default) |
|||
{ |
|||
InboxConfig = inboxConfig; |
|||
Inbox = (IEventInbox)ServiceProvider.GetRequiredService(inboxConfig.ImplementationType); |
|||
Timer.Start(cancellationToken); |
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
public Task StopAsync(CancellationToken cancellationToken = default) |
|||
{ |
|||
StoppingTokenSource.Cancel(); |
|||
Timer.Stop(cancellationToken); |
|||
StoppingTokenSource.Dispose(); |
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
protected virtual async Task RunAsync() |
|||
{ |
|||
if (StoppingToken.IsCancellationRequested) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken)) |
|||
{ |
|||
if (handle != null) |
|||
{ |
|||
await DeleteOldEventsAsync(); |
|||
|
|||
while (true) |
|||
{ |
|||
var waitingEvents = await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, StoppingToken); |
|||
if (waitingEvents.Count <= 0) |
|||
{ |
|||
break; |
|||
} |
|||
|
|||
Logger.LogInformation($"Found {waitingEvents.Count} events in the inbox."); |
|||
|
|||
foreach (var waitingEvent in waitingEvents) |
|||
{ |
|||
using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true)) |
|||
{ |
|||
await DistributedEventBus |
|||
.AsSupportsEventBoxes() |
|||
.ProcessFromInboxAsync(waitingEvent, InboxConfig); |
|||
|
|||
await Inbox.MarkAsProcessedAsync(waitingEvent.Id); |
|||
|
|||
await uow.CompleteAsync(); |
|||
} |
|||
|
|||
Logger.LogInformation($"Processed the incoming event with id = {waitingEvent.Id:N}"); |
|||
} |
|||
} |
|||
} |
|||
else |
|||
{ |
|||
Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); |
|||
await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken); |
|||
} |
|||
} |
|||
} |
|||
|
|||
protected virtual async Task DeleteOldEventsAsync() |
|||
{ |
|||
if (LastCleanTime != null && LastCleanTime + EventBusBoxesOptions.CleanOldEventTimeIntervalSpan > Clock.Now) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
await Inbox.DeleteOldEventsAsync(); |
|||
|
|||
LastCleanTime = Clock.Now; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,108 @@ |
|||
using System; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Medallion.Threading; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Logging.Abstractions; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
using Volo.Abp.Threading; |
|||
|
|||
namespace Volo.Abp.EventBus.Boxes |
|||
{ |
|||
public class OutboxSender : IOutboxSender, ITransientDependency |
|||
{ |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
protected AbpAsyncTimer Timer { get; } |
|||
protected IDistributedEventBus DistributedEventBus { get; } |
|||
protected IDistributedLockProvider DistributedLockProvider { get; } |
|||
protected IEventOutbox Outbox { get; private set; } |
|||
protected OutboxConfig OutboxConfig { get; private set; } |
|||
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } |
|||
protected string DistributedLockName => "Outbox_" + OutboxConfig.Name; |
|||
public ILogger<OutboxSender> Logger { get; set; } |
|||
|
|||
protected CancellationTokenSource StoppingTokenSource { get; } |
|||
protected CancellationToken StoppingToken { get; } |
|||
|
|||
public OutboxSender( |
|||
IServiceProvider serviceProvider, |
|||
AbpAsyncTimer timer, |
|||
IDistributedEventBus distributedEventBus, |
|||
IDistributedLockProvider distributedLockProvider, |
|||
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions) |
|||
{ |
|||
ServiceProvider = serviceProvider; |
|||
Timer = timer; |
|||
DistributedEventBus = distributedEventBus; |
|||
DistributedLockProvider = distributedLockProvider; |
|||
EventBusBoxesOptions = eventBusBoxesOptions.Value; |
|||
Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds; |
|||
Timer.Elapsed += TimerOnElapsed; |
|||
Logger = NullLogger<OutboxSender>.Instance; |
|||
StoppingTokenSource = new CancellationTokenSource(); |
|||
StoppingToken = StoppingTokenSource.Token; |
|||
} |
|||
|
|||
public virtual Task StartAsync(OutboxConfig outboxConfig, CancellationToken cancellationToken = default) |
|||
{ |
|||
OutboxConfig = outboxConfig; |
|||
Outbox = (IEventOutbox)ServiceProvider.GetRequiredService(outboxConfig.ImplementationType); |
|||
Timer.Start(cancellationToken); |
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
public virtual Task StopAsync(CancellationToken cancellationToken = default) |
|||
{ |
|||
StoppingTokenSource.Cancel(); |
|||
Timer.Stop(cancellationToken); |
|||
StoppingTokenSource.Dispose(); |
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
private async Task TimerOnElapsed(AbpAsyncTimer arg) |
|||
{ |
|||
await RunAsync(); |
|||
} |
|||
|
|||
protected virtual async Task RunAsync() |
|||
{ |
|||
await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken)) |
|||
{ |
|||
if (handle != null) |
|||
{ |
|||
while (true) |
|||
{ |
|||
var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, StoppingToken); |
|||
if (waitingEvents.Count <= 0) |
|||
{ |
|||
break; |
|||
} |
|||
|
|||
Logger.LogInformation($"Found {waitingEvents.Count} events in the outbox."); |
|||
|
|||
foreach (var waitingEvent in waitingEvents) |
|||
{ |
|||
await DistributedEventBus |
|||
.AsSupportsEventBoxes() |
|||
.PublishFromOutboxAsync( |
|||
waitingEvent, |
|||
OutboxConfig |
|||
); |
|||
|
|||
await Outbox.DeleteAsync(waitingEvent.Id); |
|||
Logger.LogInformation($"Sent the event to the message broker with id = {waitingEvent.Id:N}"); |
|||
} |
|||
} |
|||
} |
|||
else |
|||
{ |
|||
Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); |
|||
await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,48 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.BackgroundWorkers; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EventBus.Boxes |
|||
{ |
|||
public class OutboxSenderManager : IBackgroundWorker |
|||
{ |
|||
protected AbpDistributedEventBusOptions Options { get; } |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
protected List<IOutboxSender> Senders { get; } |
|||
|
|||
public OutboxSenderManager( |
|||
IOptions<AbpDistributedEventBusOptions> options, |
|||
IServiceProvider serviceProvider) |
|||
{ |
|||
ServiceProvider = serviceProvider; |
|||
Options = options.Value; |
|||
Senders = new List<IOutboxSender>(); |
|||
} |
|||
|
|||
public async Task StartAsync(CancellationToken cancellationToken = default) |
|||
{ |
|||
foreach (var outboxConfig in Options.Outboxes.Values) |
|||
{ |
|||
if (outboxConfig.IsSendingEnabled) |
|||
{ |
|||
var sender = ServiceProvider.GetRequiredService<IOutboxSender>(); |
|||
await sender.StartAsync(outboxConfig, cancellationToken); |
|||
Senders.Add(sender); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public async Task StopAsync(CancellationToken cancellationToken = default) |
|||
{ |
|||
foreach (var sender in Senders) |
|||
{ |
|||
await sender.StopAsync(cancellationToken); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,20 @@ |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.EventBus.Boxes |
|||
{ |
|||
internal static class TaskDelayHelper |
|||
{ |
|||
public static async Task DelayAsync(int milliseconds, CancellationToken cancellationToken = default) |
|||
{ |
|||
try |
|||
{ |
|||
await Task.Delay(milliseconds, cancellationToken); |
|||
} |
|||
catch (TaskCanceledException) |
|||
{ |
|||
return; |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using System; |
|||
|
|||
namespace Volo.Abp.EventBus.Rebus |
|||
{ |
|||
public interface IRebusSerializer |
|||
{ |
|||
byte[] Serialize(object obj); |
|||
|
|||
object Deserialize(byte[] value, Type type); |
|||
|
|||
T Deserialize<T>(byte[] value); |
|||
} |
|||
} |
|||
@ -0,0 +1,32 @@ |
|||
using System; |
|||
using System.Text; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.Json; |
|||
|
|||
namespace Volo.Abp.EventBus.Rebus |
|||
{ |
|||
public class Utf8JsonRebusSerializer : IRebusSerializer, ITransientDependency |
|||
{ |
|||
private readonly IJsonSerializer _jsonSerializer; |
|||
|
|||
public Utf8JsonRebusSerializer(IJsonSerializer jsonSerializer) |
|||
{ |
|||
_jsonSerializer = jsonSerializer; |
|||
} |
|||
|
|||
public byte[] Serialize(object obj) |
|||
{ |
|||
return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj)); |
|||
} |
|||
|
|||
public object Deserialize(byte[] value, Type type) |
|||
{ |
|||
return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value)); |
|||
} |
|||
|
|||
public T Deserialize<T>(byte[] value) |
|||
{ |
|||
return _jsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(value)); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,166 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.Guids; |
|||
using Volo.Abp.MultiTenancy; |
|||
using Volo.Abp.Timing; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventBus, ISupportsEventBoxes |
|||
{ |
|||
protected IGuidGenerator GuidGenerator { get; } |
|||
protected IClock Clock { get; } |
|||
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } |
|||
|
|||
protected DistributedEventBusBase( |
|||
IServiceScopeFactory serviceScopeFactory, |
|||
ICurrentTenant currentTenant, |
|||
IUnitOfWorkManager unitOfWorkManager, |
|||
IEventErrorHandler errorHandler, |
|||
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions, |
|||
IGuidGenerator guidGenerator, |
|||
IClock clock |
|||
) : base( |
|||
serviceScopeFactory, |
|||
currentTenant, |
|||
unitOfWorkManager, |
|||
errorHandler) |
|||
{ |
|||
GuidGenerator = guidGenerator; |
|||
Clock = clock; |
|||
AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value; |
|||
} |
|||
|
|||
public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class |
|||
{ |
|||
return Subscribe(typeof(TEvent), handler); |
|||
} |
|||
|
|||
public override Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true) |
|||
{ |
|||
return PublishAsync(eventType, eventData, onUnitOfWorkComplete, useOutbox: true); |
|||
} |
|||
|
|||
public Task PublishAsync<TEvent>( |
|||
TEvent eventData, |
|||
bool onUnitOfWorkComplete = true, |
|||
bool useOutbox = true) |
|||
where TEvent : class |
|||
{ |
|||
return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete, useOutbox); |
|||
} |
|||
|
|||
public async Task PublishAsync( |
|||
Type eventType, |
|||
object eventData, |
|||
bool onUnitOfWorkComplete = true, |
|||
bool useOutbox = true) |
|||
{ |
|||
if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null) |
|||
{ |
|||
AddToUnitOfWork( |
|||
UnitOfWorkManager.Current, |
|||
new UnitOfWorkEventRecord(eventType, eventData, EventOrderGenerator.GetNext(), useOutbox) |
|||
); |
|||
return; |
|||
} |
|||
|
|||
if (useOutbox) |
|||
{ |
|||
if (await AddToOutboxAsync(eventType, eventData)) |
|||
{ |
|||
return; |
|||
} |
|||
} |
|||
|
|||
await PublishToEventBusAsync(eventType, eventData); |
|||
} |
|||
|
|||
public abstract Task PublishFromOutboxAsync( |
|||
OutgoingEventInfo outgoingEvent, |
|||
OutboxConfig outboxConfig |
|||
); |
|||
|
|||
public abstract Task ProcessFromInboxAsync( |
|||
IncomingEventInfo incomingEvent, |
|||
InboxConfig inboxConfig); |
|||
|
|||
private async Task<bool> AddToOutboxAsync(Type eventType, object eventData) |
|||
{ |
|||
var unitOfWork = UnitOfWorkManager.Current; |
|||
if (unitOfWork == null) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
foreach (var outboxConfig in AbpDistributedEventBusOptions.Outboxes.Values) |
|||
{ |
|||
if (outboxConfig.Selector == null || outboxConfig.Selector(eventType)) |
|||
{ |
|||
var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType); |
|||
var eventName = EventNameAttribute.GetNameOrDefault(eventType); |
|||
await eventOutbox.EnqueueAsync( |
|||
new OutgoingEventInfo( |
|||
GuidGenerator.Create(), |
|||
eventName, |
|||
Serialize(eventData), |
|||
Clock.Now |
|||
) |
|||
); |
|||
return true; |
|||
} |
|||
} |
|||
|
|||
return false; |
|||
} |
|||
|
|||
protected async Task<bool> AddToInboxAsync( |
|||
string messageId, |
|||
string eventName, |
|||
Type eventType, |
|||
byte[] eventBytes) |
|||
{ |
|||
if (AbpDistributedEventBusOptions.Inboxes.Count <= 0) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
using (var scope = ServiceScopeFactory.CreateScope()) |
|||
{ |
|||
foreach (var inboxConfig in AbpDistributedEventBusOptions.Inboxes.Values) |
|||
{ |
|||
if (inboxConfig.EventSelector == null || inboxConfig.EventSelector(eventType)) |
|||
{ |
|||
var eventInbox = (IEventInbox) scope.ServiceProvider.GetRequiredService(inboxConfig.ImplementationType); |
|||
|
|||
if (!messageId.IsNullOrEmpty()) |
|||
{ |
|||
if (await eventInbox.ExistsByMessageIdAsync(messageId)) |
|||
{ |
|||
continue; |
|||
} |
|||
} |
|||
|
|||
await eventInbox.EnqueueAsync( |
|||
new IncomingEventInfo( |
|||
GuidGenerator.Create(), |
|||
messageId, |
|||
eventName, |
|||
eventBytes, |
|||
Clock.Now |
|||
) |
|||
); |
|||
} |
|||
} |
|||
} |
|||
|
|||
return true; |
|||
} |
|||
|
|||
protected abstract byte[] Serialize(object eventData); |
|||
} |
|||
} |
|||
@ -0,0 +1,20 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public interface IEventInbox |
|||
{ |
|||
Task EnqueueAsync(IncomingEventInfo incomingEvent); |
|||
|
|||
Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default); |
|||
|
|||
Task MarkAsProcessedAsync(Guid id); |
|||
|
|||
Task<bool> ExistsByMessageIdAsync(string messageId); |
|||
|
|||
Task DeleteOldEventsAsync(); |
|||
} |
|||
} |
|||
@ -0,0 +1,16 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public interface IEventOutbox |
|||
{ |
|||
Task EnqueueAsync(OutgoingEventInfo outgoingEvent); |
|||
|
|||
Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default); |
|||
|
|||
Task DeleteAsync(Guid id); |
|||
} |
|||
} |
|||
@ -0,0 +1,17 @@ |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public interface ISupportsEventBoxes |
|||
{ |
|||
Task PublishFromOutboxAsync( |
|||
OutgoingEventInfo outgoingEvent, |
|||
OutboxConfig outboxConfig |
|||
); |
|||
|
|||
Task ProcessFromInboxAsync( |
|||
IncomingEventInfo incomingEvent, |
|||
InboxConfig inboxConfig |
|||
); |
|||
} |
|||
} |
|||
@ -0,0 +1,28 @@ |
|||
using System; |
|||
using JetBrains.Annotations; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public class InboxConfig |
|||
{ |
|||
[NotNull] |
|||
public string Name { get; } |
|||
|
|||
public Type ImplementationType { get; set; } |
|||
|
|||
public Func<Type, bool> EventSelector { get; set; } |
|||
|
|||
public Func<Type, bool> HandlerSelector { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// Used to enable/disable processing incoming events.
|
|||
/// Default: true.
|
|||
/// </summary>
|
|||
public bool IsProcessingEnabled { get; set; } = true; |
|||
|
|||
public InboxConfig([NotNull] string name) |
|||
{ |
|||
Name = Check.NotNullOrWhiteSpace(name, nameof(name)); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,19 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public class InboxConfigDictionary : Dictionary<string, InboxConfig> |
|||
{ |
|||
public void Configure(Action<InboxConfig> configAction) |
|||
{ |
|||
Configure("Default", configAction); |
|||
} |
|||
|
|||
public void Configure(string outboxName, Action<InboxConfig> configAction) |
|||
{ |
|||
var outboxConfig = this.GetOrAdd(outboxName, () => new InboxConfig(outboxName)); |
|||
configAction(outboxConfig); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,44 @@ |
|||
using System; |
|||
using Volo.Abp.Data; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public class IncomingEventInfo : IHasExtraProperties |
|||
{ |
|||
public static int MaxEventNameLength { get; set; } = 256; |
|||
|
|||
public ExtraPropertyDictionary ExtraProperties { get; protected set; } |
|||
|
|||
public Guid Id { get; } |
|||
|
|||
public string MessageId { get; } |
|||
|
|||
public string EventName { get; } |
|||
|
|||
public byte[] EventData { get; } |
|||
|
|||
public DateTime CreationTime { get; } |
|||
|
|||
protected IncomingEventInfo() |
|||
{ |
|||
ExtraProperties = new ExtraPropertyDictionary(); |
|||
this.SetDefaultsForExtraProperties(); |
|||
} |
|||
|
|||
public IncomingEventInfo( |
|||
Guid id, |
|||
string messageId, |
|||
string eventName, |
|||
byte[] eventData, |
|||
DateTime creationTime) |
|||
{ |
|||
Id = id; |
|||
MessageId = messageId; |
|||
EventName = Check.NotNullOrWhiteSpace(eventName, nameof(eventName), MaxEventNameLength); |
|||
EventData = eventData; |
|||
CreationTime = creationTime; |
|||
ExtraProperties = new ExtraPropertyDictionary(); |
|||
this.SetDefaultsForExtraProperties(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
using System; |
|||
using JetBrains.Annotations; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public class OutboxConfig |
|||
{ |
|||
[NotNull] |
|||
public string Name { get; } |
|||
|
|||
public Type ImplementationType { get; set; } |
|||
|
|||
public Func<Type, bool> Selector { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// Used to enable/disable sending events from outbox to the message broker.
|
|||
/// Default: true.
|
|||
/// </summary>
|
|||
public bool IsSendingEnabled { get; set; } = true; |
|||
|
|||
public OutboxConfig([NotNull] string name) |
|||
{ |
|||
Name = Check.NotNullOrWhiteSpace(name, nameof(name)); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,19 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public class OutboxConfigDictionary : Dictionary<string, OutboxConfig> |
|||
{ |
|||
public void Configure(Action<OutboxConfig> configAction) |
|||
{ |
|||
Configure("Default", configAction); |
|||
} |
|||
|
|||
public void Configure(string outboxName, Action<OutboxConfig> configAction) |
|||
{ |
|||
var outboxConfig = this.GetOrAdd(outboxName, () => new OutboxConfig(outboxName)); |
|||
configAction(outboxConfig); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,39 @@ |
|||
using System; |
|||
using Volo.Abp.Data; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public class OutgoingEventInfo : IHasExtraProperties |
|||
{ |
|||
public static int MaxEventNameLength { get; set; } = 256; |
|||
|
|||
public ExtraPropertyDictionary ExtraProperties { get; protected set; } |
|||
|
|||
public Guid Id { get; } |
|||
|
|||
public string EventName { get; } |
|||
|
|||
public byte[] EventData { get; } |
|||
|
|||
public DateTime CreationTime { get; } |
|||
|
|||
protected OutgoingEventInfo() |
|||
{ |
|||
ExtraProperties = new ExtraPropertyDictionary(); |
|||
this.SetDefaultsForExtraProperties(); |
|||
} |
|||
|
|||
public OutgoingEventInfo( |
|||
Guid id, |
|||
string eventName, |
|||
byte[] eventData, |
|||
DateTime creationTime) |
|||
{ |
|||
Id = id; |
|||
EventName = Check.NotNullOrWhiteSpace(eventName, nameof(eventName), MaxEventNameLength); |
|||
EventData = eventData; |
|||
CreationTime = creationTime; |
|||
ExtraProperties = new ExtraPropertyDictionary(); |
|||
this.SetDefaultsForExtraProperties(); |
|||
} |
|||
}} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue