using LINGYUN.Abp.Notifications;
using LY.MicroService.RealtimeMessage.BackgroundJobs;
using Microsoft.Extensions.Localization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Volo.Abp.BackgroundJobs;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Json;
using Volo.Abp.MultiTenancy;
using Volo.Abp.TextTemplating;
using Volo.Abp.Uow;

namespace LY.MicroService.RealtimeMessage.EventBus.Distributed
{
    /// <summary>
    /// Subscribes to notification publish events and publishes the resulting messages from one place.
    /// </summary>
    /// <remarks>
    /// A SignalR client only connects to a single server, so only a host that has the
    /// SignalR server enabled can actually deliver a message to that client.
    /// </remarks>
    // NOTE(review): the event payload type arguments (NotificationTemplate / NotificationData)
    // are inferred from how each handler reads eventData.Data - confirm against the Eto definitions.
    public class NotificationEventHandler :
        IDistributedEventHandler<NotificationEto<NotificationTemplate>>,
        IDistributedEventHandler<NotificationEto<NotificationData>>,
        ITransientDependency
    {
        /// <summary>
        /// Logger used by this handler. Defaults to a no-op logger; settable so it can be replaced.
        /// </summary>
        public ILogger<NotificationEventHandler> Logger { get; set; }

        /// <summary>
        /// Notification options; read for the per-provider data mappings.
        /// </summary>
        protected AbpNotificationOptions Options { get; }

        /// <summary>
        /// Reference to <see cref="ICurrentTenant"/>; switched to the event's tenant while handling.
        /// </summary>
        protected ICurrentTenant CurrentTenant { get; }

        /// <summary>
        /// Reference to <see cref="IJsonSerializer"/>.
        /// </summary>
        protected IJsonSerializer JsonSerializer { get; }

        /// <summary>
        /// Reference to <see cref="IBackgroundJobManager"/>; receives notifications whose publish failed.
        /// </summary>
        protected IBackgroundJobManager BackgroundJobManager { get; }

        /// <summary>
        /// Reference to <see cref="ITemplateRenderer"/>; renders the message of template notifications.
        /// </summary>
        protected ITemplateRenderer TemplateRenderer { get; }

        /// <summary>
        /// Reference to <see cref="INotificationStore"/>; persists notifications and user notifications.
        /// </summary>
        protected INotificationStore NotificationStore { get; }

        /// <summary>
        /// Reference to <see cref="IStringLocalizerFactory"/>; used to localize the notification title.
        /// </summary>
        protected IStringLocalizerFactory StringLocalizerFactory { get; }

        /// <summary>
        /// Reference to <see cref="INotificationDefinitionManager"/>; resolves the definition by name.
        /// </summary>
        protected INotificationDefinitionManager NotificationDefinitionManager { get; }

        /// <summary>
        /// Reference to <see cref="INotificationSubscriptionManager"/>; resolves the subscribed users.
        /// </summary>
        protected INotificationSubscriptionManager NotificationSubscriptionManager { get; }

        /// <summary>
        /// Reference to <see cref="INotificationPublishProviderManager"/>; supplies the publish providers.
        /// </summary>
        protected INotificationPublishProviderManager NotificationPublishProviderManager { get; }

        /// <summary>
        /// Initializes a new instance of the <see cref="NotificationEventHandler"/> class.
        /// </summary>
        public NotificationEventHandler(
            ICurrentTenant currentTenant,
            IJsonSerializer jsonSerializer,
            ITemplateRenderer templateRenderer,
            IBackgroundJobManager backgroundJobManager,
            IStringLocalizerFactory stringLocalizerFactory,
            IOptions<AbpNotificationOptions> options,
            INotificationStore notificationStore,
            INotificationDefinitionManager notificationDefinitionManager,
            INotificationSubscriptionManager notificationSubscriptionManager,
            INotificationPublishProviderManager notificationPublishProviderManager)
        {
            Options = options.Value;
            CurrentTenant = currentTenant;
            JsonSerializer = jsonSerializer;
            TemplateRenderer = templateRenderer;
            BackgroundJobManager = backgroundJobManager;
            StringLocalizerFactory = stringLocalizerFactory;
            NotificationStore = notificationStore;
            NotificationDefinitionManager = notificationDefinitionManager;
            NotificationSubscriptionManager = notificationSubscriptionManager;
            NotificationPublishProviderManager = notificationPublishProviderManager;

            Logger = NullLogger<NotificationEventHandler>.Instance;
        }

        /// <summary>
        /// Handles a template-based notification: renders the message from the template named by
        /// <c>eventData.Data.Name</c>, wraps it in standard notification data, then persists and
        /// publishes it. Does nothing when no definition exists for <c>eventData.Name</c>.
        /// </summary>
        /// <param name="eventData">The received notification event.</param>
        [UnitOfWork]
        public virtual async Task HandleEventAsync(NotificationEto<NotificationTemplate> eventData)
        {
            using (CurrentTenant.Change(eventData.TenantId))
            {
                var notification = NotificationDefinitionManager.GetOrNull(eventData.Name);
                if (notification == null)
                {
                    return;
                }

                var notificationInfo = new NotificationInfo
                {
                    Name = notification.Name,
                    TenantId = eventData.TenantId,
                    Severity = eventData.Severity,
                    Type = notification.NotificationType,
                    CreationTime = eventData.CreationTime,
                    Lifetime = notification.NotificationLifetime,
                };
                notificationInfo.SetId(eventData.Id);

                var title = notification.DisplayName.Localize(StringLocalizerFactory);

                // The "$"-prefixed entries are handed to the renderer as global context
                // next to the template's own extra properties (the model).
                var message = await TemplateRenderer.RenderAsync(
                    templateName: eventData.Data.Name,
                    model: eventData.Data.ExtraProperties,
                    cultureName: eventData.Data.Culture,
                    globalContext: new Dictionary<string, object>
                    {
                        { "$notification", notification.Name },
                        { "$formUser", eventData.Data.FormUser },
                        { "$notificationId", eventData.Id },
                        { "$title", title.ToString() },
                        { "$creationTime", eventData.CreationTime.ToString("yyyy-MM-dd HH:mm:ss") },
                    });

                var notificationData = new NotificationData();
                notificationData.WriteStandardData(
                    title: title,
                    message: message,
                    createTime: eventData.CreationTime,
                    formUser: eventData.Data.FormUser);
                notificationData.ExtraProperties.AddIfNotContains(eventData.Data.ExtraProperties);

                notificationInfo.Data = notificationData;

                await PersistAndPublishAsync(notificationInfo, notification.Providers, eventData.Users);
            }
        }

        /// <summary>
        /// Handles a notification that already carries its data: converts the data with
        /// <c>NotificationDataConverter</c>, then persists and publishes it.
        /// Does nothing when no definition exists for <c>eventData.Name</c>.
        /// </summary>
        /// <param name="eventData">The received notification event.</param>
        [UnitOfWork]
        public virtual async Task HandleEventAsync(NotificationEto<NotificationData> eventData)
        {
            using (CurrentTenant.Change(eventData.TenantId))
            {
                // If the application had already been filtered upstream, Get could be used here;
                // otherwise it is safer to use GetOrNull and check the result.
                var notification = NotificationDefinitionManager.GetOrNull(eventData.Name);
                if (notification == null)
                {
                    return;
                }

                var notificationInfo = new NotificationInfo
                {
                    Name = notification.Name,
                    CreationTime = eventData.CreationTime,
                    Data = eventData.Data,
                    Severity = eventData.Severity,
                    Lifetime = notification.NotificationLifetime,
                    TenantId = eventData.TenantId,
                    Type = notification.NotificationType
                };
                notificationInfo.SetId(eventData.Id);

                // TODO: this could become an interface responsible for serializing the message.
                notificationInfo.Data = NotificationDataConverter.Convert(notificationInfo.Data);

                await PersistAndPublishAsync(notificationInfo, notification.Providers, eventData.Users);
            }
        }

        /// <summary>
        /// Publishes a notification through the given providers to the subscribed users.
        /// </summary>
        /// <param name="providers">Providers to publish through, in the order they should be invoked.</param>
        /// <param name="users">Users forwarded to the subscription lookup.</param>
        /// <param name="notificationInfo">The notification to publish.</param>
        protected async Task PublishFromProvidersAsync(
            IEnumerable<INotificationPublishProvider> providers,
            IEnumerable<UserIdentifier> users,
            NotificationInfo notificationInfo)
        {
            // Check who has subscribed to this notification.
            Logger.LogDebug("Gets a list of user subscriptions {NotificationName}", notificationInfo.Name);

            var userSubscriptions = await NotificationSubscriptionManager
                .GetUsersSubscriptionsAsync(notificationInfo.TenantId, notificationInfo.Name, users);

            // Materialize once: the list is read by the store calls and by every provider below.
            var subscribedUsers = userSubscriptions
                .Select(us => new UserIdentifier(us.UserId, us.UserName))
                .ToList();

            if (subscribedUsers.Count == 0)
            {
                return;
            }

            // Persist the per-user notifications.
            Logger.LogDebug("Persistent user notifications {NotificationName}", notificationInfo.Name);
            await NotificationStore
                .InsertUserNotificationsAsync(
                    notificationInfo,
                    subscribedUsers.Select(u => u.UserId));

            // 2020-11-02 bug fix: several publish providers share one unit of work, so removing the
            // user subscription cannot live in a single provider's completion event. To keep things
            // consistent the removal happens before publishing.
            if (notificationInfo.Lifetime == NotificationLifetime.OnlyOne)
            {
                // One-time notifications drop the user subscription once they are sent.
                await NotificationStore
                    .DeleteUserSubscriptionAsync(
                        notificationInfo.TenantId,
                        subscribedUsers,
                        notificationInfo.Name);
            }

            // Publish through every provider.
            foreach (var provider in providers)
            {
                await PublishAsync(provider, notificationInfo, subscribedUsers);
            }
        }

        /// <summary>
        /// Publishes a notification through a single provider. When the provider throws, the failure
        /// is logged and the notification is queued as a background job instead of rethrowing.
        /// </summary>
        /// <param name="provider">The provider to publish through.</param>
        /// <param name="notificationInfo">The notification to publish.</param>
        /// <param name="subscriptionUserIdentifiers">The subscribed users to notify.</param>
        protected async Task PublishAsync(
            INotificationPublishProvider provider,
            NotificationInfo notificationInfo,
            IEnumerable<UserIdentifier> subscriptionUserIdentifiers)
        {
            try
            {
                Logger.LogDebug("Sending notification with provider {ProviderName}", provider.Name);

                var notifacationDataMapping = Options.NotificationDataMappings
                    .GetMapItemOrDefault(provider.Name, notificationInfo.Name);
                if (notifacationDataMapping != null)
                {
                    // NOTE(review): this overwrites Data on the instance shared by all providers, so a
                    // later provider receives the data already mapped for an earlier one - confirm intended.
                    notificationInfo.Data = notifacationDataMapping.MappingFunc(notificationInfo.Data);
                }

                await provider.PublishAsync(notificationInfo, subscriptionUserIdentifiers);

                Logger.LogDebug(
                    "Send notification {NotificationName} with provider {ProviderName} was successful",
                    notificationInfo.Name,
                    provider.Name);
            }
            catch (Exception ex)
            {
                Logger.LogWarning("Send notification error with provider {ProviderName}", provider.Name);
                Logger.LogWarning("Error message:{ErrorMessage}", ex.Message);
                Logger.LogTrace(ex, "Send notification error with provider {ProviderName}", provider.Name);
                Logger.LogDebug(
                    "Send notification error, notification {NotificationName} entry queue",
                    notificationInfo.Name);

                // A notification that failed to send goes into the background job queue.
                await BackgroundJobManager.EnqueueAsync(
                    new NotificationPublishJobArgs(
                        notificationInfo.GetId(),
                        provider.GetType().AssemblyQualifiedName,
                        subscriptionUserIdentifiers.ToList(),
                        notificationInfo.TenantId));
            }
        }

        /// <summary>
        /// Shared tail of both event handlers: stores the notification, selects the publish providers
        /// and hands over to <see cref="PublishFromProvidersAsync"/>.
        /// </summary>
        /// <param name="notificationInfo">The fully populated notification.</param>
        /// <param name="providerNames">Provider names listed on the definition; empty means no filtering.</param>
        /// <param name="users">Users carried by the event.</param>
        private async Task PersistAndPublishAsync(
            NotificationInfo notificationInfo,
            IEnumerable<string> providerNames,
            IEnumerable<UserIdentifier> users)
        {
            Logger.LogDebug("Persistent notification {NotificationName}", notificationInfo.Name);

            // Persist the notification itself.
            await NotificationStore.InsertNotificationAsync(notificationInfo);

            // Providers are walked in the reverse of their registered order.
            var providers = Enumerable.Reverse(NotificationPublishProviderManager.Providers);

            // Restrict to the providers named by the definition, when it names any.
            if (providerNames.Any())
            {
                providers = providers.Where(p => providerNames.Contains(p.Name));
            }

            await PublishFromProvidersAsync(providers, users, notificationInfo);
        }
    }
}