Browse Source

Merge pull request #10159 from abpframework/liangshiwei/eventboxes

Complete the Outbox & Inbox Patterns feature
pull/10008/head
Halil İbrahim Kalkan 4 years ago
committed by GitHub
parent
commit
cca31d2dca
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj
  2. 3
      framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs
  3. 13
      framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs
  4. 13
      framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs
  5. 8
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs
  6. 7
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs
  7. 49
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs
  8. 31
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs
  9. 13
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs
  10. 13
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs
  11. 7
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs
  12. 8
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs
  13. 7
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs
  14. 48
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs
  15. 31
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs
  16. 13
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs
  17. 13
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs
  18. 7
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs
  19. 8
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs
  20. 7
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs
  21. 44
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs
  22. 25
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs
  23. 13
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs
  24. 13
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs
  25. 7
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs
  26. 13
      framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs
  27. 13
      framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs
  28. 13
      framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs
  29. 13
      framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs
  30. 4
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs
  31. 29
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs
  32. 16
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs
  33. 7
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs
  34. 7
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs
  35. 43
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs
  36. 26
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs
  37. 48
      framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs
  38. 18
      framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs
  39. 16
      framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs
  40. 0
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/IRebusSerializer.cs
  41. 37
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs
  42. 2
      framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs
  43. 7
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs
  44. 11
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventInbox.cs
  45. 9
      framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs
  46. 4
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs
  47. 4
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs
  48. 61
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs
  49. 61
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs
  50. 2
      test/DistEvents/DistDemoApp.EfCoreRabbitMq/appsettings.json
  51. 12
      test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoApp.MongoDbKafka.csproj
  52. 38
      test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoAppMongoDbKafkaModule.cs
  53. 53
      test/DistEvents/DistDemoApp.MongoDbKafka/Program.cs
  54. 26
      test/DistEvents/DistDemoApp.MongoDbKafka/TodoMongoDbContext.cs
  55. 19
      test/DistEvents/DistDemoApp.MongoDbKafka/appsettings.json
  56. 21
      test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoApp.MongoDbRebus.csproj
  57. 53
      test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoAppMongoDbRebusModule.cs
  58. 57
      test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs
  59. 19
      test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs
  60. 19
      test/DistEvents/DistDemoApp.MongoDbRebus/appsettings.json
  61. 6
      test/DistEvents/DistEventsDemo.sln

2
framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj

@ -17,7 +17,7 @@
<ItemGroup>
<ProjectReference Include="..\Volo.Abp.Auditing\Volo.Abp.Auditing.csproj" />
<ProjectReference Include="..\Volo.Abp.Data\Volo.Abp.Data.csproj" />
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" />
<ProjectReference Include="..\Volo.Abp.EventBus.Boxes\Volo.Abp.EventBus.Boxes.csproj" />
<ProjectReference Include="..\Volo.Abp.ExceptionHandling\Volo.Abp.ExceptionHandling.csproj" />
<ProjectReference Include="..\Volo.Abp.Guids\Volo.Abp.Guids.csproj" />
<ProjectReference Include="..\Volo.Abp.MultiTenancy\Volo.Abp.MultiTenancy.csproj" />

3
framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs

@ -3,6 +3,7 @@ using Volo.Abp.Auditing;
using Volo.Abp.Data;
using Volo.Abp.Domain.Repositories;
using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.ExceptionHandling;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;
@ -18,7 +19,7 @@ namespace Volo.Abp.Domain
[DependsOn(
typeof(AbpAuditingModule),
typeof(AbpDataModule),
typeof(AbpEventBusModule),
typeof(AbpEventBusBoxesModule),
typeof(AbpGuidsModule),
typeof(AbpMultiTenancyModule),
typeof(AbpThreadingModule),

13
framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class MySQLInboxConfigExtensions
{
public static void UseMySQL<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class MySQLOutboxConfigExtensions
{
public static void UseMySQL<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox<TDbContext>);
}
}
}

8
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs

@ -0,0 +1,8 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs

@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
}
}

49
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs

@ -0,0 +1,49 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext> , IOracleDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
public OracleDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
: base(dbContextProvider, clock, eventBusBoxesOptions)
{
}
[UnitOfWork]
public override async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
[UnitOfWork]
public override async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}

31
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs

@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , IOracleDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
public OracleDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider)
: base(dbContextProvider)
{
}
[UnitOfWork]
public override async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class OracleInboxConfigExtensions
{
public static void UseOracle<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class OracleOutboxConfigExtensions
{
public static void UseOracle<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox<TDbContext>);
}
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs

