mirror of https://github.com/abpframework/abp.git
24 changed files with 537 additions and 17 deletions
@ -0,0 +1,47 @@ |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class DbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext> |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
protected IDbContextProvider<TDbContext> DbContextProvider { get; } |
|||
|
|||
public DbContextEventInbox( |
|||
IDbContextProvider<TDbContext> dbContextProvider) |
|||
{ |
|||
DbContextProvider = dbContextProvider; |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public virtual async Task EnqueueAsync(IncomingEventInfo incomingEvent) |
|||
{ |
|||
var dbContext = (IHasEventInbox) await DbContextProvider.GetDbContextAsync(); |
|||
dbContext.IncomingEvents.Add( |
|||
new IncomingEventRecord(incomingEvent) |
|||
); |
|||
} |
|||
|
|||
[UnitOfWork] |
|||
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount) |
|||
{ |
|||
var dbContext = (IHasEventInbox) await DbContextProvider.GetDbContextAsync(); |
|||
|
|||
var outgoingEventRecords = await dbContext |
|||
.IncomingEvents |
|||
.AsNoTracking() |
|||
.OrderBy(x => x.CreationTime) |
|||
.Take(maxCount) |
|||
.ToListAsync(); |
|||
|
|||
return outgoingEventRecords |
|||
.Select(x => x.ToIncomingEventInfo()) |
|||
.ToList(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class EfCoreInboxConfigExtensions |
|||
{ |
|||
public static void UseDbContext<TDbContext>(this InboxConfig outboxConfig) |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
outboxConfig.ImplementationType = typeof(IDbContextEventInbox<TDbContext>); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,21 @@ |
|||
using JetBrains.Annotations; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Volo.Abp.Data; |
|||
using Volo.Abp.EntityFrameworkCore.Modeling; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public static class EventInboxDbContextModelBuilderExtensions |
|||
{ |
|||
public static void ConfigureEventInbox([NotNull] this ModelBuilder builder) |
|||
{ |
|||
builder.Entity<IncomingEventRecord>(b => |
|||
{ |
|||
b.ToTable(AbpCommonDbProperties.DbTablePrefix + "EventInbox", AbpCommonDbProperties.DbSchema); |
|||
b.ConfigureByConvention(); |
|||
b.Property(x => x.EventName).IsRequired().HasMaxLength(IncomingEventRecord.MaxEventNameLength); |
|||
b.Property(x => x.EventData).IsRequired(); |
|||
}); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,10 @@ |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface IDbContextEventInbox<TDbContext> : IEventInbox |
|||
where TDbContext : IHasEventInbox |
|||
{ |
|||
|
|||
} |
|||
} |
|||
@ -0,0 +1,9 @@ |
|||
using Microsoft.EntityFrameworkCore; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public interface IHasEventInbox : IEfCoreDbContext |
|||
{ |
|||
DbSet<IncomingEventRecord> IncomingEvents { get; set; } |
|||
} |
|||
} |
|||
@ -0,0 +1,52 @@ |
|||
using System; |
|||
using Volo.Abp.Auditing; |
|||
using Volo.Abp.Data; |
|||
using Volo.Abp.Domain.Entities; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents |
|||
{ |
|||
public class IncomingEventRecord : |
|||
BasicAggregateRoot<Guid>, |
|||
IHasExtraProperties, |
|||
IHasCreationTime |
|||
{ |
|||
public static int MaxEventNameLength { get; set; } = 256; |
|||
|
|||
public ExtraPropertyDictionary ExtraProperties { get; private set; } |
|||
|
|||
public string EventName { get; private set; } |
|||
|
|||
public byte[] EventData { get; private set; } |
|||
|
|||
public DateTime CreationTime { get; private set; } |
|||
|
|||
protected IncomingEventRecord() |
|||
{ |
|||
ExtraProperties = new ExtraPropertyDictionary(); |
|||
this.SetDefaultsForExtraProperties(); |
|||
} |
|||
|
|||
public IncomingEventRecord( |
|||
IncomingEventInfo eventInfo) |
|||
: base(eventInfo.Id) |
|||
{ |
|||
EventName = eventInfo.EventName; |
|||
EventData = eventInfo.EventData; |
|||
CreationTime = eventInfo.CreationTime; |
|||
|
|||
ExtraProperties = new ExtraPropertyDictionary(); |
|||
this.SetDefaultsForExtraProperties(); |
|||
} |
|||
|
|||
public IncomingEventInfo ToIncomingEventInfo() |
|||
{ |
|||
return new IncomingEventInfo( |
|||
Id, |
|||
EventName, |
|||
EventData, |
|||
CreationTime |
|||
); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,12 @@ |
|||
using System.Collections.Generic; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public interface IEventInbox |
|||
{ |
|||
Task EnqueueAsync(IncomingEventInfo incomingEvent); |
|||
|
|||
Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount); |
|||
} |
|||
} |
|||
@ -0,0 +1,28 @@ |
|||
using System; |
|||
using JetBrains.Annotations; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public class InboxConfig |
|||
{ |
|||
[NotNull] |
|||
public string Name { get; } |
|||
|
|||
public Type ImplementationType { get; set; } |
|||
|
|||
public Func<Type, bool> EventSelector { get; set; } |
|||
|
|||
public Func<Type, bool> HandlerSelector { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// Used to enable/disable processing incoming events.
|
|||
/// Default: true.
|
|||
/// </summary>
|
|||
public bool IsProcessingEnabled { get; set; } = true; |
|||
|
|||
public InboxConfig([NotNull] string name) |
|||
{ |
|||
Name = Check.NotNullOrWhiteSpace(name, nameof(name)); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,19 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public class InboxConfigDictionary : Dictionary<string, InboxConfig> |
|||
{ |
|||
public void Configure(Action<InboxConfig> configAction) |
|||
{ |
|||
Configure("Default", configAction); |
|||
} |
|||
|
|||
public void Configure(string outboxName, Action<InboxConfig> configAction) |
|||
{ |
|||
var outboxConfig = this.GetOrAdd(outboxName, () => new InboxConfig(outboxName)); |
|||
configAction(outboxConfig); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,40 @@ |
|||
using System; |
|||
using Volo.Abp.Data; |
|||
|
|||
namespace Volo.Abp.EventBus.Distributed |
|||
{ |
|||
public class IncomingEventInfo : IHasExtraProperties |
|||
{ |
|||
public static int MaxEventNameLength { get; set; } = 256; |
|||
|
|||
public ExtraPropertyDictionary ExtraProperties { get; protected set; } |
|||
|
|||
public Guid Id { get; } |
|||
|
|||
public string EventName { get; } |
|||
|
|||
public byte[] EventData { get; } |
|||
|
|||
public DateTime CreationTime { get; } |
|||
|
|||
protected IncomingEventInfo() |
|||
{ |
|||
ExtraProperties = new ExtraPropertyDictionary(); |
|||
this.SetDefaultsForExtraProperties(); |
|||
} |
|||
|
|||
public IncomingEventInfo( |
|||
Guid id, |
|||
string eventName, |
|||
byte[] eventData, |
|||
DateTime creationTime) |
|||
{ |
|||
Id = id; |
|||
EventName = Check.NotNullOrWhiteSpace(eventName, nameof(eventName), MaxEventNameLength); |
|||
EventData = eventData; |
|||
CreationTime = creationTime; |
|||
ExtraProperties = new ExtraPropertyDictionary(); |
|||
this.SetDefaultsForExtraProperties(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,149 @@ |
|||
// <auto-generated />
|
|||
using System; |
|||
using DistDemoApp; |
|||
using Microsoft.EntityFrameworkCore; |
|||
using Microsoft.EntityFrameworkCore.Infrastructure; |
|||
using Microsoft.EntityFrameworkCore.Metadata; |
|||
using Microsoft.EntityFrameworkCore.Migrations; |
|||
using Microsoft.EntityFrameworkCore.Storage.ValueConversion; |
|||
using Volo.Abp.EntityFrameworkCore; |
|||
|
|||
namespace DistDemoApp.Migrations |
|||
{ |
|||
[DbContext(typeof(TodoDbContext))] |
|||
[Migration("20210909113934_Added_Inbox")] |
|||
partial class Added_Inbox |
|||
{ |
|||
protected override void BuildTargetModel(ModelBuilder modelBuilder) |
|||
{ |
|||
#pragma warning disable 612, 618
|
|||
modelBuilder |
|||
.HasAnnotation("_Abp_DatabaseProvider", EfCoreDatabaseProvider.SqlServer) |
|||
.HasAnnotation("Relational:MaxIdentifierLength", 128) |
|||
.HasAnnotation("ProductVersion", "5.0.9") |
|||
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); |
|||
|
|||
modelBuilder.Entity("DistDemoApp.TodoItem", b => |
|||
{ |
|||
b.Property<Guid>("Id") |
|||
.HasColumnType("uniqueidentifier"); |
|||
|
|||
b.Property<string>("ConcurrencyStamp") |
|||
.IsConcurrencyToken() |
|||
.HasMaxLength(40) |
|||
.HasColumnType("nvarchar(40)") |
|||
.HasColumnName("ConcurrencyStamp"); |
|||
|
|||
b.Property<DateTime>("CreationTime") |
|||
.HasColumnType("datetime2") |
|||
.HasColumnName("CreationTime"); |
|||
|
|||
b.Property<Guid?>("CreatorId") |
|||
.HasColumnType("uniqueidentifier") |
|||
.HasColumnName("CreatorId"); |
|||
|
|||
b.Property<string>("ExtraProperties") |
|||
.HasColumnType("nvarchar(max)") |
|||
.HasColumnName("ExtraProperties"); |
|||
|
|||
b.Property<string>("Text") |
|||
.IsRequired() |
|||
.HasMaxLength(128) |
|||
.HasColumnType("nvarchar(128)"); |
|||
|
|||
b.HasKey("Id"); |
|||
|
|||
b.ToTable("TodoItems"); |
|||
}); |
|||
|
|||
modelBuilder.Entity("DistDemoApp.TodoSummary", b => |
|||
{ |
|||
b.Property<int>("Id") |
|||
.ValueGeneratedOnAdd() |
|||
.HasColumnType("int") |
|||
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); |
|||
|
|||
b.Property<string>("ConcurrencyStamp") |
|||
.IsConcurrencyToken() |
|||
.HasMaxLength(40) |
|||
.HasColumnType("nvarchar(40)") |
|||
.HasColumnName("ConcurrencyStamp"); |
|||
|
|||
b.Property<byte>("Day") |
|||
.HasColumnType("tinyint"); |
|||
|
|||
b.Property<string>("ExtraProperties") |
|||
.HasColumnType("nvarchar(max)") |
|||
.HasColumnName("ExtraProperties"); |
|||
|
|||
b.Property<byte>("Month") |
|||
.HasColumnType("tinyint"); |
|||
|
|||
b.Property<int>("TotalCount") |
|||
.HasColumnType("int"); |
|||
|
|||
b.Property<int>("Year") |
|||
.HasColumnType("int"); |
|||
|
|||
b.HasKey("Id"); |
|||
|
|||
b.ToTable("TodoSummaries"); |
|||
}); |
|||
|
|||
modelBuilder.Entity("Volo.Abp.EntityFrameworkCore.DistributedEvents.IncomingEventRecord", b => |
|||
{ |
|||
b.Property<Guid>("Id") |
|||
.HasColumnType("uniqueidentifier"); |
|||
|
|||
b.Property<DateTime>("CreationTime") |
|||
.HasColumnType("datetime2") |
|||
.HasColumnName("CreationTime"); |
|||
|
|||
b.Property<byte[]>("EventData") |
|||
.IsRequired() |
|||
.HasColumnType("varbinary(max)"); |
|||
|
|||
b.Property<string>("EventName") |
|||
.IsRequired() |
|||
.HasMaxLength(256) |
|||
.HasColumnType("nvarchar(256)"); |
|||
|
|||
b.Property<string>("ExtraProperties") |
|||
.HasColumnType("nvarchar(max)") |
|||
.HasColumnName("ExtraProperties"); |
|||
|
|||
b.HasKey("Id"); |
|||
|
|||
b.ToTable("AbpEventInbox"); |
|||
}); |
|||
|
|||
modelBuilder.Entity("Volo.Abp.EntityFrameworkCore.DistributedEvents.OutgoingEventRecord", b => |
|||
{ |
|||
b.Property<Guid>("Id") |
|||
.HasColumnType("uniqueidentifier"); |
|||
|
|||
b.Property<DateTime>("CreationTime") |
|||
.HasColumnType("datetime2") |
|||
.HasColumnName("CreationTime"); |
|||
|
|||
b.Property<byte[]>("EventData") |
|||
.IsRequired() |
|||
.HasColumnType("varbinary(max)"); |
|||
|
|||
b.Property<string>("EventName") |
|||
.IsRequired() |
|||
.HasMaxLength(256) |
|||
.HasColumnType("nvarchar(256)"); |
|||
|
|||
b.Property<string>("ExtraProperties") |
|||
.HasColumnType("nvarchar(max)") |
|||
.HasColumnName("ExtraProperties"); |
|||
|
|||
b.HasKey("Id"); |
|||
|
|||
b.ToTable("AbpEventOutbox"); |
|||
}); |
|||
#pragma warning restore 612, 618
|
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,32 @@ |
|||
using System; |
|||
using Microsoft.EntityFrameworkCore.Migrations; |
|||
|
|||
namespace DistDemoApp.Migrations |
|||
{ |
|||
public partial class Added_Inbox : Migration |
|||
{ |
|||
protected override void Up(MigrationBuilder migrationBuilder) |
|||
{ |
|||
migrationBuilder.CreateTable( |
|||
name: "AbpEventInbox", |
|||
columns: table => new |
|||
{ |
|||
Id = table.Column<Guid>(type: "uniqueidentifier", nullable: false), |
|||
ExtraProperties = table.Column<string>(type: "nvarchar(max)", nullable: true), |
|||
EventName = table.Column<string>(type: "nvarchar(256)", maxLength: 256, nullable: false), |
|||
EventData = table.Column<byte[]>(type: "varbinary(max)", nullable: false), |
|||
CreationTime = table.Column<DateTime>(type: "datetime2", nullable: false) |
|||
}, |
|||
constraints: table => |
|||
{ |
|||
table.PrimaryKey("PK_AbpEventInbox", x => x.Id); |
|||
}); |
|||
} |
|||
|
|||
protected override void Down(MigrationBuilder migrationBuilder) |
|||
{ |
|||
migrationBuilder.DropTable( |
|||
name: "AbpEventInbox"); |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue