diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs new file mode 100644 index 0000000000..0c84ec929e --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs @@ -0,0 +1,25 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class PostgreSqlAdapter : ISqlAdapter + { + public string NormalizeTableName(string tableName) + { + return $"\"{tableName}\""; + } + + public string NormalizeColumnName(string columnName) + { + return $"\"{columnName}\""; + } + + public string NormalizeColumnNameEqualsValue(string columnName, object value) + { + return $"\"{columnName}\" = '{value}'"; + } + + public string NormalizeValue(object value) + { + return $"'{value}'"; + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs index 4e76b89113..7d8c87c63a 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs @@ -1,4 +1,6 @@ -using Volo.Abp.Guids; +using Npgsql; +using Volo.Abp.EntityFrameworkCore.DistributedEvents; +using Volo.Abp.Guids; using Volo.Abp.Modularity; namespace Volo.Abp.EntityFrameworkCore.PostgreSql @@ -17,6 +19,11 @@ namespace Volo.Abp.EntityFrameworkCore.PostgreSql options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsString; } }); + + Configure(options => + { + options.SqlAdapters.TryAdd(nameof(NpgsqlConnection).ToLower(), new PostgreSqlAdapter()); + }); } } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs index 90b541bc73..349116d6eb 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs @@ -26,6 +26,11 @@ namespace Volo.Abp.EntityFrameworkCore }); }); + Configure(options => + { + options.SqlAdapters.Add(DefaultSqlAdapter.Name, new DefaultSqlAdapter()); + }); + context.Services.TryAddTransient(typeof(IDbContextProvider<>), typeof(UnitOfWorkDbContextProvider<>)); context.Services.AddTransient(typeof(IDbContextEventOutbox<>), typeof(DbContextEventOutbox<>)); context.Services.AddTransient(typeof(IDbContextEventInbox<>), typeof(DbContextEventInbox<>)); diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs new file mode 100644 index 0000000000..9497552a09 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs @@ -0,0 +1,19 @@ +using System.Collections.Generic; + +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class AbpEfCoreDistributedEventBusOptions + { + public Dictionary SqlAdapters { get; set; } + + public ISqlAdapter GetSqlAdapter(string connectionType) + { + return SqlAdapters.TryGetValue(connectionType, out var sqlAdapter) ? sqlAdapter : SqlAdapters[DefaultSqlAdapter.Name]; + } + + public AbpEfCoreDistributedEventBusOptions() + { + SqlAdapters = new Dictionary(); + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs index 5bee24d754..d8f18c6f32 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs @@ -16,15 +16,18 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { protected IDbContextProvider DbContextProvider { get; } protected AbpDistributedEventBusOptions DistributedEventsOptions { get; } + protected AbpEfCoreDistributedEventBusOptions EfCoreDistributedEventBusOptions { get; } protected IClock Clock { get; } public DbContextEventInbox( IDbContextProvider dbContextProvider, IClock clock, - IOptions distributedEventsOptions) + IOptions distributedEventsOptions, + IOptions efCoreDistributedEventBusOptions) { DbContextProvider = dbContextProvider; Clock = clock; + EfCoreDistributedEventBusOptions = efCoreDistributedEventBusOptions.Value; DistributedEventsOptions = distributedEventsOptions.Value; } @@ -61,8 +64,14 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); + var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower(); + var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName); + + var sql = $"UPDATE {sqlAdapter.NormalizeTableName(tableName)} SET " + + $"{sqlAdapter.NormalizeColumnNameEqualsValue("Processed", 1)}, " + + $"{sqlAdapter.NormalizeColumnNameEqualsValue("ProcessedTime", Clock.Now)} WHERE " + + $"{sqlAdapter.NormalizeColumnNameEqualsValue("Id", id)}"; - var sql = $"UPDATE {tableName} SET Processed = 1, ProcessedTime = '{Clock.Now}' WHERE Id = '{id}'"; await dbContext.Database.ExecuteSqlRawAsync(sql); } @@ -79,8 +88,13 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents var dbContext = await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName(); var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan); + var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower(); + var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName); + + var sql = $"DELETE FROM {sqlAdapter.NormalizeTableName(tableName)} WHERE " + + $"{sqlAdapter.NormalizeColumnNameEqualsValue("Processed", 1)} AND " + + $"{sqlAdapter.NormalizeColumnName("CreationTime")} < {sqlAdapter.NormalizeValue(timeToKeepEvents)}"; - var sql = $"DELETE FROM {tableName} WHERE Processed = 1 AND CreationTime < '{timeToKeepEvents}'"; await dbContext.Database.ExecuteSqlRawAsync(sql); } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs index f0b4d3b98c..5451a0a30a 100644 --- a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; using Volo.Abp.EventBus.Distributed; using Volo.Abp.Uow; @@ -13,11 +14,14 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents where TDbContext : IHasEventOutbox { protected IDbContextProvider DbContextProvider { get; } + protected AbpEfCoreDistributedEventBusOptions EfCoreDistributedEventBusOptions { get; } public DbContextEventOutbox( - IDbContextProvider dbContextProvider) + IDbContextProvider dbContextProvider, + IOptions efCoreDistributedEventBusOptions) { DbContextProvider = dbContextProvider; + EfCoreDistributedEventBusOptions = efCoreDistributedEventBusOptions.Value; } [UnitOfWork] @@ -51,8 +55,12 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents { var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync(); var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName(); + var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower(); + var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName); + + var sql = $"DELETE FROM {sqlAdapter.NormalizeTableName(tableName)} WHERE " + + $"{sqlAdapter.NormalizeColumnNameEqualsValue("Id", id)}"; - var sql = $"DELETE FROM {tableName} WHERE Id = '{id}'"; await dbContext.Database.ExecuteSqlRawAsync(sql); } } diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs new file mode 100644 index 0000000000..8a86af7434 --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs @@ -0,0 +1,27 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public class DefaultSqlAdapter : ISqlAdapter + { + public const string Name = "default"; + + public string NormalizeTableName(string tableName) + { + return tableName; + } + + public string NormalizeColumnName(string columnName) + { + return columnName; + } + + public string NormalizeColumnNameEqualsValue(string columnName, object value) + { + return $"{columnName} = '{value}'"; + } + + public string NormalizeValue(object value) + { + return $"'{value}'"; + } + } +} diff --git a/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs new file mode 100644 index 0000000000..9b366f00ab --- /dev/null +++ b/framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs @@ -0,0 +1,13 @@ +namespace Volo.Abp.EntityFrameworkCore.DistributedEvents +{ + public interface ISqlAdapter + { + string NormalizeTableName(string tableName); + + string NormalizeColumnName(string columnName); + + string NormalizeColumnNameEqualsValue(string columnName, object value); + + string NormalizeValue(object value); + } +}