@ -1,4 +1,6 @@
using Volo.Abp.Guids;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;
namespace Volo.Abp.EntityFrameworkCore.Oracle.Devart
@ -17,6 +19,9 @@ namespace Volo.Abp.EntityFrameworkCore.Oracle.Devart
options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsBinary;
}
});
context.Services.AddTransient(typeof(IOracleDbContextEventOutbox<>), typeof(OracleDbContextEventOutbox<>));
context.Services.AddTransient(typeof(IOracleDbContextEventInbox<>), typeof(OracleDbContextEventInbox<>));
}
}
}

8
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs

@ -0,0 +1,8 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs

@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
}
}

48
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs

@ -0,0 +1,48 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext> , IOracleDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
public OracleDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
: base(dbContextProvider, clock, eventBusBoxesOptions)
{
}
[UnitOfWork]
public override async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
[UnitOfWork]
public override async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}

31
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs

@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , IOracleDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
public OracleDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider)
: base(dbContextProvider)
{
}
[UnitOfWork]
public override async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class OracleInboxConfigExtensions
{
public static void UseOracle<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class OracleOutboxConfigExtensions
{
public static void UseOracle<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox<TDbContext>);
}
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs

@ -1,4 +1,6 @@
using Volo.Abp.Guids;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;
namespace Volo.Abp.EntityFrameworkCore.Oracle
@ -15,6 +17,9 @@ namespace Volo.Abp.EntityFrameworkCore.Oracle
options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsBinary;
}
});
context.Services.AddTransient(typeof(IOracleDbContextEventOutbox<>), typeof(OracleDbContextEventOutbox<>));
context.Services.AddTransient(typeof(IOracleDbContextEventInbox<>), typeof(OracleDbContextEventInbox<>));
}
}
}

8
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs

@ -0,0 +1,8 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IPostgreSqlDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs

@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IPostgreSqlDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
}
}

44
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs

@ -0,0 +1,44 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class PostgreSqlDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext>, IPostgreSqlDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
public PostgreSqlDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
: base(dbContextProvider, clock, eventBusBoxesOptions)
{
}
[UnitOfWork]
public override async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = '{Clock.Now}' WHERE \"Id\" = '{id}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
[UnitOfWork]
public override async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < '{timeToKeepEvents}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
}
}

25
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs

@ -0,0 +1,25 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class PostgreSqlDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , IPostgreSqlDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
public PostgreSqlDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider) : base(dbContextProvider)
{
}
[UnitOfWork]
public override async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = '{id}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class PostgreSqlInboxConfigExtensions
{
public static void UseNpgsql<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventInbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class PostgreSqlOutboxConfigExtensions
{
public static void UseNpgsql<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventOutbox<TDbContext>);
}
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs

@ -1,4 +1,6 @@
using Volo.Abp.Guids;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;
namespace Volo.Abp.EntityFrameworkCore.PostgreSql
@ -17,6 +19,9 @@ namespace Volo.Abp.EntityFrameworkCore.PostgreSql
options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsString;
}
});
context.Services.AddTransient(typeof(IPostgreSqlDbContextEventOutbox<>), typeof(PostgreSqlDbContextEventOutbox<>));
context.Services.AddTransient(typeof(IPostgreSqlDbContextEventInbox<>), typeof(PostgreSqlDbContextEventInbox<>));
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class SqlServerInboxConfigExtensions
{
public static void UseSqlServer<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class SqlServerOutboxConfigExtensions
{
public static void UseSqlServer<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class SqliteInboxConfigExtensions
{
public static void UseSqlite<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class SqliteOutboxConfigExtensions
{
public static void UseSqlite<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox<TDbContext>);
}
}
}

4
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs

@ -2,7 +2,6 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Volo.Abp.Domain;
using Volo.Abp.EntityFrameworkCore.DependencyInjection;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.Modularity;
using Volo.Abp.Uow.EntityFrameworkCore;
@ -29,6 +28,9 @@ namespace Volo.Abp.EntityFrameworkCore
context.Services.TryAddTransient(typeof(IDbContextProvider<>), typeof(UnitOfWorkDbContextProvider<>));
context.Services.AddTransient(typeof(IDbContextEventOutbox<>), typeof(DbContextEventOutbox<>));
context.Services.AddTransient(typeof(IDbContextEventInbox<>), typeof(DbContextEventInbox<>));
context.Services.AddTransient(typeof(ISqlRawDbContextEventOutbox<>), typeof(SqlRawDbContextEventOutbox<>));
context.Services.AddTransient(typeof(ISqlRawDbContextEventInbox<>), typeof(SqlRawDbContextEventInbox<>));
}
}
}

29
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs

