diff --git a/framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj b/framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj index 87879334af..68c90489ff 100644 --- a/framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj +++ b/framework/src/Volo.Abp.Ddd.Domain/Volo.Abp.Ddd.Domain.csproj @@ -17,7 +17,7 @@ - + diff --git a/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs index 523a6ee4f2..38e2d9fc84 100644 --- a/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs +++ b/framework/src/Volo.Abp.Ddd.Domain/Volo/Abp/Domain/AbpDddDomainModule.cs @@ -3,6 +3,7 @@ using Volo.Abp.Auditing; using Volo.Abp.Data; using Volo.Abp.Domain.Repositories; using Volo.Abp.EventBus; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.ExceptionHandling; using Volo.Abp.Guids; using Volo.Abp.Modularity; @@ -18,7 +19,7 @@ namespace Volo.Abp.Domain [DependsOn( typeof(AbpAuditingModule), typeof(AbpDataModule), - typeof(AbpEventBusModule), + typeof(AbpEventBusBoxesModule), typeof(AbpGuidsModule), typeof(AbpMultiTenancyModule), typeof(AbpThreadingModule), diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs new file mode 100644 index 0000000000..ecda92702b --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class MySQLInboxConfigExtensions + { + public static void UseMySQL(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs new file mode 100644 index 0000000000..8657ba92ab --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class MySQLOutboxConfigExtensions + { + public static void UseMySQL(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs new file mode 100644 index 0000000000..0cc7ae5531 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IOracleDbContextEventInbox : IDbContextEventInbox + where TDbContext : IHasEventInbox + { + + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs new file mode 100644 index 0000000000..a588f36e43 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IOracleDbContextEventOutbox : IDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs new file mode 100644 index 0000000000..c19ca6746a --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs @@ -0,0 +1,49 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Boxes; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Timing; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class OracleDbContextEventInbox : DbContextEventInbox , IOracleDbContextEventInbox + where TDbContext : IHasEventInbox + { + public OracleDbContextEventInbox( + IDbContextProvider dbContextProvider, + IClock clock, + IOptions eventBusBoxesOptions) + : base(dbContextProvider, clock, eventBusBoxesOptions) + { + } + + [UnitOfWork] + public override async Task MarkAsProcessedAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + [UnitOfWork] + public override async Task DeleteOldEventsAsync() + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + protected virtual string GuidToOracleType(Guid id) + { + return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs new file mode 100644 index 0000000000..a5c4566ced --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs @@ -0,0 +1,31 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class OracleDbContextEventOutbox : DbContextEventOutbox , IOracleDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + public OracleDbContextEventOutbox(IDbContextProvider dbContextProvider) + : base(dbContextProvider) + { + } + + [UnitOfWork] + public override async Task DeleteAsync(Guid id) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + protected virtual string GuidToOracleType(Guid id) + { + return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs new file mode 100644 index 0000000000..ca79019e28 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class OracleInboxConfigExtensions + { + public static void UseOracle(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs new file mode 100644 index 0000000000..e2d7d33761 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class OracleOutboxConfigExtensions + { + public static void UseOracle(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs index 9580219cfc..76d0ecdd21 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs @@ -1,4 +1,6 @@ -using Volo.Abp.Guids; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.EntityFrameworkCore.DistributedEvents; +using Volo.Abp.Guids; using Volo.Abp.Modularity; namespace Volo.Abp.EntityFrameworkCore.Oracle.Devart @@ -17,6 +19,9 @@ namespace Volo.Abp.EntityFrameworkCore.Oracle.Devart options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsBinary; } }); + + context.Services.AddTransient(typeof(IOracleDbContextEventOutbox<>), typeof(OracleDbContextEventOutbox<>)); + context.Services.AddTransient(typeof(IOracleDbContextEventInbox<>), typeof(OracleDbContextEventInbox<>)); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs new file mode 100644 index 0000000000..0cc7ae5531 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IOracleDbContextEventInbox : IDbContextEventInbox + where TDbContext : IHasEventInbox + { + + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs new file mode 100644 index 0000000000..a588f36e43 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IOracleDbContextEventOutbox : IDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs new file mode 100644 index 0000000000..6d6d04d267 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs @@ -0,0 +1,48 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Boxes; +using Volo.Abp.Timing; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class OracleDbContextEventInbox : DbContextEventInbox , IOracleDbContextEventInbox + where TDbContext : IHasEventInbox + { + public OracleDbContextEventInbox( + IDbContextProvider dbContextProvider, + IClock clock, + IOptions eventBusBoxesOptions) + : base(dbContextProvider, clock, eventBusBoxesOptions) + { + } + + [UnitOfWork] + public override async Task MarkAsProcessedAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + [UnitOfWork] + public override async Task DeleteOldEventsAsync() + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + protected virtual string GuidToOracleType(Guid id) + { + return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs new file mode 100644 index 0000000000..a5c4566ced --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs @@ -0,0 +1,31 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class OracleDbContextEventOutbox : DbContextEventOutbox , IOracleDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + public OracleDbContextEventOutbox(IDbContextProvider dbContextProvider) + : base(dbContextProvider) + { + } + + [UnitOfWork] + public override async Task DeleteAsync(Guid id) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + protected virtual string GuidToOracleType(Guid id) + { + return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs new file mode 100644 index 0000000000..ca79019e28 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class OracleInboxConfigExtensions + { + public static void UseOracle(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs new file mode 100644 index 0000000000..e2d7d33761 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class OracleOutboxConfigExtensions + { + public static void UseOracle(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs index b7cbaec1a1..7716ae6150 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs @@ -1,4 +1,6 @@ -using Volo.Abp.Guids; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.EntityFrameworkCore.DistributedEvents; +using Volo.Abp.Guids; using Volo.Abp.Modularity; namespace Volo.Abp.EntityFrameworkCore.Oracle @@ -15,6 +17,9 @@ namespace Volo.Abp.EntityFrameworkCore.Oracle options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsBinary; } }); + + context.Services.AddTransient(typeof(IOracleDbContextEventOutbox<>), typeof(OracleDbContextEventOutbox<>)); + context.Services.AddTransient(typeof(IOracleDbContextEventInbox<>), typeof(OracleDbContextEventInbox<>)); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs new file mode 100644 index 0000000000..5c24d79f88 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IPostgreSqlDbContextEventInbox : IDbContextEventInbox + where TDbContext : IHasEventInbox + { + + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs new file mode 100644 index 0000000000..7e6bc4bd59 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IPostgreSqlDbContextEventOutbox : IDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs new file mode 100644 index 0000000000..fedff90dec --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs @@ -0,0 +1,44 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Boxes; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Timing; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class PostgreSqlDbContextEventInbox : DbContextEventInbox, IPostgreSqlDbContextEventInbox + where TDbContext : IHasEventInbox + { + public PostgreSqlDbContextEventInbox( + IDbContextProvider dbContextProvider, + IClock clock, + IOptions eventBusBoxesOptions) + : base(dbContextProvider, clock, eventBusBoxesOptions) + { + } + + [UnitOfWork] + public override async Task MarkAsProcessedAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = '{Clock.Now}' WHERE \"Id\" = '{id}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + [UnitOfWork] + public override async Task DeleteOldEventsAsync() + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < '{timeToKeepEvents}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs new file mode 100644 index 0000000000..c5e79a0014 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs @@ -0,0 +1,25 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class PostgreSqlDbContextEventOutbox : DbContextEventOutbox , IPostgreSqlDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + public PostgreSqlDbContextEventOutbox(IDbContextProvider dbContextProvider) : base(dbContextProvider) + { + } + + [UnitOfWork] + public override async Task DeleteAsync(Guid id) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = '{id}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs new file mode 100644 index 0000000000..f4bb462a1c --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class PostgreSqlInboxConfigExtensions + { + public static void UseNpgsql(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs new file mode 100644 index 0000000000..853ae9ba59 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class PostgreSqlOutboxConfigExtensions + { + public static void UseNpgsql(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs index 4e76b89113..55c1a4af42 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs @@ -1,4 +1,6 @@ -using Volo.Abp.Guids; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.EntityFrameworkCore.DistributedEvents; +using Volo.Abp.Guids; using Volo.Abp.Modularity; namespace Volo.Abp.EntityFrameworkCore.PostgreSql @@ -17,6 +19,9 @@ namespace Volo.Abp.EntityFrameworkCore.PostgreSql options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsString; } }); + + context.Services.AddTransient(typeof(IPostgreSqlDbContextEventOutbox<>), typeof(PostgreSqlDbContextEventOutbox<>)); + context.Services.AddTransient(typeof(IPostgreSqlDbContextEventInbox<>), typeof(PostgreSqlDbContextEventInbox<>)); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs new file mode 100644 index 0000000000..60adf600c7 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class SqlServerInboxConfigExtensions + { + public static void UseSqlServer(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs new file mode 100644 index 0000000000..9022d5c7e6 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class SqlServerOutboxConfigExtensions + { + public static void UseSqlServer(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs new file mode 100644 index 0000000000..ccc92d4eb1 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class SqliteInboxConfigExtensions + { + public static void UseSqlite(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs new file mode 100644 index 0000000000..c1d9949c19 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class SqliteOutboxConfigExtensions + { + public static void UseSqlite(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs index 90b541bc73..96c245be2e 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs @@ -2,7 +2,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Volo.Abp.Domain; -using Volo.Abp.EntityFrameworkCore.DependencyInjection; using Volo.Abp.EntityFrameworkCore.DistributedEvents; using Volo.Abp.Modularity; using Volo.Abp.Uow.EntityFrameworkCore; @@ -29,6 +28,9 @@ namespace Volo.Abp.EntityFrameworkCore context.Services.TryAddTransient(typeof(IDbContextProvider<>), typeof(UnitOfWorkDbContextProvider<>)); context.Services.AddTransient(typeof(IDbContextEventOutbox<>), typeof(DbContextEventOutbox<>)); context.Services.AddTransient(typeof(IDbContextEventInbox<>), typeof(DbContextEventInbox<>)); + + context.Services.AddTransient(typeof(ISqlRawDbContextEventOutbox<>), typeof(SqlRawDbContextEventOutbox<>)); + context.Services.AddTransient(typeof(ISqlRawDbContextEventInbox<>), typeof(SqlRawDbContextEventInbox<>)); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index 2b0a448f74..242b601177 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -1,26 +1,32 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Timing; using Volo.Abp.Uow; namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { - public class DbContextEventInbox : IDbContextEventInbox + public class DbContextEventInbox : IDbContextEventInbox where TDbContext : IHasEventInbox { protected IDbContextProvider DbContextProvider { get; } + protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } protected IClock Clock { get; } public DbContextEventInbox( IDbContextProvider dbContextProvider, - IClock clock) + IClock clock, + IOptions eventBusBoxesOptions) { DbContextProvider = dbContextProvider; Clock = clock; + EventBusBoxesOptions = eventBusBoxesOptions.Value; } [UnitOfWork] @@ -34,7 +40,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents } [UnitOfWork] - public virtual async Task> GetWaitingEventsAsync(int maxCount) + public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) { var dbContext = await DbContextProvider.GetDbContextAsync(); @@ -44,17 +50,16 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents .Where(x => !x.Processed) .OrderBy(x => x.CreationTime) .Take(maxCount) - .ToListAsync(); - + .ToListAsync(cancellationToken: cancellationToken); + return outgoingEventRecords .Select(x => x.ToIncomingEventInfo()) .ToList(); } [UnitOfWork] - public async Task MarkAsProcessedAsync(Guid id) + public virtual async Task MarkAsProcessedAsync(Guid id) { - //TODO: Optimize? var dbContext = await DbContextProvider.GetDbContextAsync(); var incomingEvent = await dbContext.IncomingEvents.FindAsync(id); if (incomingEvent != null) @@ -64,23 +69,21 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents } [UnitOfWork] - public async Task ExistsByMessageIdAsync(string messageId) + public virtual async Task ExistsByMessageIdAsync(string messageId) { - //TODO: Optimize var dbContext = await DbContextProvider.GetDbContextAsync(); return await dbContext.IncomingEvents.AnyAsync(x => x.MessageId == messageId); } [UnitOfWork] - public async Task DeleteOldEventsAsync() + public virtual async Task DeleteOldEventsAsync() { - //TODO: Optimize var dbContext = await DbContextProvider.GetDbContextAsync(); - var timeToKeepEvents = Clock.Now.AddHours(-2); //TODO: Config? + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; var oldEvents = await dbContext.IncomingEvents .Where(x => x.Processed && x.CreationTime < timeToKeepEvents) .ToListAsync(); dbContext.IncomingEvents.RemoveRange(oldEvents); } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs index a785ad7ce5..0b8909b966 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Volo.Abp.EventBus.Distributed; @@ -8,7 +9,7 @@ using Volo.Abp.Uow; namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { - public class DbContextEventOutbox : IDbContextEventOutbox + public class DbContextEventOutbox : IDbContextEventOutbox where TDbContext : IHasEventOutbox { protected IDbContextProvider DbContextProvider { get; } @@ -18,7 +19,7 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { DbContextProvider = dbContextProvider; } - + [UnitOfWork] public virtual async Task EnqueueAsync(OutgoingEventInfo outgoingEvent) { @@ -29,17 +30,17 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents } [UnitOfWork] - public virtual async Task> GetWaitingEventsAsync(int maxCount) + public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) { var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); - + var outgoingEventRecords = await dbContext .OutgoingEvents .AsNoTracking() .OrderBy(x => x.CreationTime) .Take(maxCount) - .ToListAsync(); - + .ToListAsync(cancellationToken: cancellationToken); + return outgoingEventRecords .Select(x => x.ToOutgoingEventInfo()) .ToList(); @@ -48,7 +49,6 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents [UnitOfWork] public virtual async Task DeleteAsync(Guid id) { - //TODO: Optimize? var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); var outgoingEvent = await dbContext.OutgoingEvents.FindAsync(id); if (outgoingEvent != null) @@ -57,4 +57,4 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents } } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs new file mode 100644 index 0000000000..d86e3f36ed --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface ISqlRawDbContextEventInbox : IDbContextEventInbox + where TDbContext : IHasEventInbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs new file mode 100644 index 0000000000..776cc2f93c --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface ISqlRawDbContextEventOutbox : IDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs new file mode 100644 index 0000000000..cb764ede17 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs @@ -0,0 +1,43 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Boxes; +using Volo.Abp.Timing; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class SqlRawDbContextEventInbox : DbContextEventInbox , ISqlRawDbContextEventInbox + where TDbContext : IHasEventInbox + { + public SqlRawDbContextEventInbox( + IDbContextProvider dbContextProvider, + IClock clock, + IOptions eventBusBoxesOptions) + : base(dbContextProvider, clock, eventBusBoxesOptions) + { + } + + [UnitOfWork] + public override async Task MarkAsProcessedAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"UPDATE {tableName} SET Processed = '1', ProcessedTime = '{Clock.Now}' WHERE Id = '{id.ToString().ToUpper()}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + [UnitOfWork] + public override async Task DeleteOldEventsAsync() + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; + + var sql = $"DELETE FROM {tableName} WHERE Processed = '1' AND CreationTime < '{timeToKeepEvents}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs new file mode 100644 index 0000000000..8748e6dc09 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs @@ -0,0 +1,26 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class SqlRawDbContextEventOutbox : DbContextEventOutbox , ISqlRawDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + public SqlRawDbContextEventOutbox(IDbContextProvider dbContextProvider) + : base(dbContextProvider) + { + } + + [UnitOfWork] + public override async Task DeleteAsync(Guid id) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"DELETE FROM {tableName} WHERE Id = '{id.ToString().ToUpper()}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + } +} diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs new file mode 100644 index 0000000000..1d72666c42 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/AbpEventBusBoxesOptions.cs @@ -0,0 +1,48 @@ +using System; + +namespace Volo.Abp.EventBus.Boxes +{ + public class AbpEventBusBoxesOptions + { + /// + /// Default: 6 hours + /// + public TimeSpan CleanOldEventTimeIntervalSpan { get; set; } + + /// + /// Default: 1000 + /// + public int InboxWaitingEventMaxCount { get; set; } + + /// + /// Default: 1000 + /// + public int OutboxWaitingEventMaxCount { get; set; } + + /// + /// Period time of and + /// Default: 2 seconds + /// + public TimeSpan PeriodTimeSpan { get; set; } + + /// + /// Default: 15 seconds + /// + public TimeSpan DistributedLockWaitDuration { get; set; } + + /// + /// Default: 2 hours + /// + public TimeSpan WaitTimeToDeleteProcessedInboxEvents { get; set; } + + public AbpEventBusBoxesOptions() + { + CleanOldEventTimeIntervalSpan = TimeSpan.FromHours(6); + InboxWaitingEventMaxCount = 1000; + OutboxWaitingEventMaxCount = 1000; + PeriodTimeSpan = TimeSpan.FromSeconds(2); + DistributedLockWaitDuration = TimeSpan.FromSeconds(15); + WaitTimeToDeleteProcessedInboxEvents = TimeSpan.FromHours(2); + } + } +} diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs index e711e5798e..e3c71771ad 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/InboxProcessor.cs @@ -5,6 +5,7 @@ using Medallion.Threading; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Threading; @@ -23,6 +24,7 @@ namespace Volo.Abp.EventBus.Boxes protected IClock Clock { get; } protected IEventInbox Inbox { get; private set; } protected InboxConfig InboxConfig { get; private set; } + protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } protected DateTime? LastCleanTime { get; set; } @@ -37,7 +39,8 @@ namespace Volo.Abp.EventBus.Boxes IDistributedEventBus distributedEventBus, IDistributedLockProvider distributedLockProvider, IUnitOfWorkManager unitOfWorkManager, - IClock clock) + IClock clock, + IOptions eventBusBoxesOptions) { ServiceProvider = serviceProvider; Timer = timer; @@ -45,7 +48,8 @@ namespace Volo.Abp.EventBus.Boxes DistributedLockProvider = distributedLockProvider; UnitOfWorkManager = unitOfWorkManager; Clock = clock; - Timer.Period = 2000; //TODO: Config? + EventBusBoxesOptions = eventBusBoxesOptions.Value; + Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds; Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; StoppingTokenSource = new CancellationTokenSource(); @@ -79,7 +83,7 @@ namespace Volo.Abp.EventBus.Boxes { return; } - + await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken)) { if (handle != null) @@ -88,7 +92,7 @@ namespace Volo.Abp.EventBus.Boxes while (true) { - var waitingEvents = await Inbox.GetWaitingEventsAsync(1000); //TODO: Config? Pass StoppingToken! + var waitingEvents = await Inbox.GetWaitingEventsAsync(EventBusBoxesOptions.InboxWaitingEventMaxCount, StoppingToken); if (waitingEvents.Count <= 0) { break; @@ -116,21 +120,21 @@ namespace Volo.Abp.EventBus.Boxes else { Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); - await TaskDelayHelper.DelayAsync(15000, StoppingToken); //TODO: Config? + await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken); } } } protected virtual async Task DeleteOldEventsAsync() { - if (LastCleanTime != null && LastCleanTime > Clock.Now.AddHours(6)) //TODO: Config? + if (LastCleanTime != null && LastCleanTime + EventBusBoxesOptions.CleanOldEventTimeIntervalSpan > Clock.Now) { return; } await Inbox.DeleteOldEventsAsync(); - LastCleanTime = DateTime.Now; + LastCleanTime = Clock.Now; } } } diff --git a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs index 1be2374268..32545227c3 100644 --- a/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs +++ b/framework/src/Volo.Abp.EventBus.Boxes/Volo/Abp/EventBus/Boxes/OutboxSender.cs @@ -5,6 +5,7 @@ using Medallion.Threading; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Threading; @@ -19,9 +20,10 @@ namespace Volo.Abp.EventBus.Boxes protected IDistributedLockProvider DistributedLockProvider { get; } protected IEventOutbox Outbox { get; private set; } protected OutboxConfig OutboxConfig { get; private set; } + protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } protected string DistributedLockName => "Outbox_" + OutboxConfig.Name; public ILogger Logger { get; set; } - + protected CancellationTokenSource StoppingTokenSource { get; } protected CancellationToken StoppingToken { get; } @@ -29,13 +31,15 @@ namespace Volo.Abp.EventBus.Boxes IServiceProvider serviceProvider, AbpAsyncTimer timer, IDistributedEventBus distributedEventBus, - IDistributedLockProvider distributedLockProvider) + IDistributedLockProvider distributedLockProvider, + IOptions eventBusBoxesOptions) { ServiceProvider = serviceProvider; Timer = timer; DistributedEventBus = distributedEventBus; DistributedLockProvider = distributedLockProvider; - Timer.Period = 2000; //TODO: Config? + EventBusBoxesOptions = eventBusBoxesOptions.Value; + Timer.Period = EventBusBoxesOptions.PeriodTimeSpan.Milliseconds; Timer.Elapsed += TimerOnElapsed; Logger = NullLogger.Instance; StoppingTokenSource = new CancellationTokenSource(); @@ -65,13 +69,13 @@ namespace Volo.Abp.EventBus.Boxes protected virtual async Task RunAsync() { - await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName)) + await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName, cancellationToken: StoppingToken)) { if (handle != null) { while (true) { - var waitingEvents = await Outbox.GetWaitingEventsAsync(1000); //TODO: Config? + var waitingEvents = await Outbox.GetWaitingEventsAsync(EventBusBoxesOptions.OutboxWaitingEventMaxCount, StoppingToken); if (waitingEvents.Count <= 0) { break; @@ -96,7 +100,7 @@ namespace Volo.Abp.EventBus.Boxes else { Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName); - await TaskDelayHelper.DelayAsync(15000, StoppingToken); //TODO: Config? + await TaskDelayHelper.DelayAsync(EventBusBoxesOptions.DistributedLockWaitDuration.Milliseconds, StoppingToken); } } } diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/IRabbitMqSerializer.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/IRebusSerializer.cs similarity index 100% rename from framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/IRabbitMqSerializer.cs rename to framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/IRebusSerializer.cs diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs index 8dac885036..1cb380679f 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventBus.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Rebus.Bus; +using Rebus.Pipeline; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Guids; @@ -134,6 +135,19 @@ namespace Volo.Abp.EventBus.Rebus Rebus.Unsubscribe(eventType); } + public async Task ProcessEventAsync(Type eventType, object eventData) + { + var messageId = MessageContext.Current.TransportMessage.GetMessageId(); + var eventName = EventNameAttribute.GetNameOrDefault(eventType); + + if (await AddToInboxAsync(messageId, eventName, eventType, MessageContext.Current.TransportMessage.Body)) + { + return; + } + + await TriggerHandlersAsync(eventType, eventData); + } + protected override async Task PublishToEventBusAsync(Type eventType, object eventData) { await AbpRebusEventBusOptions.Publish(Rebus, eventType, eventData); @@ -192,16 +206,29 @@ namespace Volo.Abp.EventBus.Rebus OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) { - /* TODO: IMPLEMENT! */ - throw new NotImplementedException(); + var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName); + var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType); + + return PublishToEventBusAsync(eventType, eventData); } - public override Task ProcessFromInboxAsync( + public override async Task ProcessFromInboxAsync( IncomingEventInfo incomingEvent, InboxConfig inboxConfig) { - /* TODO: IMPLEMENT! */ - throw new NotImplementedException(); + var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); + if (eventType == null) + { + return; + } + + var eventData = Serializer.Deserialize(incomingEvent.EventData, eventType); + var exceptions = new List(); + await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig); + if (exceptions.Any()) + { + ThrowOriginalExceptions(eventType, exceptions); + } } protected override byte[] Serialize(object eventData) diff --git a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs index 005226b885..61330828b4 100644 --- a/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs +++ b/framework/src/Volo.Abp.EventBus.Rebus/Volo/Abp/EventBus/Rebus/RebusDistributedEventHandlerAdapter.cs @@ -14,7 +14,7 @@ namespace Volo.Abp.EventBus.Rebus public async Task Handle(TEventData message) { - await RebusDistributedEventBus.TriggerHandlersAsync(typeof(TEventData), message); + await RebusDistributedEventBus.ProcessEventAsync(message.GetType(), message); } } } diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs index 896dfd4f7d..ab8fe6823b 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/AbpDistributedEventBusOptions.cs @@ -5,11 +5,10 @@ namespace Volo.Abp.EventBus.Distributed public class AbpDistributedEventBusOptions { public ITypeList Handlers { get; } - + public OutboxConfigDictionary Outboxes { get; } - - public InboxConfigDictionary Inboxes { get; } + public InboxConfigDictionary Inboxes { get; } public AbpDistributedEventBusOptions() { Handlers = new TypeList(); @@ -17,4 +16,4 @@ namespace Volo.Abp.EventBus.Distributed Inboxes = new InboxConfigDictionary(); } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventInbox.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventInbox.cs index bee802a126..d072f5d922 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventInbox.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventInbox.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Volo.Abp.EventBus.Distributed @@ -7,13 +8,13 @@ namespace Volo.Abp.EventBus.Distributed public interface IEventInbox { Task EnqueueAsync(IncomingEventInfo incomingEvent); - - Task> GetWaitingEventsAsync(int maxCount); - + + Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default); + Task MarkAsProcessedAsync(Guid id); - + Task ExistsByMessageIdAsync(string messageId); Task DeleteOldEventsAsync(); } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs index 0b50993e8e..cc00ee6654 100644 --- a/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs +++ b/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/Distributed/IEventOutbox.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Volo.Abp.EventBus.Distributed @@ -7,9 +8,9 @@ namespace Volo.Abp.EventBus.Distributed public interface IEventOutbox { Task EnqueueAsync(OutgoingEventInfo outgoingEvent); - - Task> GetWaitingEventsAsync(int maxCount); - + + Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default); + Task DeleteAsync(Guid id); } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs index 387e860bce..792a85e124 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventInbox.cs @@ -4,6 +4,6 @@ namespace Volo.Abp.MongoDB.DistributedEvents { public interface IHasEventInbox : IAbpMongoDbContext { - IMongoCollection IncomingEvents { get; set; } + IMongoCollection IncomingEvents { get; } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs index cf57aaa699..ab4bc10f35 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/IHasEventOutbox.cs @@ -4,6 +4,6 @@ namespace Volo.Abp.MongoDB.DistributedEvents { public interface IHasEventOutbox : IAbpMongoDbContext { - IMongoCollection OutgoingEvents { get; set; } + IMongoCollection OutgoingEvents { get; } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs index d98e144ada..3cf5485512 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs @@ -1,30 +1,36 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Options; using MongoDB.Driver; using MongoDB.Driver.Linq; +using Volo.Abp.EventBus.Boxes; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Timing; using Volo.Abp.Uow; namespace Volo.Abp.MongoDB.DistributedEvents { - public class MongoDbContextEventInbox : IMongoDbContextEventInbox + public class MongoDbContextEventInbox : IMongoDbContextEventInbox where TMongoDbContext : IHasEventInbox { protected IMongoDbContextProvider DbContextProvider { get; } + protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; } protected IClock Clock { get; } - + public MongoDbContextEventInbox( IMongoDbContextProvider dbContextProvider, - IClock clock) + IClock clock, + IOptions eventBusBoxesOptions) { DbContextProvider = dbContextProvider; Clock = clock; + EventBusBoxesOptions = eventBusBoxesOptions.Value; } - + [UnitOfWork] public virtual async Task EnqueueAsync(IncomingEventInfo incomingEvent) { @@ -45,9 +51,9 @@ namespace Volo.Abp.MongoDB.DistributedEvents } [UnitOfWork] - public virtual async Task> GetWaitingEventsAsync(int maxCount) + public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) { - var dbContext = await DbContextProvider.GetDbContextAsync(); + var dbContext = await DbContextProvider.GetDbContextAsync(cancellationToken); var outgoingEventRecords = await dbContext .IncomingEvents @@ -55,27 +61,52 @@ namespace Volo.Abp.MongoDB.DistributedEvents .Where(x => !x.Processed) .OrderBy(x => x.CreationTime) .Take(maxCount) - .ToListAsync(); - + .ToListAsync(cancellationToken: cancellationToken); + return outgoingEventRecords .Select(x => x.ToIncomingEventInfo()) .ToList(); } [UnitOfWork] - public async Task MarkAsProcessedAsync(Guid id) + public virtual async Task MarkAsProcessedAsync(Guid id) { - throw new NotImplementedException(); + var dbContext = await DbContextProvider.GetDbContextAsync(); + + var filter = Builders.Filter.Eq(x => x.Id, id); + var update = Builders.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now); + + if (dbContext.SessionHandle != null) + { + await dbContext.IncomingEvents.UpdateOneAsync(dbContext.SessionHandle, filter, update); + } + else + { + await dbContext.IncomingEvents.UpdateOneAsync(filter, update); + } } - public Task ExistsByMessageIdAsync(string messageId) + [UnitOfWork] + public virtual async Task ExistsByMessageIdAsync(string messageId) { - throw new NotImplementedException(); + var dbContext = await DbContextProvider.GetDbContextAsync(); + return await dbContext.IncomingEvents.AsQueryable().AnyAsync(x => x.MessageId == messageId); } - public Task DeleteOldEventsAsync() + [UnitOfWork] + public virtual async Task DeleteOldEventsAsync() { - throw new NotImplementedException(); + var dbContext = await DbContextProvider.GetDbContextAsync(); + var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents; + + if (dbContext.SessionHandle != null) + { + await dbContext.IncomingEvents.DeleteManyAsync(dbContext.SessionHandle, x => x.Processed && x.CreationTime < timeToKeepEvents); + } + else + { + await dbContext.IncomingEvents.DeleteManyAsync(x => x.Processed && x.CreationTime < timeToKeepEvents); + } } } -} \ No newline at end of file +} diff --git a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs index 1bec9e0834..dad0294d1c 100644 --- a/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs @@ -1,27 +1,72 @@ using System; using System.Collections.Generic; +using System.Linq; +using System.Threading; using System.Threading.Tasks; +using MongoDB.Driver; +using MongoDB.Driver.Linq; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Uow; namespace Volo.Abp.MongoDB.DistributedEvents { - public class MongoDbContextEventOutbox : IMongoDbContextEventOutbox + public class MongoDbContextEventOutbox : IMongoDbContextEventOutbox where TMongoDbContext : IHasEventOutbox { - public Task EnqueueAsync(OutgoingEventInfo outgoingEvent) + protected IMongoDbContextProvider MongoDbContextProvider { get; } + + public MongoDbContextEventOutbox(IMongoDbContextProvider mongoDbContextProvider) + { + MongoDbContextProvider = mongoDbContextProvider; + } + + [UnitOfWork] + public virtual async Task EnqueueAsync(OutgoingEventInfo outgoingEvent) { - throw new NotImplementedException(); + var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync(); + if (dbContext.SessionHandle != null) + { + await dbContext.OutgoingEvents.InsertOneAsync( + dbContext.SessionHandle, + new OutgoingEventRecord(outgoingEvent) + ); + } + else + { + await dbContext.OutgoingEvents.InsertOneAsync( + new OutgoingEventRecord(outgoingEvent) + ); + } } - public Task> GetWaitingEventsAsync(int maxCount) + [UnitOfWork] + public virtual async Task> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default) { - throw new NotImplementedException(); + var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync(cancellationToken); + + var outgoingEventRecords = await dbContext + .OutgoingEvents.AsQueryable() + .OrderBy(x => x.CreationTime) + .Take(maxCount) + .ToListAsync(cancellationToken: cancellationToken); + + return outgoingEventRecords + .Select(x => x.ToOutgoingEventInfo()) + .ToList(); } - public Task DeleteAsync(Guid id) + [UnitOfWork] + public virtual async Task DeleteAsync(Guid id) { - throw new NotImplementedException(); + var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync(); + if (dbContext.SessionHandle != null) + { + await dbContext.OutgoingEvents.DeleteOneAsync(dbContext.SessionHandle, x => x.Id.Equals(id)); + } + else + { + await dbContext.OutgoingEvents.DeleteOneAsync(x => x.Id.Equals(id)); + } } } -} \ No newline at end of file +} diff --git a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/appsettings.json b/test/DistEvents/DistDemoApp.EfCoreRabbitMq/appsettings.json index 1ed30c80a9..5359c021b7 100644 --- a/test/DistEvents/DistDemoApp.EfCoreRabbitMq/appsettings.json +++ b/test/DistEvents/DistDemoApp.EfCoreRabbitMq/appsettings.json @@ -16,4 +16,4 @@ "Redis": { "Configuration": "127.0.0.1" } -} \ No newline at end of file +} diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoApp.MongoDbKafka.csproj b/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoApp.MongoDbKafka.csproj index cfd73d51bb..a503c6f6c0 100644 --- a/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoApp.MongoDbKafka.csproj +++ b/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoApp.MongoDbKafka.csproj @@ -6,4 +6,16 @@ DistDemoApp + + + + + + + + + Always + + + diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoAppMongoDbKafkaModule.cs b/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoAppMongoDbKafkaModule.cs new file mode 100644 index 0000000000..b2e41b6ca7 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbKafka/DistDemoAppMongoDbKafkaModule.cs @@ -0,0 +1,38 @@ +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.EventBus.Kafka; +using Volo.Abp.Modularity; +using Volo.Abp.MongoDB; +using Volo.Abp.MongoDB.DistributedEvents; + +namespace DistDemoApp +{ + [DependsOn( + typeof(AbpMongoDbModule), + typeof(AbpEventBusKafkaModule), + typeof(DistDemoAppSharedModule) + )] + public class DistDemoAppMongoDbKafkaModule : AbpModule + { + public override void ConfigureServices(ServiceConfigurationContext context) + { + context.Services.AddMongoDbContext(options => + { + options.AddDefaultRepositories(); + }); + + Configure(options => + { + options.Outboxes.Configure(config => + { + config.UseMongoDbContext(); + }); + + options.Inboxes.Configure(config => + { + config.UseMongoDbContext(); + }); + }); + } + } +} diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/Program.cs b/test/DistEvents/DistDemoApp.MongoDbKafka/Program.cs index d0e4cdf4ca..b048c17389 100644 --- a/test/DistEvents/DistDemoApp.MongoDbKafka/Program.cs +++ b/test/DistEvents/DistDemoApp.MongoDbKafka/Program.cs @@ -1,12 +1,57 @@ using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Serilog; +using Serilog.Events; namespace DistDemoApp { - class Program + public class Program { - static void Main(string[] args) + public static async Task Main(string[] args) { - Console.WriteLine("Hello World!"); + Log.Logger = new LoggerConfiguration() +#if DEBUG + .MinimumLevel.Debug() +#else + .MinimumLevel.Information() +#endif + .MinimumLevel.Override("Microsoft", LogEventLevel.Warning) + .Enrich.FromLogContext() + .WriteTo.Async(c => c.File("Logs/logs.txt")) + .WriteTo.Async(c => c.Console()) + .CreateLogger(); + + try + { + Log.Information("Starting console host."); + await CreateHostBuilder(args).RunConsoleAsync(); + return 0; + } + catch (Exception ex) + { + Log.Fatal(ex, "Host terminated unexpectedly!"); + return 1; + } + finally + { + Log.CloseAndFlush(); + } + } + + internal static IHostBuilder CreateHostBuilder(string[] args) => + Host.CreateDefaultBuilder(args) + .UseAutofac() + .UseSerilog() + .ConfigureAppConfiguration((context, config) => + { + //setup your additional configuration sources + }) + .ConfigureServices((hostContext, services) => + { + services.AddApplication(); + }); } -} \ No newline at end of file +} diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/TodoMongoDbContext.cs b/test/DistEvents/DistDemoApp.MongoDbKafka/TodoMongoDbContext.cs new file mode 100644 index 0000000000..a7f1b78f86 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbKafka/TodoMongoDbContext.cs @@ -0,0 +1,26 @@ +using MongoDB.Driver; +using Volo.Abp.Data; +using Volo.Abp.MongoDB; +using Volo.Abp.MongoDB.DistributedEvents; + +namespace DistDemoApp +{ + [ConnectionStringName("Default")] + public class TodoMongoDbContext : AbpMongoDbContext, IHasEventOutbox, IHasEventInbox + { + public IMongoCollection TodoItems => Collection(); + public IMongoCollection TodoSummaries => Collection(); + + public IMongoCollection OutgoingEvents + { + get => Collection(); + set {} + } + public IMongoCollection IncomingEvents + { + get => Collection(); + set {} + } + } + +} diff --git a/test/DistEvents/DistDemoApp.MongoDbKafka/appsettings.json b/test/DistEvents/DistDemoApp.MongoDbKafka/appsettings.json new file mode 100644 index 0000000000..d8c528fe87 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbKafka/appsettings.json @@ -0,0 +1,19 @@ +{ + "ConnectionStrings": { + "Default": "mongodb://localhost:27018,localhost:27019,localhost:27020/DistEventsDemo" + }, + "Kafka": { + "Connections": { + "Default": { + "BootstrapServers": "localhost:9092" + } + }, + "EventBus": { + "GroupId": "DistDemoApp", + "TopicName": "DistDemoTopic" + } + }, + "Redis": { + "Configuration": "127.0.0.1" + } +} diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoApp.MongoDbRebus.csproj b/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoApp.MongoDbRebus.csproj new file mode 100644 index 0000000000..c102adb382 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoApp.MongoDbRebus.csproj @@ -0,0 +1,21 @@ + + + + Exe + net5.0 + DistDemoApp + + + + + + + + + + + Always + + + + diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoAppMongoDbRebusModule.cs b/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoAppMongoDbRebusModule.cs new file mode 100644 index 0000000000..21dab7b9b5 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbRebus/DistDemoAppMongoDbRebusModule.cs @@ -0,0 +1,53 @@ +using Microsoft.Extensions.DependencyInjection; +using Rebus.Persistence.InMem; +using Rebus.Transport.InMem; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.EventBus.Rebus; +using Volo.Abp.Modularity; +using Volo.Abp.MongoDB; +using Volo.Abp.MongoDB.DistributedEvents; + +namespace DistDemoApp +{ + [DependsOn( + typeof(AbpMongoDbModule), + typeof(AbpEventBusRebusModule), + typeof(DistDemoAppSharedModule) + )] + public class DistDemoAppMongoDbRebusModule : AbpModule + { + public override void PreConfigureServices(ServiceConfigurationContext context) + { + PreConfigure(options => + { + options.InputQueueName = "eventbus"; + options.Configurer = rebusConfigurer => + { + rebusConfigurer.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "eventbus")); + rebusConfigurer.Subscriptions(s => s.StoreInMemory()); + }; + }); + } + + public override void ConfigureServices(ServiceConfigurationContext context) + { + context.Services.AddMongoDbContext(options => + { + options.AddDefaultRepositories(); + }); + + Configure(options => + { + options.Outboxes.Configure(config => + { + config.UseMongoDbContext(); + }); + + options.Inboxes.Configure(config => + { + config.UseMongoDbContext(); + }); + }); + } + } +} diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs b/test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs new file mode 100644 index 0000000000..a79ad1b1bf --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbRebus/Program.cs @@ -0,0 +1,57 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Serilog; +using Serilog.Events; + +namespace DistDemoApp +{ + public class Program + { + public static async Task Main(string[] args) + { + Log.Logger = new LoggerConfiguration() +#if DEBUG + .MinimumLevel.Debug() +#else + .MinimumLevel.Information() +#endif + .MinimumLevel.Override("Microsoft", LogEventLevel.Warning) + .Enrich.FromLogContext() + .WriteTo.Async(c => c.File("Logs/logs.txt")) + .WriteTo.Async(c => c.Console()) + .CreateLogger(); + + try + { + Log.Information("Starting console host."); + await CreateHostBuilder(args).RunConsoleAsync(); + return 0; + } + catch (Exception ex) + { + Log.Fatal(ex, "Host terminated unexpectedly!"); + return 1; + } + finally + { + Log.CloseAndFlush(); + } + + } + + internal static IHostBuilder CreateHostBuilder(string[] args) => + Host.CreateDefaultBuilder(args) + .UseAutofac() + .UseSerilog() + .ConfigureAppConfiguration((context, config) => + { + //setup your additional configuration sources + }) + .ConfigureServices((hostContext, services) => + { + services.AddApplication(); + }); + } +} diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs b/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs new file mode 100644 index 0000000000..95370bb4d2 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbRebus/TodoMongoDbContext.cs @@ -0,0 +1,19 @@ +using MongoDB.Driver; +using Volo.Abp.Data; +using Volo.Abp.MongoDB; +using Volo.Abp.MongoDB.DistributedEvents; + +namespace DistDemoApp +{ + [ConnectionStringName("Default")] + public class TodoMongoDbContext : AbpMongoDbContext, IHasEventOutbox, IHasEventInbox + { + public IMongoCollection TodoItems => Collection(); + public IMongoCollection TodoSummaries => Collection(); + + public IMongoCollection OutgoingEvents => Collection(); + + public IMongoCollection IncomingEvents => Collection(); + } + +} diff --git a/test/DistEvents/DistDemoApp.MongoDbRebus/appsettings.json b/test/DistEvents/DistDemoApp.MongoDbRebus/appsettings.json new file mode 100644 index 0000000000..d8c528fe87 --- /dev/null +++ b/test/DistEvents/DistDemoApp.MongoDbRebus/appsettings.json @@ -0,0 +1,19 @@ +{ + "ConnectionStrings": { + "Default": "mongodb://localhost:27018,localhost:27019,localhost:27020/DistEventsDemo" + }, + "Kafka": { + "Connections": { + "Default": { + "BootstrapServers": "localhost:9092" + } + }, + "EventBus": { + "GroupId": "DistDemoApp", + "TopicName": "DistDemoTopic" + } + }, + "Redis": { + "Configuration": "127.0.0.1" + } +} diff --git a/test/DistEvents/DistEventsDemo.sln b/test/DistEvents/DistEventsDemo.sln index 4e53ba6367..e6c3348a5f 100644 --- a/test/DistEvents/DistEventsDemo.sln +++ b/test/DistEvents/DistEventsDemo.sln @@ -6,6 +6,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DistDemoApp.MongoDbKafka", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DistDemoApp.Shared", "DistDemoApp.Shared\DistDemoApp.Shared.csproj", "{C515F4E2-0ED3-4561-BC58-FC633B50E2EB}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DistDemoApp.MongoDbRebus", "DistDemoApp.MongoDbRebus\DistDemoApp.MongoDbRebus.csproj", "{4FB63540-4CC5-4A7B-900B-F5FCD907456E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -24,5 +26,9 @@ Global {C515F4E2-0ED3-4561-BC58-FC633B50E2EB}.Debug|Any CPU.Build.0 = Debug|Any CPU {C515F4E2-0ED3-4561-BC58-FC633B50E2EB}.Release|Any CPU.ActiveCfg = Release|Any CPU {C515F4E2-0ED3-4561-BC58-FC633B50E2EB}.Release|Any CPU.Build.0 = Release|Any CPU + {4FB63540-4CC5-4A7B-900B-F5FCD907456E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4FB63540-4CC5-4A7B-900B-F5FCD907456E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4FB63540-4CC5-4A7B-900B-F5FCD907456E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4FB63540-4CC5-4A7B-900B-F5FCD907456E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal