Browse Source

Refactor

pull/10159/head
liangshiwei 4 years ago
parent
commit
b809ee50e3
  1. 13
      framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs
  2. 13
      framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs
  3. 8
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs
  4. 7
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs
  5. 47
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs
  6. 31
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs
  7. 13
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs
  8. 13
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs
  9. 7
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs
  10. 8
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs
  11. 7
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs
  12. 47
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs
  13. 31
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs
  14. 13
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs
  15. 13
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs
  16. 7
      framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs
  17. 8
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs
  18. 7
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs
  19. 25
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs
  20. 43
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs
  21. 25
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs
  22. 13
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs
  23. 13
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs
  24. 8
      framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs
  25. 13
      framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs
  26. 13
      framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs
  27. 13
      framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs
  28. 13
      framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs
  29. 9
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs
  30. 19
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs
  31. 33
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs
  32. 19
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs
  33. 27
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs
  34. 13
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs
  35. 7
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs
  36. 7
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs
  37. 43
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs
  38. 26
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs

13
framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLInboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class MySQLInboxConfigExtensions
{
public static void UseMySQL<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.MySQL/Volo/Abp/EntityFrameworkCore/DistributedEvents/MySQLOutboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class MySQLOutboxConfigExtensions
{
public static void UseMySQL<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox<TDbContext>);
}
}
}

8
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs

@ -0,0 +1,8 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs

@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
}
}

47
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs

@ -0,0 +1,47 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext> , IOracleDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
public OracleDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions) : base(dbContextProvider, clock, distributedEventsOptions)
{
}
[UnitOfWork]
public override async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
[UnitOfWork]
public override async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}

31
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs

@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , IOracleDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
public OracleDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider)
: base(dbContextProvider)
{
}
[UnitOfWork]
public override async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class OracleInboxConfigExtensions
{
public static void UseOracle<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class OracleOutboxConfigExtensions
{
public static void UseOracle<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox<TDbContext>);
}
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore.Oracle.Devart/Volo/Abp/EntityFrameworkCore/Oracle/Devart/AbpEntityFrameworkCoreOracleDevartModule.cs

@ -1,4 +1,6 @@
using Volo.Abp.Guids;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;
namespace Volo.Abp.EntityFrameworkCore.Oracle.Devart
@ -17,6 +19,9 @@ namespace Volo.Abp.EntityFrameworkCore.Oracle.Devart
options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsBinary;
}
});
context.Services.AddTransient(typeof(IOracleDbContextEventOutbox<>), typeof(OracleDbContextEventOutbox<>));
context.Services.AddTransient(typeof(IOracleDbContextEventInbox<>), typeof(OracleDbContextEventInbox<>));
}
}
}

8
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventInbox.cs

@ -0,0 +1,8 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/IOracleDbContextEventOutbox.cs

@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
}
}

47
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventInbox.cs

@ -0,0 +1,47 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext> , IOracleDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
public OracleDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions) : base(dbContextProvider, clock, distributedEventsOptions)
{
}
[UnitOfWork]
public override async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
[UnitOfWork]
public override async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}

31
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleDbContextEventOutbox.cs

@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , IOracleDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
public OracleDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider)
: base(dbContextProvider)
{
}
[UnitOfWork]
public override async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleInboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class OracleInboxConfigExtensions
{
public static void UseOracle<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/DistributedEvents/OracleOutboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class OracleOutboxConfigExtensions
{
public static void UseOracle<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox<TDbContext>);
}
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore.Oracle/Volo/Abp/EntityFrameworkCore/Oracle/AbpEntityFrameworkCoreOracleModule.cs

@ -1,4 +1,6 @@
using Volo.Abp.Guids;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;
namespace Volo.Abp.EntityFrameworkCore.Oracle
@ -15,6 +17,9 @@ namespace Volo.Abp.EntityFrameworkCore.Oracle
options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsBinary;
}
});
context.Services.AddTransient(typeof(IOracleDbContextEventOutbox<>), typeof(OracleDbContextEventOutbox<>));
context.Services.AddTransient(typeof(IOracleDbContextEventInbox<>), typeof(OracleDbContextEventInbox<>));
}
}
}

8
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventInbox.cs

@ -0,0 +1,8 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IPostgreSqlDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/IPostgreSqlDbContextEventOutbox.cs

@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IPostgreSqlDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
}
}

25
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlAdapter.cs

@ -1,25 +0,0 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class PostgreSqlAdapter : ISqlAdapter
{
public string NormalizeTableName(string tableName)
{
return $"\"{tableName}\"";
}
public string NormalizeColumnName(string columnName)
{
return $"\"{columnName}\"";
}
public string NormalizeColumnNameEqualsValue(string columnName, object value)
{
return $"\"{columnName}\" = '{value}'";
}
public string NormalizeValue(object value)
{
return $"'{value}'";
}
}
}

43
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventInbox.cs

@ -0,0 +1,43 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class PostgreSqlDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext>, IPostgreSqlDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
public PostgreSqlDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions)
: base(dbContextProvider, clock, distributedEventsOptions)
{
}
[UnitOfWork]
public override async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = '{Clock.Now}' WHERE \"Id\" = '{id}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
[UnitOfWork]
public override async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < '{timeToKeepEvents}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
}
}

25
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlDbContextEventOutbox.cs

@ -0,0 +1,25 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class PostgreSqlDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , IPostgreSqlDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
public PostgreSqlDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider) : base(dbContextProvider)
{
}
[UnitOfWork]
public override async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = '{id}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlInboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class PostgreSqlInboxConfigExtensions
{
public static void UsePostgreSql<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventInbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/DistributedEvents/PostgreSqlOutboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class PostgreSqlOutboxConfigExtensions
{
public static void UsePostgreSql<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(IPostgreSqlDbContextEventOutbox<TDbContext>);
}
}
}

8
framework/src/Volo.Abp.EntityFrameworkCore.PostgreSql/Volo/Abp/EntityFrameworkCore/PostgreSql/AbpEntityFrameworkCorePostgreSqlModule.cs

@ -1,4 +1,4 @@
using Npgsql;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;
@ -20,10 +20,8 @@ namespace Volo.Abp.EntityFrameworkCore.PostgreSql
}
});
Configure<AbpEfCoreDistributedEventBusOptions>(options =>
{
options.SqlAdapters.TryAdd(nameof(NpgsqlConnection).ToLower(), new PostgreSqlAdapter());
});
context.Services.AddTransient(typeof(IPostgreSqlDbContextEventOutbox<>), typeof(PostgreSqlDbContextEventOutbox<>));
context.Services.AddTransient(typeof(IPostgreSqlDbContextEventInbox<>), typeof(PostgreSqlDbContextEventInbox<>));
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerInboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class SqlServerInboxConfigExtensions
{
public static void UseSqlServer<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.SqlServer/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlServerOutboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class SqlServerOutboxConfigExtensions
{
public static void UseSqlServer<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteInboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class SqliteInboxConfigExtensions
{
public static void UseSqlite<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox<TDbContext>);
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore.Sqlite/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqliteOutboxConfigExtensions.cs

@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class SqliteOutboxConfigExtensions
{
public static void UseSqlite<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox<TDbContext>);
}
}
}

9
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/AbpEntityFrameworkCoreModule.cs

@ -5,6 +5,7 @@ using Volo.Abp.Domain;
using Volo.Abp.EntityFrameworkCore.DependencyInjection;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.Modularity;
using Volo.Abp.Uow;
using Volo.Abp.Uow.EntityFrameworkCore;
namespace Volo.Abp.EntityFrameworkCore
@ -26,14 +27,12 @@ namespace Volo.Abp.EntityFrameworkCore
});
});
Configure<AbpEfCoreDistributedEventBusOptions>(options =>
{
options.SqlAdapters.Add(DefaultSqlAdapter.Name, new DefaultSqlAdapter());
});
context.Services.TryAddTransient(typeof(IDbContextProvider<>), typeof(UnitOfWorkDbContextProvider<>));
context.Services.AddTransient(typeof(IDbContextEventOutbox<>), typeof(DbContextEventOutbox<>));
context.Services.AddTransient(typeof(IDbContextEventInbox<>), typeof(DbContextEventInbox<>));
context.Services.AddTransient(typeof(ISqlRawDbContextEventOutbox<>), typeof(SqlRawDbContextEventOutbox<>));
context.Services.AddTransient(typeof(ISqlRawDbContextEventInbox<>), typeof(SqlRawDbContextEventInbox<>));
}
}
}

19
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/AbpEfCoreDistributedEventBusOptions.cs

@ -1,19 +0,0 @@
using System.Collections.Generic;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class AbpEfCoreDistributedEventBusOptions
{
public Dictionary<string, ISqlAdapter> SqlAdapters { get; set; }
public ISqlAdapter GetSqlAdapter(string connectionType)
{
return SqlAdapters.TryGetValue(connectionType, out var sqlAdapter) ? sqlAdapter : SqlAdapters[DefaultSqlAdapter.Name];
}
public AbpEfCoreDistributedEventBusOptions()
{
SqlAdapters = new Dictionary<string, ISqlAdapter>();
}
}
}

33
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs

@ -16,18 +16,15 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
protected IDbContextProvider<TDbContext> DbContextProvider { get; }
protected AbpDistributedEventBusOptions DistributedEventsOptions { get; }
protected AbpEfCoreDistributedEventBusOptions EfCoreDistributedEventBusOptions { get; }
protected IClock Clock { get; }
public DbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions,
IOptions<AbpEfCoreDistributedEventBusOptions> efCoreDistributedEventBusOptions)
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions)
{
DbContextProvider = dbContextProvider;
Clock = clock;
EfCoreDistributedEventBusOptions = efCoreDistributedEventBusOptions.Value;
DistributedEventsOptions = distributedEventsOptions.Value;
}
@ -63,16 +60,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
public virtual async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower();
var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName);
var sql = $"UPDATE {sqlAdapter.NormalizeTableName(tableName)} SET " +
$"{sqlAdapter.NormalizeColumnNameEqualsValue("Processed", 1)}, " +
$"{sqlAdapter.NormalizeColumnNameEqualsValue("ProcessedTime", Clock.Now)} WHERE " +
$"{sqlAdapter.NormalizeColumnNameEqualsValue("Id", id)}";
await dbContext.Database.ExecuteSqlRawAsync(sql);
var incomingEvent = await dbContext.IncomingEvents.FindAsync(id);
if (incomingEvent != null)
{
incomingEvent.MarkAsProcessed(Clock.Now);
}
}
[UnitOfWork]
@ -86,16 +78,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
public virtual async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);
var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower();
var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName);
var sql = $"DELETE FROM {sqlAdapter.NormalizeTableName(tableName)} WHERE " +
$"{sqlAdapter.NormalizeColumnNameEqualsValue("Processed", 1)} AND " +
$"{sqlAdapter.NormalizeColumnName("CreationTime")} < {sqlAdapter.NormalizeValue(timeToKeepEvents)}";
await dbContext.Database.ExecuteSqlRawAsync(sql);
var oldEvents = await dbContext.IncomingEvents
.Where(x => x.Processed && x.CreationTime < timeToKeepEvents)
.ToListAsync();
dbContext.IncomingEvents.RemoveRange(oldEvents);
}
}
}

19
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs

@ -4,7 +4,6 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Uow;
@ -14,14 +13,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
where TDbContext : IHasEventOutbox
{
protected IDbContextProvider<TDbContext> DbContextProvider { get; }
protected AbpEfCoreDistributedEventBusOptions EfCoreDistributedEventBusOptions { get; }
public DbContextEventOutbox(
IDbContextProvider<TDbContext> dbContextProvider,
IOptions<AbpEfCoreDistributedEventBusOptions> efCoreDistributedEventBusOptions)
IDbContextProvider<TDbContext> dbContextProvider)
{
DbContextProvider = dbContextProvider;
EfCoreDistributedEventBusOptions = efCoreDistributedEventBusOptions.Value;
}
[UnitOfWork]
@ -54,14 +50,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
public virtual async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName();
var connectionName = dbContext.Database.GetDbConnection().GetType().Name.ToLower();
var sqlAdapter = EfCoreDistributedEventBusOptions.GetSqlAdapter(connectionName);
var sql = $"DELETE FROM {sqlAdapter.NormalizeTableName(tableName)} WHERE " +
$"{sqlAdapter.NormalizeColumnNameEqualsValue("Id", id)}";
await dbContext.Database.ExecuteSqlRawAsync(sql);
var outgoingEvent = await dbContext.OutgoingEvents.FindAsync(id);
if (outgoingEvent != null)
{
dbContext.Remove(outgoingEvent);
}
}
}
}

27
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DefaultSqlAdapter.cs

@ -1,27 +0,0 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class DefaultSqlAdapter : ISqlAdapter
{
public const string Name = "default";
public string NormalizeTableName(string tableName)
{
return tableName;
}
public string NormalizeColumnName(string columnName)
{
return columnName;
}
public string NormalizeColumnNameEqualsValue(string columnName, object value)
{
return $"{columnName} = '{value}'";
}
public string NormalizeValue(object value)
{
return $"'{value}'";
}
}
}

13
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlAdapter.cs

@ -1,13 +0,0 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface ISqlAdapter
{
string NormalizeTableName(string tableName);
string NormalizeColumnName(string columnName);
string NormalizeColumnNameEqualsValue(string columnName, object value);
string NormalizeValue(object value);
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventInbox.cs

@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface ISqlRawDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
}
}

7
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/ISqlRawDbContextEventOutbox.cs

@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface ISqlRawDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
}
}

43
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventInbox.cs

@ -0,0 +1,43 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class SqlRawDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext> , ISqlRawDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
public SqlRawDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpDistributedEventBusOptions> distributedEventsOptions)
: base(dbContextProvider, clock, distributedEventsOptions)
{
}
[UnitOfWork]
public override async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"UPDATE {tableName} SET Processed = '1', ProcessedTime = '{Clock.Now}' WHERE Id = '{id}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
[UnitOfWork]
public override async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);
var sql = $"DELETE FROM {tableName} WHERE Processed = '1' AND CreationTime < '{timeToKeepEvents}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
}
}

26
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/SqlRawDbContextEventOutbox.cs

@ -0,0 +1,26 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Uow;
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class SqlRawDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , ISqlRawDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
public SqlRawDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider)
: base(dbContextProvider)
{
}
[UnitOfWork]
public override async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"DELETE FROM {tableName} WHERE Id = '{id}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
}
}
Loading…
Cancel
Save