@ -1,26 +1,32 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class DbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
public class DbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
protected IDbContextProvider<TDbContext> DbContextProvider { get; }
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
protected IClock Clock { get; }
public DbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock)
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
{
DbContextProvider = dbContextProvider;
Clock = clock;
EventBusBoxesOptions = eventBusBoxesOptions.Value;
}
[UnitOfWork]
@ -34,7 +40,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
}
[UnitOfWork]
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount)
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
@ -44,17 +50,16 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
.Where(x => !x.Processed)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.ToListAsync();
.ToListAsync(cancellationToken: cancellationToken);
return outgoingEventRecords
.Select(x => x.ToIncomingEventInfo())
.ToList();
}
[UnitOfWork]
public async Task MarkAsProcessedAsync(Guid id)
public virtual async Task MarkAsProcessedAsync(Guid id)
{
//TODO: Optimize?
var dbContext = await DbContextProvider.GetDbContextAsync();
var incomingEvent = await dbContext.IncomingEvents.FindAsync(id);
if (incomingEvent != null)
@ -64,23 +69,21 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
}
[UnitOfWork]
public async Task<bool> ExistsByMessageIdAsync(string messageId)
public virtual async Task<bool> ExistsByMessageIdAsync(string messageId)
{
//TODO: Optimize
var dbContext = await DbContextProvider.GetDbContextAsync();
return await dbContext.IncomingEvents.AnyAsync(x => x.MessageId == messageId);
}
[UnitOfWork]
public async Task DeleteOldEventsAsync()
public virtual async Task DeleteOldEventsAsync()
{
//TODO: Optimize
var dbContext = await DbContextProvider.GetDbContextAsync();
var timeToKeepEvents = Clock.Now.AddHours(-2); //TODO: Config?
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;
var oldEvents = await dbContext.IncomingEvents
.Where(x => x.Processed && x.CreationTime < timeToKeepEvents)
.ToListAsync();
dbContext.IncomingEvents.RemoveRange(oldEvents);
}
}
}
}

16
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.EventBus.Distributed;
@ -8,7 +9,7 @@ using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class DbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
public class DbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
protected IDbContextProvider<TDbContext> DbContextProvider { get; }
@ -18,7 +19,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
DbContextProvider = dbContextProvider;
}
[UnitOfWork]
public virtual async Task EnqueueAsync(OutgoingEventInfo outgoingEvent)
{
@ -29,17 +30,17 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
}
[UnitOfWork]
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount)
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var outgoingEventRecords = await dbContext
.OutgoingEvents
.AsNoTracking()
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.ToListAsync();
.ToListAsync(cancellationToken: cancellationToken);
return outgoingEventRecords
.Select(x => x.ToOutgoingEventInfo())
.ToList();
@ -48,7 +49,6 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
[UnitOfWork]
public virtual async Task DeleteAsync(Guid id)
{
//TODO: Optimize?
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var outgoingEvent = await dbContext.OutgoingEvents.FindAsync(id);
if (outgoingEvent != null)
@ -57,4 +57,4 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
}
}
}
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs

@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface ISqlRawDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs

@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface ISqlRawDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
}
}

43
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs

@ -0,0 +1,43 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class SqlRawDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext> , ISqlRawDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
public SqlRawDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
: base(dbContextProvider, clock, eventBusBoxesOptions)
{
}
[UnitOfWork]
public override async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"UPDATE {tableName} SET Processed = '1', ProcessedTime = '{Clock.Now}' WHERE Id = '{id.ToString().ToUpper()}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
[UnitOfWork]
public override async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;
var sql = $"DELETE FROM {tableName} WHERE Processed = '1' AND CreationTime < '{timeToKeepEvents}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
}
}

26
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs

@ -0,0 +1,26 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class SqlRawDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , ISqlRawDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
public SqlRawDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider)
: base(dbContextProvider)
{
}
[UnitOfWork]
public override async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"DELETE FROM {tableName} WHERE Id = '{id.ToString().ToUpper()}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
}
}

48
framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs

@ -0,0 +1,48 @@
using System;
namespace Volo.Abp.EventBus.Boxes
{
public class AbpEventBusBoxesOptions
{
/// <summary>
/// Default: 6 hours
/// </summary>
public TimeSpan CleanOldEventTimeIntervalSpan { get; set; }
/// <summary>
/// Default: 1000
/// </summary>
public int InboxWaitingEventMaxCount { get; set; }
/// <summary>
/// Default: 1000
/// </summary>
public int OutboxWaitingEventMaxCount { get; set; }
/// <summary>
/// Period time of <see cref="InboxProcessor"/> and <see cref="OutboxSender"/>
/// Default: 2 seconds
/// </summary>
public TimeSpan PeriodTimeSpan { get; set; }
/// <summary>
/// Default: 15 seconds
/// </summary>
public TimeSpan DistributedLockWaitDuration { get; set; }
/// <summary>
/// Default: 2 hours
/// </summary>
public TimeSpan WaitTimeToDeleteProcessedInboxEvents { get; set; }
public AbpEventBusBoxesOptions()
{
CleanOldEventTimeIntervalSpan = TimeSpan.FromHours(6);
InboxWaitingEventMaxCount = 1000;
OutboxWaitingEventMaxCount = 1000;
PeriodTimeSpan = TimeSpan.FromSeconds(2);
DistributedLockWaitDuration = TimeSpan.FromSeconds(15);
WaitTimeToDeleteProcessedInboxEvents = TimeSpan.FromHours(2);
}
}
}

18
framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs

@ -5,6 +5,7 @@ using Medallion.Threading;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Threading;
@ -23,6 +24,7 @@ namespace Volo.Abp.EventBus.Boxes
protected IClock Clock { get; }
protected IEventInbox Inbox { get; private set; }
protected InboxConfig InboxConfig { get; private set; }
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
protected DateTime? LastCleanTime { get; set; }
@ -37,7 +39,8 @@ namespace Volo.Abp.EventBus.Boxes
IDistributedEventBus distributedEventBus,
IDistributedLockProvider distributedLockProvider,
IUnitOfWorkManager unitOfWorkManager,
IClock clock)
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
{
ServiceProvider = serviceProvider;
Timer = timer;
@ -45,7 +48,8 @@ namespace Volo.Abp.EventBus.Boxes
DistributedLockProvider = distributedLockProvider;
UnitOfWorkManager = unitOfWorkManager;
Clock = clock;
Timer.Period = 2000; //TODO: Config?
EventBusBoxesOptions = eventBusBoxesOptions.Value;
Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds;
Timer.Elapsed += TimerOnElapsed;
Logger = NullLogger<InboxProcessor>.Instance;
StoppingTokenSource = new CancellationTokenSource();
@ -79,7 +83,7 @@ namespace Volo.Abp.EventBus.Boxes
{
return;
}
await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken))
{
if (handle != null)
@ -88,7 +92,7 @@ namespace Volo.Abp.EventBus.Boxes
while (true)
{
var waitingEvents = await Inbox.GetWaitingEventsAsync(1000); //TODO: Config? Pass StoppingToken!
var waitingEvents = await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, StoppingToken);
if (waitingEvents.Count <= 0)
{
break;
@ -116,21 +120,21 @@ namespace Volo.Abp.EventBus.Boxes
else
{
Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName);
await TaskDelayHelper.DelayAsync(15000, StoppingToken); //TODO: Config?
await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken);
}
}
}
protected virtual async Task DeleteOldEventsAsync()
{
if (LastCleanTime != null && LastCleanTime > Clock.Now.AddHours(6)) //TODO: Config?
if (LastCleanTime != null && LastCleanTime + EventBusBoxesOptions.CleanOldEventTimeIntervalSpan > Clock.Now)
{
return;
}
await Inbox.DeleteOldEventsAsync();
LastCleanTime = DateTime.Now;
LastCleanTime = Clock.Now;
}
}
}

16
framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs

@ -5,6 +5,7 @@ using Medallion.Threading;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Threading;
@ -19,9 +20,10 @@ namespace Volo.Abp.EventBus.Boxes
protected IDistributedLockProvider DistributedLockProvider { get; }
protected IEventOutbox Outbox { get; private set; }
protected OutboxConfig OutboxConfig { get; private set; }
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
protected string DistributedLockName => "Outbox_" + OutboxConfig.Name;
public ILogger<OutboxSender> Logger { get; set; }
protected CancellationTokenSource StoppingTokenSource { get; }
protected CancellationToken StoppingToken { get; }
@ -29,13 +31,15 @@ namespace Volo.Abp.EventBus.Boxes
IServiceProvider serviceProvider,
AbpAsyncTimer timer,
IDistributedEventBus distributedEventBus,
IDistributedLockProvider distributedLockProvider)
IDistributedLockProvider distributedLockProvider,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
{
ServiceProvider = serviceProvider;
Timer = timer;
DistributedEventBus = distributedEventBus;
DistributedLockProvider = distributedLockProvider;
Timer.Period = 2000; //TODO: Config?
EventBusBoxesOptions = eventBusBoxesOptions.Value;
Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds;
Timer.Elapsed += TimerOnElapsed;
Logger = NullLogger<OutboxSender>.Instance;
StoppingTokenSource = new CancellationTokenSource();
@ -65,13 +69,13 @@ namespace Volo.Abp.EventBus.Boxes
protected virtual async Task RunAsync()
{
await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName))
await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken))
{
if (handle != null)
{
while (true)
{
var waitingEvents = await Outbox.GetWaitingEventsAsync(1000); //TODO: Config?
var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, StoppingToken);
if (waitingEvents.Count <= 0)
{
break;
@ -96,7 +100,7 @@ namespace Volo.Abp.EventBus.Boxes
else
{
Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName);
await TaskDelayHelper.DelayAsync(15000, StoppingToken); //TODO: Config?
await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken);
}
}
}

0
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/IRabbitMqSerializer.cs → framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/IRebusSerializer.cs

37
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs

