|
|
|
@ -22,13 +22,12 @@ import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
import org.thingsboard.common.util.DonAsynchron; |
|
|
|
import org.thingsboard.rule.engine.api.MailService; |
|
|
|
import org.thingsboard.rule.engine.api.NotificationCenter; |
|
|
|
import org.thingsboard.rule.engine.api.SmsService; |
|
|
|
import org.thingsboard.server.common.data.EntityType; |
|
|
|
import org.thingsboard.server.common.data.User; |
|
|
|
import org.thingsboard.server.common.data.id.NotificationId; |
|
|
|
import org.thingsboard.server.common.data.id.NotificationRequestId; |
|
|
|
import org.thingsboard.server.common.data.id.NotificationRuleId; |
|
|
|
import org.thingsboard.server.common.data.id.NotificationTargetId; |
|
|
|
import org.thingsboard.server.common.data.id.TenantId; |
|
|
|
import org.thingsboard.server.common.data.id.UserId; |
|
|
|
@ -46,7 +45,6 @@ import org.thingsboard.server.common.data.notification.settings.NotificationSett |
|
|
|
import org.thingsboard.server.common.data.notification.targets.NotificationRecipient; |
|
|
|
import org.thingsboard.server.common.data.notification.targets.NotificationTarget; |
|
|
|
import org.thingsboard.server.common.data.notification.targets.platform.PlatformUsersNotificationTargetConfig; |
|
|
|
import org.thingsboard.server.common.data.notification.targets.platform.UsersFilterType; |
|
|
|
import org.thingsboard.server.common.data.notification.targets.slack.SlackNotificationTargetConfig; |
|
|
|
import org.thingsboard.server.common.data.notification.template.DeliveryMethodNotificationTemplate; |
|
|
|
import org.thingsboard.server.common.data.notification.template.NotificationTemplate; |
|
|
|
@ -103,59 +101,57 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple |
|
|
|
private final NotificationsTopicService notificationsTopicService; |
|
|
|
private final TbQueueProducerProvider producerProvider; |
|
|
|
private final RateLimitService rateLimitService; |
|
|
|
private final MailService mailService; |
|
|
|
private final SmsService smsService; |
|
|
|
|
|
|
|
private Map<NotificationDeliveryMethod, NotificationChannel> channels; |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
public NotificationRequest processNotificationRequest(TenantId tenantId, NotificationRequest notificationRequest) { |
|
|
|
public NotificationRequest processNotificationRequest(TenantId tenantId, NotificationRequest request) { |
|
|
|
if (!rateLimitService.checkRateLimit(tenantId, LimitedApi.NOTIFICATION_REQUEST)) { |
|
|
|
throw new TbRateLimitsException(EntityType.TENANT); |
|
|
|
} |
|
|
|
NotificationSettings settings = notificationSettingsService.findNotificationSettings(tenantId); |
|
|
|
|
|
|
|
NotificationTemplate notificationTemplate; |
|
|
|
if (notificationRequest.getTemplateId() != null) { |
|
|
|
notificationTemplate = notificationTemplateService.findNotificationTemplateById(tenantId, notificationRequest.getTemplateId()); |
|
|
|
if (request.getTemplateId() != null) { |
|
|
|
notificationTemplate = notificationTemplateService.findNotificationTemplateById(tenantId, request.getTemplateId()); |
|
|
|
} else { |
|
|
|
notificationTemplate = notificationRequest.getTemplate(); |
|
|
|
notificationTemplate = request.getTemplate(); |
|
|
|
} |
|
|
|
if (notificationTemplate == null) throw new IllegalArgumentException("Template is missing"); |
|
|
|
|
|
|
|
List<NotificationTarget> targets = notificationRequest.getTargets().stream().map(NotificationTargetId::new) |
|
|
|
List<NotificationTarget> targets = request.getTargets().stream().map(NotificationTargetId::new) |
|
|
|
.map(id -> notificationTargetService.findNotificationTargetById(tenantId, id)).collect(Collectors.toList()); |
|
|
|
Set<NotificationDeliveryMethod> availableDeliveryMethods = getAvailableDeliveryMethods(tenantId); |
|
|
|
|
|
|
|
NotificationRuleId ruleId = request.getRuleId(); |
|
|
|
notificationTemplate.getConfiguration().getDeliveryMethodsTemplates().forEach((deliveryMethod, template) -> { |
|
|
|
if (!template.isEnabled()) return; |
|
|
|
if (!availableDeliveryMethods.contains(deliveryMethod)) { |
|
|
|
throw new IllegalArgumentException("Settings for " + deliveryMethod.getName() + " are missing"); |
|
|
|
if (!channels.get(deliveryMethod).check(tenantId)) { |
|
|
|
throw new IllegalArgumentException("Unable to send notification via " + deliveryMethod.getName() + ": not configured or not working"); |
|
|
|
} |
|
|
|
if (notificationRequest.getRuleId() == null) { |
|
|
|
if (ruleId == null) { |
|
|
|
if (targets.stream().noneMatch(target -> target.getConfiguration().getType().getSupportedDeliveryMethods().contains(deliveryMethod))) { |
|
|
|
throw new IllegalArgumentException("Target for " + deliveryMethod.getName() + " delivery method is missing"); |
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
if (notificationRequest.getAdditionalConfig() != null) { |
|
|
|
NotificationRequestConfig config = notificationRequest.getAdditionalConfig(); |
|
|
|
if (config.getSendingDelayInSec() > 0 && notificationRequest.getId() == null) { |
|
|
|
notificationRequest.setStatus(NotificationRequestStatus.SCHEDULED); |
|
|
|
NotificationRequest savedNotificationRequest = notificationRequestService.saveNotificationRequest(tenantId, notificationRequest); |
|
|
|
forwardToNotificationSchedulerService(tenantId, savedNotificationRequest.getId()); |
|
|
|
return savedNotificationRequest; |
|
|
|
if (request.getAdditionalConfig() != null) { |
|
|
|
NotificationRequestConfig config = request.getAdditionalConfig(); |
|
|
|
if (config.getSendingDelayInSec() > 0 && request.getId() == null) { |
|
|
|
request.setStatus(NotificationRequestStatus.SCHEDULED); |
|
|
|
request = notificationRequestService.saveNotificationRequest(tenantId, request); |
|
|
|
forwardToNotificationSchedulerService(tenantId, request.getId()); |
|
|
|
return request; |
|
|
|
} |
|
|
|
} |
|
|
|
NotificationSettings settings = notificationSettingsService.findNotificationSettings(tenantId); |
|
|
|
|
|
|
|
log.debug("Processing notification request (tenantId: {}, targets: {})", tenantId, notificationRequest.getTargets()); |
|
|
|
notificationRequest.setStatus(NotificationRequestStatus.PROCESSING); |
|
|
|
NotificationRequest savedNotificationRequest = notificationRequestService.saveNotificationRequest(tenantId, notificationRequest); |
|
|
|
log.debug("Processing notification request (tenantId: {}, targets: {})", tenantId, request.getTargets()); |
|
|
|
request.setStatus(NotificationRequestStatus.PROCESSING); |
|
|
|
request = notificationRequestService.saveNotificationRequest(tenantId, request); |
|
|
|
|
|
|
|
NotificationProcessingContext ctx = NotificationProcessingContext.builder() |
|
|
|
.tenantId(tenantId) |
|
|
|
.request(savedNotificationRequest) |
|
|
|
.request(request) |
|
|
|
.settings(settings) |
|
|
|
.template(notificationTemplate) |
|
|
|
.build(); |
|
|
|
@ -169,7 +165,7 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple |
|
|
|
} |
|
|
|
|
|
|
|
Futures.whenAllComplete(results).run(() -> { |
|
|
|
NotificationRequestId requestId = savedNotificationRequest.getId(); |
|
|
|
NotificationRequestId requestId = ctx.getRequest().getId(); |
|
|
|
log.debug("[{}] Notification request processing is finished", requestId); |
|
|
|
NotificationRequestStats stats = ctx.getStats(); |
|
|
|
try { |
|
|
|
@ -180,33 +176,28 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple |
|
|
|
}, dbCallbackExecutorService); |
|
|
|
}); |
|
|
|
|
|
|
|
return savedNotificationRequest; |
|
|
|
return request; |
|
|
|
} |
|
|
|
|
|
|
|
private List<ListenableFuture<Void>> processForTarget(NotificationTarget target, NotificationProcessingContext ctx) { |
|
|
|
Iterable<? extends NotificationRecipient> recipients; |
|
|
|
switch (target.getConfiguration().getType()) { |
|
|
|
case PLATFORM_USERS: { |
|
|
|
PlatformUsersNotificationTargetConfig platformUsersTargetConfig = (PlatformUsersNotificationTargetConfig) target.getConfiguration(); |
|
|
|
if (platformUsersTargetConfig.getUsersFilter().getType() == UsersFilterType.AFFECTED_USER) { |
|
|
|
if (ctx.getRequest().getInfo() instanceof RuleOriginatedNotificationInfo) { |
|
|
|
UserId targetUserId = ((RuleOriginatedNotificationInfo) ctx.getRequest().getInfo()).getTargetUserId(); |
|
|
|
if (targetUserId != null) { |
|
|
|
recipients = List.of(userService.findUserById(ctx.getTenantId(), targetUserId)); |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
recipients = Collections.emptyList(); |
|
|
|
PlatformUsersNotificationTargetConfig targetConfig = (PlatformUsersNotificationTargetConfig) target.getConfiguration(); |
|
|
|
if (targetConfig.getUsersFilter().getType().isForRules()) { |
|
|
|
recipients = new PageDataIterable<>(pageLink -> { |
|
|
|
return notificationTargetService.findRecipientsForRuleNotificationTargetConfig(ctx.getTenantId(), targetConfig, (RuleOriginatedNotificationInfo) ctx.getRequest().getInfo(), pageLink); |
|
|
|
}, 500); |
|
|
|
} else { |
|
|
|
recipients = new PageDataIterable<>(pageLink -> { |
|
|
|
return notificationTargetService.findRecipientsForNotificationTargetConfig(ctx.getTenantId(), ctx.getCustomerId(), platformUsersTargetConfig, pageLink); |
|
|
|
return notificationTargetService.findRecipientsForNotificationTargetConfig(ctx.getTenantId(), targetConfig, pageLink); |
|
|
|
}, 500); |
|
|
|
} |
|
|
|
break; |
|
|
|
} |
|
|
|
case SLACK: { |
|
|
|
SlackNotificationTargetConfig slackTargetConfig = (SlackNotificationTargetConfig) target.getConfiguration(); |
|
|
|
recipients = List.of(slackTargetConfig.getConversation()); |
|
|
|
SlackNotificationTargetConfig targetConfig = (SlackNotificationTargetConfig) target.getConfiguration(); |
|
|
|
recipients = List.of(targetConfig.getConversation()); |
|
|
|
break; |
|
|
|
} |
|
|
|
default: { |
|
|
|
@ -345,20 +336,15 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple |
|
|
|
|
|
|
|
@Override |
|
|
|
public Set<NotificationDeliveryMethod> getAvailableDeliveryMethods(TenantId tenantId) { |
|
|
|
Set<NotificationDeliveryMethod> deliveryMethods = new HashSet<>(); |
|
|
|
deliveryMethods.add(NotificationDeliveryMethod.WEB); |
|
|
|
NotificationSettings notificationSettings = notificationSettingsService.findNotificationSettings(tenantId); |
|
|
|
if (notificationSettings.getDeliveryMethodsConfigs().containsKey(NotificationDeliveryMethod.SLACK)) { |
|
|
|
deliveryMethods.add(NotificationDeliveryMethod.SLACK); |
|
|
|
} |
|
|
|
try { |
|
|
|
mailService.testConnection(tenantId); |
|
|
|
deliveryMethods.add(NotificationDeliveryMethod.EMAIL); |
|
|
|
} catch (Exception e) {} |
|
|
|
if (smsService.isConfigured(tenantId)) { |
|
|
|
deliveryMethods.add(NotificationDeliveryMethod.SMS); |
|
|
|
} |
|
|
|
return deliveryMethods; |
|
|
|
return channels.values().stream() |
|
|
|
.filter(channel -> channel.check(tenantId)) |
|
|
|
.map(NotificationChannel::getDeliveryMethod) |
|
|
|
.collect(Collectors.toSet()); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public boolean check(TenantId tenantId) { |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
|