diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs new file mode 100644 index 0000000000..ecda92702b --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class MySQLInboxConfigExtensions + { + public static void UseMySQL(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs new file mode 100644 index 0000000000..8657ba92ab --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class MySQLOutboxConfigExtensions + { + public static void UseMySQL(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs new file mode 100644 index 0000000000..0cc7ae5531 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IOracleDbContextEventInbox : IDbContextEventInbox + where TDbContext : IHasEventInbox + { + + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs new file mode 100644 index 0000000000..a588f36e43 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IOracleDbContextEventOutbox : IDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs new file mode 100644 index 0000000000..d5f1b7afcc --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs @@ -0,0 +1,47 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Timing; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class OracleDbContextEventInbox : DbContextEventInbox , IOracleDbContextEventInbox + where TDbContext : IHasEventInbox + { + public OracleDbContextEventInbox( + IDbContextProvider dbContextProvider, + IClock clock, + IOptions distributedEventsOptions) : base(dbContextProvider, clock, distributedEventsOptions) + { + } + + [UnitOfWork] + public override async Task MarkAsProcessedAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + [UnitOfWork] + public override async Task DeleteOldEventsAsync() + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + protected virtual string GuidToOracleType(Guid id) + { + return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs new file mode 100644 index 0000000000..a5c4566ced --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs @@ -0,0 +1,31 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class OracleDbContextEventOutbox : DbContextEventOutbox , IOracleDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + public OracleDbContextEventOutbox(IDbContextProvider dbContextProvider) + : base(dbContextProvider) + { + } + + [UnitOfWork] + public override async Task DeleteAsync(Guid id) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + protected virtual string GuidToOracleType(Guid id) + { + return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs new file mode 100644 index 0000000000..ca79019e28 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class OracleInboxConfigExtensions + { + public static void UseOracle(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs new file mode 100644 index 0000000000..e2d7d33761 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class OracleOutboxConfigExtensions + { + public static void UseOracle(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs index 9580219cfc..76d0ecdd21 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs @@ -1,4 +1,6 @@ -using Volo.Abp.Guids; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.EntityFrameworkCore.DistributedEvents; +using Volo.Abp.Guids; using Volo.Abp.Modularity; namespace Volo.Abp.EntityFrameworkCore.Oracle.Devart @@ -17,6 +19,9 @@ namespace Volo.Abp.EntityFrameworkCore.Oracle.Devart options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsBinary; } }); + + context.Services.AddTransient(typeof(IOracleDbContextEventOutbox<>), typeof(OracleDbContextEventOutbox<>)); + context.Services.AddTransient(typeof(IOracleDbContextEventInbox<>), typeof(OracleDbContextEventInbox<>)); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs new file mode 100644 index 0000000000..0cc7ae5531 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IOracleDbContextEventInbox : IDbContextEventInbox + where TDbContext : IHasEventInbox + { + + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs new file mode 100644 index 0000000000..a588f36e43 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IOracleDbContextEventOutbox : IDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs new file mode 100644 index 0000000000..d5f1b7afcc --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs @@ -0,0 +1,47 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Timing; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class OracleDbContextEventInbox : DbContextEventInbox , IOracleDbContextEventInbox + where TDbContext : IHasEventInbox + { + public OracleDbContextEventInbox( + IDbContextProvider dbContextProvider, + IClock clock, + IOptions distributedEventsOptions) : base(dbContextProvider, clock, distributedEventsOptions) + { + } + + [UnitOfWork] + public override async Task MarkAsProcessedAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + [UnitOfWork] + public override async Task DeleteOldEventsAsync() + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + protected virtual string GuidToOracleType(Guid id) + { + return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs new file mode 100644 index 0000000000..a5c4566ced --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs @@ -0,0 +1,31 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class OracleDbContextEventOutbox : DbContextEventOutbox , IOracleDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + public OracleDbContextEventOutbox(IDbContextProvider dbContextProvider) + : base(dbContextProvider) + { + } + + [UnitOfWork] + public override async Task DeleteAsync(Guid id) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + protected virtual string GuidToOracleType(Guid id) + { + return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs new file mode 100644 index 0000000000..ca79019e28 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class OracleInboxConfigExtensions + { + public static void UseOracle(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs new file mode 100644 index 0000000000..e2d7d33761 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class OracleOutboxConfigExtensions + { + public static void UseOracle(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs index b7cbaec1a1..7716ae6150 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs @@ -1,4 +1,6 @@ -using Volo.Abp.Guids; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.EntityFrameworkCore.DistributedEvents; +using Volo.Abp.Guids; using Volo.Abp.Modularity; namespace Volo.Abp.EntityFrameworkCore.Oracle @@ -15,6 +17,9 @@ namespace Volo.Abp.EntityFrameworkCore.Oracle options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsBinary; } }); + + context.Services.AddTransient(typeof(IOracleDbContextEventOutbox<>), typeof(OracleDbContextEventOutbox<>)); + context.Services.AddTransient(typeof(IOracleDbContextEventInbox<>), typeof(OracleDbContextEventInbox<>)); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs new file mode 100644 index 0000000000..5c24d79f88 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs @@ -0,0 +1,8 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IPostgreSqlDbContextEventInbox : IDbContextEventInbox + where TDbContext : IHasEventInbox + { + + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs new file mode 100644 index 0000000000..7e6bc4bd59 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface IPostgreSqlDbContextEventOutbox : IDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs deleted file mode 100644 index 0c84ec929e..0000000000 --- a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs +++ /dev/null @@ -1,25 +0,0 @@ -namespace Volo.Abp.EntityFrameworkCore.DistributedEvents -{ - public class PostgreSqlAdapter : ISqlAdapter - { - public string NormalizeTableName(string tableName) - { - return $"\"{tableName}\""; - } - - public string NormalizeColumnName(string columnName) - { - return $"\"{columnName}\""; - } - - public string NormalizeColumnNameEqualsValue(string columnName, object value) - { - return $"\"{columnName}\" = '{value}'"; - } - - public string NormalizeValue(object value) - { - return $"'{value}'"; - } - } -} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs new file mode 100644 index 0000000000..4560bdb2c4 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs @@ -0,0 +1,43 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Timing; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class PostgreSqlDbContextEventInbox : DbContextEventInbox, IPostgreSqlDbContextEventInbox + where TDbContext : IHasEventInbox + { + public PostgreSqlDbContextEventInbox( + IDbContextProvider dbContextProvider, + IClock clock, + IOptions distributedEventsOptions) + : base(dbContextProvider, clock, distributedEventsOptions) + { + } + + [UnitOfWork] + public override async Task MarkAsProcessedAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = '{Clock.Now}' WHERE \"Id\" = '{id}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + [UnitOfWork] + public override async Task DeleteOldEventsAsync() + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < '{timeToKeepEvents}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs new file mode 100644 index 0000000000..c5e79a0014 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs @@ -0,0 +1,25 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class PostgreSqlDbContextEventOutbox : DbContextEventOutbox , IPostgreSqlDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + public PostgreSqlDbContextEventOutbox(IDbContextProvider dbContextProvider) : base(dbContextProvider) + { + } + + [UnitOfWork] + public override async Task DeleteAsync(Guid id) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = '{id}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs new file mode 100644 index 0000000000..55afc4794d --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class PostgreSqlInboxConfigExtensions + { + public static void UsePostgreSql(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs new file mode 100644 index 0000000000..5a65d15c90 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class PostgreSqlOutboxConfigExtensions + { + public static void UsePostgreSql(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs index 7d8c87c63a..55c1a4af42 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs @@ -1,4 +1,4 @@ -using Npgsql; +using Microsoft.Extensions.DependencyInjection; using Volo.Abp.EntityFrameworkCore.DistributedEvents; using Volo.Abp.Guids; using Volo.Abp.Modularity; @@ -20,10 +20,8 @@ namespace Volo.Abp.EntityFrameworkCore.PostgreSql } }); - Configure(options => - { - options.SqlAdapters.TryAdd(nameof(NpgsqlConnection).ToLower(), new PostgreSqlAdapter()); - }); + context.Services.AddTransient(typeof(IPostgreSqlDbContextEventOutbox<>), typeof(PostgreSqlDbContextEventOutbox<>)); + context.Services.AddTransient(typeof(IPostgreSqlDbContextEventInbox<>), typeof(PostgreSqlDbContextEventInbox<>)); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs new file mode 100644 index 0000000000..60adf600c7 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class SqlServerInboxConfigExtensions + { + public static void UseSqlServer(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs new file mode 100644 index 0000000000..9022d5c7e6 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class SqlServerOutboxConfigExtensions + { + public static void UseSqlServer(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs new file mode 100644 index 0000000000..ccc92d4eb1 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class SqliteInboxConfigExtensions + { + public static void UseSqlite(this InboxConfig outboxConfig) + where TDbContext : IHasEventInbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs b/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs new file mode 100644 index 0000000000..c1d9949c19 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs @@ -0,0 +1,13 @@ +using Volo.Abp.EventBus.Distributed; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public static class SqliteOutboxConfigExtensions + { + public static void UseSqlite(this OutboxConfig outboxConfig) + where TDbContext : IHasEventOutbox + { + outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs index 349116d6eb..f7834c1835 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs @@ -5,6 +5,7 @@ using Volo.Abp.Domain; using Volo.Abp.EntityFrameworkCore.DependencyInjection; using Volo.Abp.EntityFrameworkCore.DistributedEvents; using Volo.Abp.Modularity; +using Volo.Abp.Uow; using Volo.Abp.Uow.EntityFrameworkCore; namespace Volo.Abp.EntityFrameworkCore @@ -26,14 +27,12 @@ namespace Volo.Abp.EntityFrameworkCore }); }); - Configure(options => - { - options.SqlAdapters.Add(DefaultSqlAdapter.Name, new DefaultSqlAdapter()); - }); - context.Services.TryAddTransient(typeof(IDbContextProvider<>), typeof(UnitOfWorkDbContextProvider<>)); context.Services.AddTransient(typeof(IDbContextEventOutbox<>), typeof(DbContextEventOutbox<>)); context.Services.AddTransient(typeof(IDbContextEventInbox<>), typeof(DbContextEventInbox<>)); + + context.Services.AddTransient(typeof(ISqlRawDbContextEventOutbox<>), typeof(SqlRawDbContextEventOutbox<>)); + context.Services.AddTransient(typeof(ISqlRawDbContextEventInbox<>), typeof(SqlRawDbContextEventInbox<>)); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs deleted file mode 100644 index 9497552a09..0000000000 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System.Collections.Generic; - -namespace Volo.Abp.EntityFrameworkCore.DistributedEvents -{ - public class AbpEfCoreDistributedEventBusOptions - { - public Dictionary SqlAdapters { get; set; } - - public ISqlAdapter GetSqlAdapter(string connectionType) - { - return SqlAdapters.TryGetValue(connectionType, out var sqlAdapter) ? sqlAdapter : SqlAdapters[DefaultSqlAdapter.Name]; - } - - public AbpEfCoreDistributedEventBusOptions() - { - SqlAdapters = new Dictionary(); - } - } -} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index d8f18c6f32..982d335d17 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -16,18 +16,15 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { protected IDbContextProvider DbContextProvider { get; } protected AbpDistributedEventBusOptions DistributedEventsOptions { get; } - protected AbpEfCoreDistributedEventBusOptions EfCoreDistributedEventBusOptions { get; } protected IClock Clock { get; } public DbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions, - IOptions efCoreDistributedEventBusOptions) + IOptions distributedEventsOptions) { DbContextProvider = dbContextProvider; Clock = clock; - EfCoreDistributedEventBusOptions = efCoreDistributedEventBusOptions.Value; DistributedEventsOptions = distributedEventsOptions.Value; } @@ -63,16 +60,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public virtual async Task MarkAsProcessedAsync(Guid id) { var dbContext = await DbContextProvider.GetDbContextAsync(); - var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); - var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower(); - var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName); - - var sql = $"UPDATE {sqlAdapter.NormalizeTableName(tableName)} SET " + - $"{sqlAdapter.NormalizeColumnNameEqualsValue("Processed", 1)}, " + - $"{sqlAdapter.NormalizeColumnNameEqualsValue("ProcessedTime", Clock.Now)} WHERE " + - $"{sqlAdapter.NormalizeColumnNameEqualsValue("Id", id)}"; - - await dbContext.Database.ExecuteSqlRawAsync(sql); + var incomingEvent = await dbContext.IncomingEvents.FindAsync(id); + if (incomingEvent != null) + { + incomingEvent.MarkAsProcessed(Clock.Now); + } } [UnitOfWork] @@ -86,16 +78,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public virtual async Task DeleteOldEventsAsync() { var dbContext = await DbContextProvider.GetDbContextAsync(); - var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); - var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower(); - var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName); - - var sql = $"DELETE FROM {sqlAdapter.NormalizeTableName(tableName)} WHERE " + - $"{sqlAdapter.NormalizeColumnNameEqualsValue("Processed", 1)} AND " + - $"{sqlAdapter.NormalizeColumnName("CreationTime")} < {sqlAdapter.NormalizeValue(timeToKeepEvents)}"; - - await dbContext.Database.ExecuteSqlRawAsync(sql); + var oldEvents = await dbContext.IncomingEvents + .Where(x => x.Processed && x.CreationTime < timeToKeepEvents) + .ToListAsync(); + dbContext.IncomingEvents.RemoveRange(oldEvents); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs index 5451a0a30a..0b8909b966 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs @@ -4,7 +4,6 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Options; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Uow; @@ -14,14 +13,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents where TDbContext : IHasEventOutbox { protected IDbContextProvider DbContextProvider { get; } - protected AbpEfCoreDistributedEventBusOptions EfCoreDistributedEventBusOptions { get; } public DbContextEventOutbox( - IDbContextProvider dbContextProvider, - IOptions efCoreDistributedEventBusOptions) + IDbContextProvider dbContextProvider) { DbContextProvider = dbContextProvider; - EfCoreDistributedEventBusOptions = efCoreDistributedEventBusOptions.Value; } [UnitOfWork] @@ -54,14 +50,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents public virtual async Task DeleteAsync(Guid id) { var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); - var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); - var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower(); - var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName); - - var sql = $"DELETE FROM {sqlAdapter.NormalizeTableName(tableName)} WHERE " + - $"{sqlAdapter.NormalizeColumnNameEqualsValue("Id", id)}"; - - await dbContext.Database.ExecuteSqlRawAsync(sql); + var outgoingEvent = await dbContext.OutgoingEvents.FindAsync(id); + if (outgoingEvent != null) + { + dbContext.Remove(outgoingEvent); + } } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs deleted file mode 100644 index 8a86af7434..0000000000 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs +++ /dev/null @@ -1,27 +0,0 @@ -namespace Volo.Abp.EntityFrameworkCore.DistributedEvents -{ - public class DefaultSqlAdapter : ISqlAdapter - { - public const string Name = "default"; - - public string NormalizeTableName(string tableName) - { - return tableName; - } - - public string NormalizeColumnName(string columnName) - { - return columnName; - } - - public string NormalizeColumnNameEqualsValue(string columnName, object value) - { - return $"{columnName} = '{value}'"; - } - - public string NormalizeValue(object value) - { - return $"'{value}'"; - } - } -} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs deleted file mode 100644 index 9b366f00ab..0000000000 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs +++ /dev/null @@ -1,13 +0,0 @@ -namespace Volo.Abp.EntityFrameworkCore.DistributedEvents -{ - public interface ISqlAdapter - { - string NormalizeTableName(string tableName); - - string NormalizeColumnName(string columnName); - - string NormalizeColumnNameEqualsValue(string columnName, object value); - - string NormalizeValue(object value); - } -} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs new file mode 100644 index 0000000000..d86e3f36ed --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface ISqlRawDbContextEventInbox : IDbContextEventInbox + where TDbContext : IHasEventInbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs new file mode 100644 index 0000000000..776cc2f93c --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs @@ -0,0 +1,7 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface ISqlRawDbContextEventOutbox : IDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs new file mode 100644 index 0000000000..04bef7579d --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs @@ -0,0 +1,43 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Timing; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class SqlRawDbContextEventInbox : DbContextEventInbox , ISqlRawDbContextEventInbox + where TDbContext : IHasEventInbox + { + public SqlRawDbContextEventInbox( + IDbContextProvider dbContextProvider, + IClock clock, + IOptions distributedEventsOptions) + : base(dbContextProvider, clock, distributedEventsOptions) + { + } + + [UnitOfWork] + public override async Task MarkAsProcessedAsync(Guid id) + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"UPDATE {tableName} SET Processed = '1', ProcessedTime = '{Clock.Now}' WHERE Id = '{id}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + + [UnitOfWork] + public override async Task DeleteOldEventsAsync() + { + var dbContext = await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + + var sql = $"DELETE FROM {tableName} WHERE Processed = '1' AND CreationTime < '{timeToKeepEvents}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs new file mode 100644 index 0000000000..6c890ef9f8 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs @@ -0,0 +1,26 @@ +using System; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Volo.Abp.Uow; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class SqlRawDbContextEventOutbox : DbContextEventOutbox , ISqlRawDbContextEventOutbox + where TDbContext : IHasEventOutbox + { + public SqlRawDbContextEventOutbox(IDbContextProvider dbContextProvider) + : base(dbContextProvider) + { + } + + [UnitOfWork] + public override async Task DeleteAsync(Guid id) + { + var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); + var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + + var sql = $"DELETE FROM {tableName} WHERE Id = '{id}'"; + await dbContext.Database.ExecuteSqlRawAsync(sql); + } + } +}