@ -6,6 +6,7 @@ using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Rebus.Bus;
using Rebus.Pipeline;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Guids;
@ -134,6 +135,19 @@ namespace Volo.Abp.EventBus.Rebus
Rebus.Unsubscribe(eventType);
}
public async Task ProcessEventAsync(Type eventType, object eventData)
{
var messageId = MessageContext.Current.TransportMessage.GetMessageId();
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
if (await AddToInboxAsync(messageId, eventName, eventType, MessageContext.Current.TransportMessage.Body))
{
return;
}
await TriggerHandlersAsync(eventType, eventData);
}
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
{
await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData);
@ -192,16 +206,29 @@ namespace Volo.Abp.EventBus.Rebus
OutgoingEventInfo outgoingEvent,
OutboxConfig outboxConfig)
{
/* TODO: IMPLEMENT! */
throw new NotImplementedException();
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);
return PublishToEventBusAsync(eventType, eventData);
}
public override Task ProcessFromInboxAsync(
public override async Task ProcessFromInboxAsync(
IncomingEventInfo incomingEvent,
InboxConfig inboxConfig)
{
/* TODO: IMPLEMENT! */
throw new NotImplementedException();
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
if (eventType == null)
{
return;
}
var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType);
var exceptions = new List<Exception>();
await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig);
if (exceptions.Any())
{
ThrowOriginalExceptions(eventType, exceptions);
}
}
protected override byte[] Serialize(object eventData)

2
framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs

@ -14,7 +14,7 @@ namespace Volo.Abp.EventBus.Rebus
public async Task Handle(TEventData message)
{
await RebusDistributedEventBus.TriggerHandlersAsync(typeof(TEventData), message);
await RebusDistributedEventBus.ProcessEventAsync(message.GetType(), message);
}
}
}

7
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs

@ -5,11 +5,10 @@ namespace Volo.Abp.EventBus.Distributed
public class AbpDistributedEventBusOptions
{
public ITypeList<IEventHandler> Handlers { get; }
public OutboxConfigDictionary Outboxes { get; }
public InboxConfigDictionary Inboxes { get; }
public InboxConfigDictionary Inboxes { get; }
public AbpDistributedEventBusOptions()
{
Handlers = new TypeList<IEventHandler>();
@ -17,4 +16,4 @@ namespace Volo.Abp.EventBus.Distributed
Inboxes = new InboxConfigDictionary();
}
}
}
}

11
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventInbox.cs

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Volo.Abp.EventBus.Distributed
@ -7,13 +8,13 @@ namespace Volo.Abp.EventBus.Distributed
public interface IEventInbox
{
Task EnqueueAsync(IncomingEventInfo incomingEvent);
Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount);
Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default);
Task MarkAsProcessedAsync(Guid id);
Task<bool> ExistsByMessageIdAsync(string messageId);
Task DeleteOldEventsAsync();
}
}
}

9
framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Volo.Abp.EventBus.Distributed
@ -7,9 +8,9 @@ namespace Volo.Abp.EventBus.Distributed
public interface IEventOutbox
{
Task EnqueueAsync(OutgoingEventInfo outgoingEvent);
Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount);
Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default);
Task DeleteAsync(Guid id);
}
}
}

4
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs

@ -4,6 +4,6 @@ namespace Volo.Abp.MongoDB.DistributedEvents
{
public interface IHasEventInbox : IAbpMongoDbContext
{
IMongoCollection<IncomingEventRecord> IncomingEvents { get; set; }
IMongoCollection<IncomingEventRecord> IncomingEvents { get; }
}
}
}

4
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs

@ -4,6 +4,6 @@ namespace Volo.Abp.MongoDB.DistributedEvents
{
public interface IHasEventOutbox : IAbpMongoDbContext
{
IMongoCollection<OutgoingEventRecord> OutgoingEvents { get; set; }
IMongoCollection<OutgoingEventRecord> OutgoingEvents { get; }
}
}
}

61
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs

