Browse Source

Add ISqlAdapter

pull/10159/head
liangshiwei 4 years ago
parent
commit
bc4944c19d
  1. 25
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs
  2. 9
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs
  3. 5
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs
  4. 19
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs
  5. 20
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs
  6. 12
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs
  7. 27
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs
  8. 13
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs

25
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs

@ -0,0 +1,25 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class PostgreSqlAdapter : ISqlAdapter
{
public string NormalizeTableName(string tableName)
{
return $"\"{tableName}\"";
}
public string NormalizeColumnName(string columnName)
{
return $"\"{columnName}\"";
}
public string NormalizeColumnNameEqualsValue(string columnName, object value)
{
return $"\"{columnName}\" = '{value}'";
}
public string NormalizeValue(object value)
{
return $"'{value}'";
}
}
}

9
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs

@ -1,4 +1,6 @@
using Volo.Abp.Guids;
using Npgsql;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;
namespace Volo.Abp.EntityFrameworkCore.PostgreSql
@ -17,6 +19,11 @@ namespace Volo.Abp.EntityFrameworkCore.PostgreSql
options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsString;
}
});
Configure<AbpEfCoreDistributedEventBusOptions>(options =>
{
options.SqlAdapters.TryAdd(nameof(NpgsqlConnection).ToLower(), new PostgreSqlAdapter());
});
}
}
}

5
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs

@ -26,6 +26,11 @@ namespace Volo.Abp.EntityFrameworkCore
});
});
Configure<AbpEfCoreDistributedEventBusOptions>(options =>
{
options.SqlAdapters.Add(DefaultSqlAdapter.Name, new DefaultSqlAdapter());
});
context.Services.TryAddTransient(typeof(IDbContextProvider<>), typeof(UnitOfWorkDbContextProvider<>));
context.Services.AddTransient(typeof(IDbContextEventOutbox<>), typeof(DbContextEventOutbox<>));
context.Services.AddTransient(typeof(IDbContextEventInbox<>), typeof(DbContextEventInbox<>));

19
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs

@ -0,0 +1,19 @@
using System.Collections.Generic;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class AbpEfCoreDistributedEventBusOptions
{
public Dictionary<string, ISqlAdapter> SqlAdapters { get; set; }
public ISqlAdapter GetSqlAdapter(string connectionType)
{
return SqlAdapters.TryGetValue(connectionType, out var sqlAdapter) ? sqlAdapter : SqlAdapters[DefaultSqlAdapter.Name];
}
public AbpEfCoreDistributedEventBusOptions()
{
SqlAdapters = new Dictionary<string, ISqlAdapter>();
}
}
}

20
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs

@ -16,15 +16,18 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
protected IDbContextProvider<TDbContext> DbContextProvider { get; }
protected AbpDistributedEventBusOptions DistributedEventsOptions { get; }
protected AbpEfCoreDistributedEventBusOptions EfCoreDistributedEventBusOptions { get; }
protected IClock Clock { get; }
public DbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions)
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions,
IOptions<AbpEfCoreDistributedEventBusOptions> efCoreDistributedEventBusOptions)
{
DbContextProvider = dbContextProvider;
Clock = clock;
EfCoreDistributedEventBusOptions = efCoreDistributedEventBusOptions.Value;
DistributedEventsOptions = distributedEventsOptions.Value;
}
@ -61,8 +64,14 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower();
var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName);
var sql = $"UPDATE {sqlAdapter.NormalizeTableName(tableName)} SET " +
$"{sqlAdapter.NormalizeColumnNameEqualsValue("Processed", 1)}, " +
$"{sqlAdapter.NormalizeColumnNameEqualsValue("ProcessedTime", Clock.Now)} WHERE " +
$"{sqlAdapter.NormalizeColumnNameEqualsValue("Id", id)}";
var sql = $"UPDATE {tableName} SET Processed = 1, ProcessedTime = '{Clock.Now}' WHERE Id = '{id}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
@ -79,8 +88,13 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);
var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower();
var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName);
var sql = $"DELETE FROM {sqlAdapter.NormalizeTableName(tableName)} WHERE " +
$"{sqlAdapter.NormalizeColumnNameEqualsValue("Processed", 1)} AND " +
$"{sqlAdapter.NormalizeColumnName("CreationTime")} < {sqlAdapter.NormalizeValue(timeToKeepEvents)}";
var sql = $"DELETE FROM {tableName} WHERE Processed = 1 AND CreationTime < '{timeToKeepEvents}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
}

12
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs

@ -4,6 +4,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Uow;
@ -13,11 +14,14 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
where TDbContext : IHasEventOutbox
{
protected IDbContextProvider<TDbContext> DbContextProvider { get; }
protected AbpEfCoreDistributedEventBusOptions EfCoreDistributedEventBusOptions { get; }
public DbContextEventOutbox(
IDbContextProvider<TDbContext> dbContextProvider)
IDbContextProvider<TDbContext> dbContextProvider,
IOptions<AbpEfCoreDistributedEventBusOptions> efCoreDistributedEventBusOptions)
{
DbContextProvider = dbContextProvider;
EfCoreDistributedEventBusOptions = efCoreDistributedEventBusOptions.Value;
}
[UnitOfWork]
@ -51,8 +55,12 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName();
var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower();
var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName);
var sql = $"DELETE FROM {sqlAdapter.NormalizeTableName(tableName)} WHERE " +
$"{sqlAdapter.NormalizeColumnNameEqualsValue("Id", id)}";
var sql = $"DELETE FROM {tableName} WHERE Id = '{id}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
}

27
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs

@ -0,0 +1,27 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class DefaultSqlAdapter : ISqlAdapter
{
public const string Name = "default";
public string NormalizeTableName(string tableName)
{
return tableName;
}
public string NormalizeColumnName(string columnName)
{
return columnName;
}
public string NormalizeColumnNameEqualsValue(string columnName, object value)
{
return $"{columnName} = '{value}'";
}
public string NormalizeValue(object value)
{
return $"'{value}'";
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs

@ -0,0 +1,13 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface ISqlAdapter
{
string NormalizeTableName(string tableName);
string NormalizeColumnName(string columnName);
string NormalizeColumnNameEqualsValue(string columnName, object value);
string NormalizeValue(object value);
}
}
Loading…
Cancel
Save