|
|
|
@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.notification.NotificationRequestStatus |
|
|
|
import org.thingsboard.server.common.data.notification.info.NotificationInfo; |
|
|
|
import org.thingsboard.server.common.data.notification.rule.NotificationRule; |
|
|
|
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger; |
|
|
|
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger.DeduplicationStrategy; |
|
|
|
import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerConfig; |
|
|
|
import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType; |
|
|
|
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
|
|
|
@ -66,8 +67,8 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess |
|
|
|
private final NotificationDeduplicationService deduplicationService; |
|
|
|
private final PartitionService partitionService; |
|
|
|
private final RateLimitService rateLimitService; |
|
|
|
@Autowired @Lazy |
|
|
|
private NotificationCenter notificationCenter; |
|
|
|
@Lazy |
|
|
|
private final NotificationCenter notificationCenter; |
|
|
|
private final NotificationExecutorService notificationExecutor; |
|
|
|
|
|
|
|
private final Map<NotificationRuleTriggerType, NotificationRuleTriggerProcessor> triggerProcessors = new EnumMap<>(NotificationRuleTriggerType.class); |
|
|
|
@ -82,14 +83,11 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess |
|
|
|
if (enabledRules.isEmpty()) { |
|
|
|
return; |
|
|
|
} |
|
|
|
if (trigger.deduplicate()) { |
|
|
|
enabledRules = new ArrayList<>(enabledRules); |
|
|
|
enabledRules.removeIf(rule -> deduplicationService.alreadyProcessed(trigger, rule)); |
|
|
|
} |
|
|
|
final List<NotificationRule> rules = enabledRules; |
|
|
|
for (NotificationRule rule : rules) { |
|
|
|
|
|
|
|
List<NotificationRule> rulesToProcess = filterNotificationRules(trigger, enabledRules); |
|
|
|
for (NotificationRule rule : rulesToProcess) { |
|
|
|
try { |
|
|
|
processNotificationRule(rule, trigger); |
|
|
|
processNotificationRule(rule, trigger, DeduplicationStrategy.ONLY_MATCHING.equals(trigger.getDeduplicationStrategy())); |
|
|
|
} catch (Throwable e) { |
|
|
|
log.error("Failed to process notification rule {} for trigger type {} with trigger object {}", rule.getId(), rule.getTriggerType(), trigger, e); |
|
|
|
} |
|
|
|
@ -100,7 +98,20 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
private void processNotificationRule(NotificationRule rule, NotificationRuleTrigger trigger) { |
|
|
|
private List<NotificationRule> filterNotificationRules(NotificationRuleTrigger trigger, List<NotificationRule> enabledRules) { |
|
|
|
List<NotificationRule> rulesToProcess = new ArrayList<>(enabledRules); |
|
|
|
rulesToProcess.removeIf(rule -> switch (trigger.getDeduplicationStrategy()) { |
|
|
|
case ONLY_MATCHING -> { |
|
|
|
boolean matched = matchesFilter(trigger, rule.getTriggerConfig()); |
|
|
|
yield !matched || deduplicationService.alreadyProcessed(trigger, rule); |
|
|
|
} |
|
|
|
case ALL -> deduplicationService.alreadyProcessed(trigger, rule); |
|
|
|
default -> false; |
|
|
|
}); |
|
|
|
return rulesToProcess; |
|
|
|
} |
|
|
|
|
|
|
|
private void processNotificationRule(NotificationRule rule, NotificationRuleTrigger trigger, boolean alreadyMatched) { |
|
|
|
NotificationRuleTriggerConfig triggerConfig = rule.getTriggerConfig(); |
|
|
|
log.debug("Processing notification rule '{}' for trigger type {}", rule.getName(), rule.getTriggerType()); |
|
|
|
|
|
|
|
@ -114,7 +125,7 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
if (matchesFilter(trigger, triggerConfig)) { |
|
|
|
if (alreadyMatched || matchesFilter(trigger, triggerConfig)) { |
|
|
|
if (!rateLimitService.checkRateLimit(LimitedApi.NOTIFICATION_REQUESTS_PER_RULE, rule.getTenantId(), rule.getId())) { |
|
|
|
log.debug("[{}] Rate limit for notification requests per rule was exceeded (rule '{}')", rule.getTenantId(), rule.getName()); |
|
|
|
return; |
|
|
|
|