@ -1,30 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
namespace Volo.Abp.MongoDB.DistributedEvents
{
public class MongoDbContextEventInbox<TMongoDbContext> : IMongoDbContextEventInbox<TMongoDbContext>
public class MongoDbContextEventInbox<TMongoDbContext> : IMongoDbContextEventInbox<TMongoDbContext>
where TMongoDbContext : IHasEventInbox
{
protected IMongoDbContextProvider<TMongoDbContext> DbContextProvider { get; }
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
protected IClock Clock { get; }
public MongoDbContextEventInbox(
IMongoDbContextProvider<TMongoDbContext> dbContextProvider,
IClock clock)
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
{
DbContextProvider = dbContextProvider;
Clock = clock;
EventBusBoxesOptions = eventBusBoxesOptions.Value;
}
[UnitOfWork]
public virtual async Task EnqueueAsync(IncomingEventInfo incomingEvent)
{
@ -45,9 +51,9 @@ namespace Volo.Abp.MongoDB.DistributedEvents
}
[UnitOfWork]
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount)
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var dbContext = await DbContextProvider.GetDbContextAsync(cancellationToken);
var outgoingEventRecords = await dbContext
.IncomingEvents
@ -55,27 +61,52 @@ namespace Volo.Abp.MongoDB.DistributedEvents
.Where(x => !x.Processed)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.ToListAsync();
.ToListAsync(cancellationToken: cancellationToken);
return outgoingEventRecords
.Select(x => x.ToIncomingEventInfo())
.ToList();
}
[UnitOfWork]
public async Task MarkAsProcessedAsync(Guid id)
public virtual async Task MarkAsProcessedAsync(Guid id)
{
throw new NotImplementedException();
var dbContext = await DbContextProvider.GetDbContextAsync();
var filter = Builders<IncomingEventRecord>.Filter.Eq(x => x.Id, id);
var update = Builders<IncomingEventRecord>.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now);
if (dbContext.SessionHandle != null)
{
await dbContext.IncomingEvents.UpdateOneAsync(dbContext.SessionHandle, filter, update);
}
else
{
await dbContext.IncomingEvents.UpdateOneAsync(filter, update);
}
}
public Task<bool> ExistsByMessageIdAsync(string messageId)
[UnitOfWork]
public virtual async Task<bool> ExistsByMessageIdAsync(string messageId)
{
throw new NotImplementedException();
var dbContext = await DbContextProvider.GetDbContextAsync();
return await dbContext.IncomingEvents.AsQueryable().AnyAsync(x => x.MessageId == messageId);
}
public Task DeleteOldEventsAsync()
[UnitOfWork]
public virtual async Task DeleteOldEventsAsync()
{
throw new NotImplementedException();
var dbContext = await DbContextProvider.GetDbContextAsync();
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;
if (dbContext.SessionHandle != null)
{
await dbContext.IncomingEvents.DeleteManyAsync(dbContext.SessionHandle, x => x.Processed && x.CreationTime < timeToKeepEvents);
}
else
{
await dbContext.IncomingEvents.DeleteManyAsync(x => x.Processed && x.CreationTime < timeToKeepEvents);
}
}
}
}
}

61
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs

@ -1,27 +1,72 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Uow;
namespace Volo.Abp.MongoDB.DistributedEvents
{
public class MongoDbContextEventOutbox<TMongoDbContext> : IMongoDbContextEventOutbox<TMongoDbContext>
public class MongoDbContextEventOutbox<TMongoDbContext> : IMongoDbContextEventOutbox<TMongoDbContext>
where TMongoDbContext : IHasEventOutbox
{
public Task EnqueueAsync(OutgoingEventInfo outgoingEvent)
protected IMongoDbContextProvider<TMongoDbContext> MongoDbContextProvider { get; }
public MongoDbContextEventOutbox(IMongoDbContextProvider<TMongoDbContext> mongoDbContextProvider)
{
MongoDbContextProvider = mongoDbContextProvider;
}
[UnitOfWork]
public virtual async Task EnqueueAsync(OutgoingEventInfo outgoingEvent)
{
throw new NotImplementedException();
var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync();
if (dbContext.SessionHandle != null)
{
await dbContext.OutgoingEvents.InsertOneAsync(
dbContext.SessionHandle,
new OutgoingEventRecord(outgoingEvent)
);
}
else
{
await dbContext.OutgoingEvents.InsertOneAsync(
new OutgoingEventRecord(outgoingEvent)
);
}
}
public Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount)
[UnitOfWork]
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync(cancellationToken);
var outgoingEventRecords = await dbContext
.OutgoingEvents.AsQueryable()
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.ToListAsync(cancellationToken: cancellationToken);
return outgoingEventRecords
.Select(x => x.ToOutgoingEventInfo())
.ToList();
}
public Task DeleteAsync(Guid id)
[UnitOfWork]
public virtual async Task DeleteAsync(Guid id)
{
throw new NotImplementedException();
var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync();
if (dbContext.SessionHandle != null)
{
await dbContext.OutgoingEvents.DeleteOneAsync(dbContext.SessionHandle, x => x.Id.Equals(id));
}
else
{
await dbContext.OutgoingEvents.DeleteOneAsync(x => x.Id.Equals(id));
}
}
}
}
}

2
test/DistEvents/DistDemoApp.EfCoreRabbitMq/appsettings.json

@ -16,4 +16,4 @@
"Redis": {
"Configuration": "127.0.0.1"
}
}
}

12
test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoApp.MongoDbKafka.csproj

