55 changed files with 1824 additions and 994 deletions
@ -1,7 +1,7 @@ |
|||
<Project> |
|||
|
|||
<PropertyGroup> |
|||
<AbpPackageVersion>4.3.3</AbpPackageVersion> |
|||
<AbpPackageVersion>4.4.0</AbpPackageVersion> |
|||
</PropertyGroup> |
|||
|
|||
</Project> |
|||
@ -0,0 +1,129 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Reflection; |
|||
using DotNetCore.CAP; |
|||
using DotNetCore.CAP.Internal; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.EventBus; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
|
|||
namespace CompanyName.ProjectName.CAP |
|||
{ |
|||
[Dependency(ServiceLifetime.Singleton, ReplaceServices = true)] |
|||
[ExposeServices(typeof(IConsumerServiceSelector))] |
|||
public class BeeAbpCapConsumerServiceSelector : ConsumerServiceSelector |
|||
{ |
|||
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } |
|||
protected IServiceProvider ServiceProvider { get; } |
|||
|
|||
/// <summary>
|
|||
/// Creates a new <see cref="T:DotNetCore.CAP.Internal.ConsumerServiceSelector" />.
|
|||
/// </summary>
|
|||
public BeeAbpCapConsumerServiceSelector( |
|||
IServiceProvider serviceProvider, |
|||
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions) |
|||
: base(serviceProvider) |
|||
{ |
|||
ServiceProvider = serviceProvider; |
|||
AbpDistributedEventBusOptions = distributedEventBusOptions.Value; |
|||
} |
|||
|
|||
protected override IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(IServiceProvider provider) |
|||
{ |
|||
var executorDescriptorList = base.FindConsumersFromInterfaceTypes(provider).ToList(); |
|||
|
|||
//handlers
|
|||
var handlers = AbpDistributedEventBusOptions.Handlers; |
|||
|
|||
foreach (var handler in handlers) |
|||
{ |
|||
var interfaces = handler.GetInterfaces(); |
|||
foreach (var @interface in interfaces) |
|||
{ |
|||
if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface)) |
|||
{ |
|||
continue; |
|||
} |
|||
var genericArgs = @interface.GetGenericArguments(); |
|||
|
|||
if (genericArgs.Length != 1) |
|||
{ |
|||
continue; |
|||
} |
|||
|
|||
var descriptors = GetHandlerDescription(genericArgs[0], handler); |
|||
|
|||
foreach (var descriptor in descriptors) |
|||
{ |
|||
var count = executorDescriptorList.Count(x => |
|||
x.Attribute.Name == descriptor.Attribute.Name); |
|||
|
|||
descriptor.Attribute.Group = descriptor.Attribute.Group.Insert( |
|||
descriptor.Attribute.Group.LastIndexOf(".", StringComparison.Ordinal), $".{count}"); |
|||
|
|||
executorDescriptorList.Add(descriptor); |
|||
} |
|||
|
|||
//Subscribe(genericArgs[0], new IocEventHandlerFactory(ServiceScopeFactory, handler));
|
|||
} |
|||
} |
|||
return executorDescriptorList; |
|||
} |
|||
|
|||
protected virtual IEnumerable<ConsumerExecutorDescriptor> GetHandlerDescription(Type eventType,Type typeInfo) |
|||
{ |
|||
var serviceTypeInfo = typeof(IDistributedEventHandler<>) |
|||
.MakeGenericType(eventType); |
|||
var method = typeInfo |
|||
.GetMethod( |
|||
nameof(IDistributedEventHandler<object>.HandleEventAsync), |
|||
new[] { eventType } |
|||
); |
|||
var eventName = EventNameAttribute.GetNameOrDefault(eventType); |
|||
var topicAttr = method.GetCustomAttributes<TopicAttribute>(true); |
|||
var topicAttributes = topicAttr.ToList(); |
|||
|
|||
if (topicAttributes.Count == 0) |
|||
{ |
|||
topicAttributes.Add(new CapSubscribeAttribute(eventName)); |
|||
} |
|||
|
|||
foreach (var attr in topicAttributes) |
|||
{ |
|||
SetSubscribeAttribute(attr); |
|||
|
|||
var parameters = method.GetParameters() |
|||
.Select(parameter => new ParameterDescriptor |
|||
{ |
|||
Name = parameter.Name, |
|||
ParameterType = parameter.ParameterType, |
|||
IsFromCap = parameter.GetCustomAttributes(typeof(FromCapAttribute)).Any() |
|||
}).ToList(); |
|||
|
|||
yield return InitDescriptor(attr, method, typeInfo.GetTypeInfo(), serviceTypeInfo.GetTypeInfo(), parameters); |
|||
} |
|||
} |
|||
|
|||
private static ConsumerExecutorDescriptor InitDescriptor( |
|||
TopicAttribute attr, |
|||
MethodInfo methodInfo, |
|||
TypeInfo implType, |
|||
TypeInfo serviceTypeInfo, |
|||
IList<ParameterDescriptor> parameters) |
|||
{ |
|||
var descriptor = new ConsumerExecutorDescriptor |
|||
{ |
|||
Attribute = attr, |
|||
MethodInfo = methodInfo, |
|||
ImplTypeInfo = implType, |
|||
ServiceTypeInfo = serviceTypeInfo, |
|||
Parameters = parameters |
|||
}; |
|||
|
|||
return descriptor; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,156 @@ |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using DotNetCore.CAP; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Microsoft.Extensions.Options; |
|||
using Volo.Abp; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.EventBus; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
using Volo.Abp.MultiTenancy; |
|||
using Volo.Abp.Threading; |
|||
|
|||
|
|||
namespace CompanyName.ProjectName.CAP |
|||
{ |
|||
public class AbpCapDistributedEventBus : |
|||
EventBusBase, |
|||
IDistributedEventBus, |
|||
ISingletonDependency |
|||
{ |
|||
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } |
|||
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; } |
|||
protected ConcurrentDictionary<string, Type> EventTypes { get; } |
|||
|
|||
protected readonly ICapPublisher CapPublisher; |
|||
|
|||
public AbpCapDistributedEventBus(IServiceScopeFactory serviceScopeFactory, |
|||
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions, |
|||
ICapPublisher capPublisher, |
|||
ICurrentTenant currentTenant, |
|||
IEventErrorHandler errorHandler) |
|||
: base(serviceScopeFactory, currentTenant, errorHandler) |
|||
{ |
|||
CapPublisher = capPublisher; |
|||
AbpDistributedEventBusOptions = distributedEventBusOptions.Value; |
|||
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>(); |
|||
EventTypes = new ConcurrentDictionary<string, Type>(); |
|||
} |
|||
|
|||
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) |
|||
{ |
|||
//This is handled by CAP ConsumerServiceSelector
|
|||
throw new NotImplementedException(); |
|||
} |
|||
|
|||
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action) |
|||
{ |
|||
Check.NotNull(action, nameof(action)); |
|||
|
|||
GetOrCreateHandlerFactories(typeof(TEvent)) |
|||
.Locking(factories => |
|||
{ |
|||
factories.RemoveAll( |
|||
factory => |
|||
{ |
|||
var singleInstanceFactory = factory as SingleInstanceHandlerFactory; |
|||
if (singleInstanceFactory == null) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler<TEvent>; |
|||
if (actionHandler == null) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
return actionHandler.Action == action; |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
public override void Unsubscribe(Type eventType, IEventHandler handler) |
|||
{ |
|||
GetOrCreateHandlerFactories(eventType) |
|||
.Locking(factories => |
|||
{ |
|||
factories.RemoveAll( |
|||
factory => |
|||
factory is SingleInstanceHandlerFactory && |
|||
(factory as SingleInstanceHandlerFactory).HandlerInstance == handler |
|||
); |
|||
}); |
|||
} |
|||
|
|||
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory) |
|||
{ |
|||
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory)); |
|||
} |
|||
|
|||
public override void UnsubscribeAll(Type eventType) |
|||
{ |
|||
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); |
|||
} |
|||
|
|||
public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class |
|||
{ |
|||
return Subscribe(typeof(TEvent), handler); |
|||
} |
|||
|
|||
|
|||
public override async Task PublishAsync(Type eventType, object eventData) |
|||
{ |
|||
var eventName = EventNameAttribute.GetNameOrDefault(eventType); |
|||
await CapPublisher.PublishAsync(eventName, eventData); |
|||
} |
|||
|
|||
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType) |
|||
{ |
|||
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>(); |
|||
|
|||
foreach (var handlerFactory in |
|||
HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) |
|||
{ |
|||
handlerFactoryList.Add( |
|||
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); |
|||
} |
|||
|
|||
return handlerFactoryList.ToArray(); |
|||
} |
|||
|
|||
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType) |
|||
{ |
|||
return HandlerFactories.GetOrAdd( |
|||
eventType, |
|||
type => |
|||
{ |
|||
var eventName = EventNameAttribute.GetNameOrDefault(type); |
|||
EventTypes[eventName] = type; |
|||
return new List<IEventHandlerFactory>(); |
|||
} |
|||
); |
|||
} |
|||
|
|||
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) |
|||
{ |
|||
//Should trigger same type
|
|||
if (handlerEventType == targetEventType) |
|||
{ |
|||
return true; |
|||
} |
|||
|
|||
//TODO: Support inheritance? But it does not support on subscription to RabbitMq!
|
|||
//Should trigger for inherited types
|
|||
if (handlerEventType.IsAssignableFrom(targetEventType)) |
|||
{ |
|||
return true; |
|||
} |
|||
|
|||
return false; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,10 @@ |
|||
using Volo.Abp.EventBus; |
|||
using Volo.Abp.Modularity; |
|||
|
|||
namespace CompanyName.ProjectName.CAP |
|||
{ |
|||
[DependsOn(typeof(AbpEventBusModule))] |
|||
public class AbpCapModule : AbpModule |
|||
{ |
|||
} |
|||
} |
|||
@ -0,0 +1,22 @@ |
|||
using System; |
|||
using DotNetCore.CAP; |
|||
using DotNetCore.CAP.Internal; |
|||
using Microsoft.Extensions.DependencyInjection; |
|||
using Volo.Abp.EventBus.Distributed; |
|||
using Volo.Abp.Modularity; |
|||
|
|||
namespace CompanyName.ProjectName.CAP |
|||
{ |
|||
public static class AbpCapServiceCollectionExtensions |
|||
{ |
|||
public static ServiceConfigurationContext AddAbpCap( |
|||
this ServiceConfigurationContext context, |
|||
Action<CapOptions> capAction) |
|||
{ |
|||
context.Services.AddCap(capAction); |
|||
context.Services.AddSingleton<IConsumerServiceSelector, BeeAbpCapConsumerServiceSelector>(); |
|||
context.Services.AddSingleton<IDistributedEventBus, AbpCapDistributedEventBus>(); |
|||
return context; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
<Project Sdk="Microsoft.NET.Sdk"> |
|||
|
|||
<PropertyGroup> |
|||
<TargetFramework>net5.0</TargetFramework> |
|||
</PropertyGroup> |
|||
|
|||
<ItemGroup> |
|||
<PackageReference Include="DotNetCore.CAP" Version="5.1.3" /> |
|||
<PackageReference Include="Volo.Abp.Core" Version="4.4.0" /> |
|||
<PackageReference Include="Volo.Abp.EventBus" Version="4.4.0" /> |
|||
</ItemGroup> |
|||
|
|||
</Project> |
|||
@ -0,0 +1,22 @@ |
|||
using System.Threading.Tasks; |
|||
using CompanyName.ProjectName.NotificationManagement.Notifications.Dtos; |
|||
|
|||
namespace CompanyName.ProjectName.NotificationManagement.Hubs |
|||
{ |
|||
public interface INotificationHub |
|||
{ |
|||
/// <summary>
|
|||
/// 发送普通消息
|
|||
/// </summary>
|
|||
/// <param name="input"></param>
|
|||
/// <returns></returns>
|
|||
Task ReceiveTextMessageAsync(SendNotificationDto input); |
|||
|
|||
/// <summary>
|
|||
/// 发送广播消息
|
|||
/// </summary>
|
|||
/// <param name="input"></param>
|
|||
/// <returns></returns>
|
|||
Task ReceiveBroadCastMessageAsync(SendNotificationDto input); |
|||
} |
|||
} |
|||
@ -0,0 +1,44 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.ComponentModel.DataAnnotations; |
|||
|
|||
namespace CompanyName.ProjectName.NotificationManagement.Notifications.Dtos |
|||
{ |
|||
public class CreateNotificationInput : IValidatableObject |
|||
{ |
|||
/// <summary>
|
|||
/// 消息标题
|
|||
/// </summary>
|
|||
public string Title { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 消息内容
|
|||
/// </summary>
|
|||
public string Content { get; set; } |
|||
|
|||
|
|||
/// <summary>
|
|||
/// 消息类型
|
|||
/// </summary>
|
|||
public MessageType MessageType { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 接收人
|
|||
/// 如果消息类型是广播消息,接收人字段为空
|
|||
/// </summary>
|
|||
public List<Guid> ReceiveIds { get; set; } |
|||
|
|||
public CreateNotificationInput() |
|||
{ |
|||
ReceiveIds = new List<Guid>(); |
|||
} |
|||
|
|||
public IEnumerable<ValidationResult> Validate(ValidationContext validationContext) |
|||
{ |
|||
if (MessageType == MessageType.BroadCast && ReceiveIds.Count > 0) |
|||
{ |
|||
yield return new ValidationResult("当消息类型为广播消息是,接收人列表只能为空", new[] {"ReceiveIds"}); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,23 @@ |
|||
namespace CompanyName.ProjectName.NotificationManagement.Notifications.Dtos |
|||
{ |
|||
public class SendNotificationDto |
|||
{ |
|||
public string Title { get; set; } |
|||
|
|||
public string Content { get; set; } |
|||
|
|||
public MessageType MessageType { get; set; } |
|||
|
|||
private SendNotificationDto() |
|||
{ |
|||
|
|||
} |
|||
|
|||
public SendNotificationDto(string title, string content, MessageType messageType) |
|||
{ |
|||
Title = title; |
|||
Content = content; |
|||
MessageType = messageType; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,36 @@ |
|||
using System.Collections.Generic; |
|||
using System.Threading.Tasks; |
|||
using CompanyName.ProjectName.NotificationManagement.Notifications.Dtos; |
|||
using Volo.Abp.Application.Services; |
|||
|
|||
namespace CompanyName.ProjectName.NotificationManagement.Notifications |
|||
{ |
|||
public interface INotificationAppService:IApplicationService |
|||
{ |
|||
/// <summary>
|
|||
/// 发送消息到客户端
|
|||
/// </summary>
|
|||
/// <returns></returns>
|
|||
Task SendMessageAsync(string title, string content, MessageType messageType, List<string> users); |
|||
|
|||
/// <summary>
|
|||
/// 发送消息到客户端
|
|||
/// </summary>
|
|||
/// <returns></returns>
|
|||
Task SendMessageToClientByUserIdAsync(SendNotificationDto sendNotificationDto, List<string> userIds); |
|||
|
|||
/// <summary>
|
|||
/// 发送消息到所有客户端
|
|||
/// </summary>
|
|||
/// <param name="sendNotificationDto"></param>
|
|||
Task SendMessageToAllClientAsync(SendNotificationDto sendNotificationDto); |
|||
|
|||
/// <summary>
|
|||
/// 创建一个消息
|
|||
/// 测试使用
|
|||
/// </summary>
|
|||
/// <param name="input"></param>
|
|||
/// <returns></returns>
|
|||
Task CreateAsync(CreateNotificationInput input); |
|||
} |
|||
} |
|||
@ -0,0 +1,14 @@ |
|||
using Microsoft.AspNetCore.Authorization; |
|||
using Volo.Abp.AspNetCore.SignalR; |
|||
using Volo.Abp.Auditing; |
|||
|
|||
namespace CompanyName.ProjectName.NotificationManagement.Hubs |
|||
{ |
|||
[HubRoute("SignalR/Notification")] |
|||
[Authorize] |
|||
[DisableAuditing] |
|||
public class NotificationHub : AbpHub<INotificationHub> |
|||
{ |
|||
|
|||
} |
|||
} |
|||
@ -0,0 +1,80 @@ |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using CompanyName.ProjectName.NotificationManagement.Hubs; |
|||
using CompanyName.ProjectName.NotificationManagement.Notifications.Dtos; |
|||
using Microsoft.AspNetCore.SignalR; |
|||
using Volo.Abp; |
|||
using Volo.Abp.Users; |
|||
|
|||
namespace CompanyName.ProjectName.NotificationManagement.Notifications |
|||
{ |
|||
public class NotificationAppService : NotificationManagementAppService, INotificationAppService |
|||
{ |
|||
private readonly IHubContext<NotificationHub, INotificationHub> _hubContext; |
|||
private readonly NotificationManager _notificationManager; |
|||
private readonly ICurrentUser _currentUser; |
|||
|
|||
public NotificationAppService( |
|||
IHubContext<NotificationHub, INotificationHub> hubContext, NotificationManager notificationManager, |
|||
ICurrentUser currentUser) |
|||
{ |
|||
_hubContext = hubContext; |
|||
_notificationManager = notificationManager; |
|||
_currentUser = currentUser; |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 发送消息
|
|||
/// </summary>
|
|||
public async Task SendMessageAsync(string title, string content, MessageType messageType, List<string> users) |
|||
{ |
|||
switch (messageType) |
|||
{ |
|||
case MessageType.Text: |
|||
await SendMessageToClientByUserIdAsync(new SendNotificationDto(title, content, messageType), users); |
|||
break; |
|||
case MessageType.BroadCast: |
|||
await SendMessageToAllClientAsync(new SendNotificationDto(title, content, messageType)); |
|||
break; |
|||
default: |
|||
throw new UserFriendlyException("未知的消息类型"); |
|||
} |
|||
} |
|||
|
|||
|
|||
/// <summary>
|
|||
/// 发送消息指定客户端用户
|
|||
/// </summary>
|
|||
/// <param name="sendNotificationDto"></param>
|
|||
/// <param name="users"></param>
|
|||
/// <returns></returns>
|
|||
public async Task SendMessageToClientByUserIdAsync(SendNotificationDto sendNotificationDto, |
|||
List<string> users) |
|||
{ |
|||
if (users is {Count: > 0}) |
|||
{ |
|||
await _hubContext.Clients |
|||
.Users(users.AsReadOnly().ToList()) |
|||
.ReceiveTextMessageAsync(sendNotificationDto); |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 发送消息到所有客户端
|
|||
/// 广播消息
|
|||
/// </summary>
|
|||
/// <param name="sendNotificationDto"></param>
|
|||
public async Task SendMessageToAllClientAsync(SendNotificationDto sendNotificationDto) |
|||
{ |
|||
await _hubContext.Clients.All.ReceiveBroadCastMessageAsync(sendNotificationDto); |
|||
} |
|||
|
|||
|
|||
public async Task CreateAsync(CreateNotificationInput input) |
|||
{ |
|||
await _notificationManager.CreateAsync(input.Title, input.Content, _currentUser.Id.Value, input.ReceiveIds, |
|||
input.MessageType); |
|||
} |
|||
} |
|||
} |
|||
File diff suppressed because it is too large
Loading…
Reference in new issue