@ -6,4 +6,16 @@
<RootNamespace>DistDemoApp</RootNamespace>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\framework\src\Volo.Abp.MongoDB\Volo.Abp.MongoDB.csproj" />
<ProjectReference Include="..\..\..\framework\src\Volo.Abp.EventBus.Kafka\Volo.Abp.EventBus.Kafka.csproj" />
<ProjectReference Include="..\DistDemoApp.Shared\DistDemoApp.Shared.csproj" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

38
test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoAppMongoDbKafkaModule.cs

@ -0,0 +1,38 @@
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Kafka;
using Volo.Abp.Modularity;
using Volo.Abp.MongoDB;
using Volo.Abp.MongoDB.DistributedEvents;
namespace DistDemoApp
{
[DependsOn(
typeof(AbpMongoDbModule),
typeof(AbpEventBusKafkaModule),
typeof(DistDemoAppSharedModule)
)]
public class DistDemoAppMongoDbKafkaModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
context.Services.AddMongoDbContext<TodoMongoDbContext>(options =>
{
options.AddDefaultRepositories();
});
Configure<AbpDistributedEventBusOptions>(options =>
{
options.Outboxes.Configure(config =>
{
config.UseMongoDbContext<TodoMongoDbContext>();
});
options.Inboxes.Configure(config =>
{
config.UseMongoDbContext<TodoMongoDbContext>();
});
});
}
}
}

53
test/DistEvents/DistDemoApp.MongoDbKafka/Program.cs

@ -1,12 +1,57 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Serilog;
using Serilog.Events;
namespace DistDemoApp
{
class Program
public class Program
{
static void Main(string[] args)
public static async Task<int> Main(string[] args)
{
Console.WriteLine("Hello World!");
Log.Logger = new LoggerConfiguration()
#if DEBUG
.MinimumLevel.Debug()
#else
.MinimumLevel.Information()
#endif
.MinimumLevel.Override("Microsoft", LogEventLevel.Warning)
.Enrich.FromLogContext()
.WriteTo.Async(c => c.File("Logs/logs.txt"))
.WriteTo.Async(c => c.Console())
.CreateLogger();
try
{
Log.Information("Starting console host.");
await CreateHostBuilder(args).RunConsoleAsync();
return 0;
}
catch (Exception ex)
{
Log.Fatal(ex, "Host terminated unexpectedly!");
return 1;
}
finally
{
Log.CloseAndFlush();
}
}
internal static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.UseAutofac()
.UseSerilog()
.ConfigureAppConfiguration((context, config) =>
{
//setup your additional configuration sources
})
.ConfigureServices((hostContext, services) =>
{
services.AddApplication<DistDemoAppMongoDbKafkaModule>();
});
}
}
}

26
test/DistEvents/DistDemoApp.MongoDbKafka/TodoMongoDbContext.cs

@ -0,0 +1,26 @@
using MongoDB.Driver;
using Volo.Abp.Data;
using Volo.Abp.MongoDB;
using Volo.Abp.MongoDB.DistributedEvents;
namespace DistDemoApp
{
[ConnectionStringName("Default")]
public class TodoMongoDbContext : AbpMongoDbContext, IHasEventOutbox, IHasEventInbox
{
public IMongoCollection<TodoItem> TodoItems => Collection<TodoItem>();
public IMongoCollection<TodoSummary> TodoSummaries => Collection<TodoSummary>();
public IMongoCollection<OutgoingEventRecord> OutgoingEvents
{
get => Collection<OutgoingEventRecord>();
set {}
}
public IMongoCollection<IncomingEventRecord> IncomingEvents
{
get => Collection<IncomingEventRecord>();
set {}
}
}
}

19
test/DistEvents/DistDemoApp.MongoDbKafka/appsettings.json

@ -0,0 +1,19 @@
{
"ConnectionStrings": {
"Default": "mongodb://localhost:27018,localhost:27019,localhost:27020/DistEventsDemo"
},
"Kafka": {
"Connections": {
"Default": {
"BootstrapServers": "localhost:9092"
}
},
"EventBus": {
"GroupId": "DistDemoApp",
"TopicName": "DistDemoTopic"
}
},
"Redis": {
"Configuration": "127.0.0.1"
}
}

21
test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoApp.MongoDbRebus.csproj

@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<RootNamespace>DistDemoApp</RootNamespace>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\framework\src\Volo.Abp.MongoDB\Volo.Abp.MongoDB.csproj" />
<ProjectReference Include="..\..\..\framework\src\Volo.Abp.EventBus.Rebus\Volo.Abp.EventBus.Rebus.csproj" />
<ProjectReference Include="..\DistDemoApp.Shared\DistDemoApp.Shared.csproj" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

53
test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoAppMongoDbRebusModule.cs

@ -0,0 +1,53 @@
using Microsoft.Extensions.DependencyInjection;
using Rebus.Persistence.InMem;
using Rebus.Transport.InMem;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Rebus;
using Volo.Abp.Modularity;
using Volo.Abp.MongoDB;
using Volo.Abp.MongoDB.DistributedEvents;
namespace DistDemoApp
{
[DependsOn(
typeof(AbpMongoDbModule),
typeof(AbpEventBusRebusModule),
typeof(DistDemoAppSharedModule)
)]
public class DistDemoAppMongoDbRebusModule : AbpModule
{
public override void PreConfigureServices(ServiceConfigurationContext context)
{
PreConfigure<AbpRebusEventBusOptions>(options =>
{
options.InputQueueName = "eventbus";
options.Configurer = rebusConfigurer =>
{
rebusConfigurer.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "eventbus"));
rebusConfigurer.Subscriptions(s => s.StoreInMemory());
};
});
}
public override void ConfigureServices(ServiceConfigurationContext context)
{
context.Services.AddMongoDbContext<TodoMongoDbContext>(options =>
{
options.AddDefaultRepositories();
});
Configure<AbpDistributedEventBusOptions>(options =>
{
options.Outboxes.Configure(config =>
{
config.UseMongoDbContext<TodoMongoDbContext>();
});
options.Inboxes.Configure(config =>
{
config.UseMongoDbContext<TodoMongoDbContext>();
});
});
}
}
}

57
test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs

@ -0,0 +1,57 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Serilog;
using Serilog.Events;
namespace DistDemoApp
{
public class Program
{
public static async Task<int> Main(string[] args)
{
Log.Logger = new LoggerConfiguration()
#if DEBUG
.MinimumLevel.Debug()
#else
.MinimumLevel.Information()
#endif
.MinimumLevel.Override("Microsoft", LogEventLevel.Warning)
.Enrich.FromLogContext()
.WriteTo.Async(c => c.File("Logs/logs.txt"))
.WriteTo.Async(c => c.Console())
.CreateLogger();
try
{
Log.Information("Starting console host.");
await CreateHostBuilder(args).RunConsoleAsync();
return 0;
}
catch (Exception ex)
{
Log.Fatal(ex, "Host terminated unexpectedly!");
return 1;
}
finally
{
Log.CloseAndFlush();
}
}
internal static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.UseAutofac()
.UseSerilog()
.ConfigureAppConfiguration((context, config) =>
{
//setup your additional configuration sources
})
.ConfigureServices((hostContext, services) =>
{
services.AddApplication<DistDemoAppMongoDbRebusModule>();
});
}
}

19
test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs

@ -0,0 +1,19 @@
using MongoDB.Driver;
using Volo.Abp.Data;
using Volo.Abp.MongoDB;
using Volo.Abp.MongoDB.DistributedEvents;
namespace DistDemoApp
{
[ConnectionStringName("Default")]
public class TodoMongoDbContext : AbpMongoDbContext, IHasEventOutbox, IHasEventInbox
{
public IMongoCollection<TodoItem> TodoItems => Collection<TodoItem>();
public IMongoCollection<TodoSummary> TodoSummaries => Collection<TodoSummary>();
public IMongoCollection<OutgoingEventRecord> OutgoingEvents => Collection<OutgoingEventRecord>();
public IMongoCollection<IncomingEventRecord> IncomingEvents => Collection<IncomingEventRecord>();
}
}

19
test/DistEvents/DistDemoApp.MongoDbRebus/appsettings.json

@ -0,0 +1,19 @@
{
"ConnectionStrings": {
"Default": "mongodb://localhost:27018,localhost:27019,localhost:27020/DistEventsDemo"
},
"Kafka": {
"Connections": {
"Default": {
"BootstrapServers": "localhost:9092"
}
},
"EventBus": {
"GroupId": "DistDemoApp",
"TopicName": "DistDemoTopic"
}
},
"Redis": {
"Configuration": "127.0.0.1"
}
}

6
test/DistEvents/DistEventsDemo.sln

@ -6,6 +6,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DistDemoApp.MongoDbKafka",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DistDemoApp.Shared", "DistDemoApp.Shared\DistDemoApp.Shared.csproj", "{C515F4E2-0ED3-4561-BC58-FC633B50E2EB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DistDemoApp.MongoDbRebus", "DistDemoApp.MongoDbRebus\DistDemoApp.MongoDbRebus.csproj", "{4FB63540-4CC5-4A7B-900B-F5FCD907456E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -24,5 +26,9 @@ Global
{C515F4E2-0ED3-4561-BC58-FC633B50E2EB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C515F4E2-0ED3-4561-BC58-FC633B50E2EB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C515F4E2-0ED3-4561-BC58-FC633B50E2EB}.Release|Any CPU.Build.0 = Release|Any CPU
{4FB63540-4CC5-4A7B-900B-F5FCD907456E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4FB63540-4CC5-4A7B-900B-F5FCD907456E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4FB63540-4CC5-4A7B-900B-F5FCD907456E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4FB63540-4CC5-4A7B-900B-F5FCD907456E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal

Loading…
Cancel
Save