@ -73,11 +73,12 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
@Override
@Override
public void process ( TenantId tenantId , NotificationRuleTrigger trigger ) {
public void process ( TenantId tenantId , NotificationRuleTrigger trigger ) {
List < NotificationRule > rules = notificationRuleService . findNotificationRulesByTenantIdAndTriggerType ( tenantId , trigger . getType ( ) ) ;
List < NotificationRule > rules = notificationRuleService . findNotificationRulesByTenantIdAndTriggerType (
trigger . getType ( ) . isTenantLevel ( ) ? tenantId : TenantId . SYS_TENANT_ID , trigger . getType ( ) ) ;
for ( NotificationRule rule : rules ) {
for ( NotificationRule rule : rules ) {
notificationExecutor . submit ( ( ) - > {
notificationExecutor . submit ( ( ) - > {
try {
try {
processNotificationRule ( rule , trigger ) ;
processNotificationRule ( tenantId , rule , trigger ) ;
} catch ( Throwable e ) {
} catch ( Throwable e ) {
log . error ( "Failed to process notification rule {} for trigger type {} with trigger object {}" , rule . getId ( ) , rule . getTriggerType ( ) , trigger , e ) ;
log . error ( "Failed to process notification rule {} for trigger type {} with trigger object {}" , rule . getId ( ) , rule . getTriggerType ( ) , trigger , e ) ;
}
}
@ -97,12 +98,12 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
. build ( ) ) ;
. build ( ) ) ;
}
}
private void processNotificationRule ( NotificationRule rule , NotificationRuleTrigger trigger ) {
private void processNotificationRule ( TenantId tenantId , NotificationRule rule , NotificationRuleTrigger trigger ) {
NotificationRuleTriggerConfig triggerConfig = rule . getTriggerConfig ( ) ;
NotificationRuleTriggerConfig triggerConfig = rule . getTriggerConfig ( ) ;
log . debug ( "Processing notification rule '{}' for trigger type {}" , rule . getName ( ) , rule . getTriggerType ( ) ) ;
log . debug ( "Processing notification rule '{}' for trigger type {}" , rule . getName ( ) , rule . getTriggerType ( ) ) ;
if ( matchesClearRule ( trigger , triggerConfig ) ) {
if ( matchesClearRule ( trigger , triggerConfig ) ) {
List < NotificationRequest > notificationRequests = notificationRequestService . findNotificationRequestsByRuleIdAndOriginatorEntityId ( rule . ge tT enantId( ) , rule . getId ( ) , trigger . getOriginatorEntityId ( ) ) ;
List < NotificationRequest > notificationRequests = notificationRequestService . findNotificationRequestsByRuleIdAndOriginatorEntityId ( tenantId , rule . getId ( ) , trigger . getOriginatorEntityId ( ) ) ;
if ( notificationRequests . isEmpty ( ) ) {
if ( notificationRequests . isEmpty ( ) ) {
return ;
return ;
}
}
@ -112,11 +113,11 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
. flatMap ( notificationRequest - > notificationRequest . getTargets ( ) . stream ( ) )
. flatMap ( notificationRequest - > notificationRequest . getTargets ( ) . stream ( ) )
. distinct ( ) . collect ( Collectors . toList ( ) ) ;
. distinct ( ) . collect ( Collectors . toList ( ) ) ;
NotificationInfo notificationInfo = constructNotificationInfo ( trigger , triggerConfig ) ;
NotificationInfo notificationInfo = constructNotificationInfo ( trigger , triggerConfig ) ;
submitNotificationRequest ( targets , rule , trigger . getOriginatorEntityId ( ) , notificationInfo , 0 ) ;
submitNotificationRequest ( tenantId , t argets , rule , trigger . getOriginatorEntityId ( ) , notificationInfo , 0 ) ;
notificationRequests . forEach ( notificationRequest - > {
notificationRequests . forEach ( notificationRequest - > {
if ( notificationRequest . isScheduled ( ) ) {
if ( notificationRequest . isScheduled ( ) ) {
notificationCenter . deleteNotificationRequest ( rule . ge tT enantId( ) , notificationRequest . getId ( ) ) ;
notificationCenter . deleteNotificationRequest ( tenantId , notificationRequest . getId ( ) ) ;
}
}
} ) ;
} ) ;
return ;
return ;
@ -125,7 +126,7 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
if ( matchesFilter ( trigger , triggerConfig ) ) {
if ( matchesFilter ( trigger , triggerConfig ) ) {
NotificationInfo notificationInfo = constructNotificationInfo ( trigger , triggerConfig ) ;
NotificationInfo notificationInfo = constructNotificationInfo ( trigger , triggerConfig ) ;
rule . getRecipientsConfig ( ) . getTargetsTable ( ) . forEach ( ( delay , targets ) - > {
rule . getRecipientsConfig ( ) . getTargetsTable ( ) . forEach ( ( delay , targets ) - > {
submitNotificationRequest ( targets , rule , trigger . getOriginatorEntityId ( ) , notificationInfo , delay ) ;
submitNotificationRequest ( tenantId , t argets , rule , trigger . getOriginatorEntityId ( ) , notificationInfo , delay ) ;
} ) ;
} ) ;
}
}
}
}
@ -142,14 +143,14 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
return triggerProcessors . get ( triggerConfig . getTriggerType ( ) ) . constructNotificationInfo ( trigger , triggerConfig ) ;
return triggerProcessors . get ( triggerConfig . getTriggerType ( ) ) . constructNotificationInfo ( trigger , triggerConfig ) ;
}
}
private void submitNotificationRequest ( List < UUID > targets , NotificationRule rule ,
private void submitNotificationRequest ( TenantId tenantId , List < UUID > targets , NotificationRule rule ,
EntityId originatorEntityId , NotificationInfo notificationInfo , int delayInSec ) {
EntityId originatorEntityId , NotificationInfo notificationInfo , int delayInSec ) {
NotificationRequestConfig config = new NotificationRequestConfig ( ) ;
NotificationRequestConfig config = new NotificationRequestConfig ( ) ;
if ( delayInSec > 0 ) {
if ( delayInSec > 0 ) {
config . setSendingDelayInSec ( delayInSec ) ;
config . setSendingDelayInSec ( delayInSec ) ;
}
}
NotificationRequest notificationRequest = NotificationRequest . builder ( )
NotificationRequest notificationRequest = NotificationRequest . builder ( )
. tenantId ( rule . ge tT enantId( ) )
. tenantId ( tenantId )
. targets ( targets )
. targets ( targets )
. templateId ( rule . getTemplateId ( ) )
. templateId ( rule . getTemplateId ( ) )
. additionalConfig ( config )
. additionalConfig ( config )
@ -160,7 +161,7 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
notificationExecutor . submit ( ( ) - > {
notificationExecutor . submit ( ( ) - > {
try {
try {
log . debug ( "Submitting notification request for rule '{}' with delay of {} sec to targets {}" , rule . getName ( ) , delayInSec , targets ) ;
log . debug ( "Submitting notification request for rule '{}' with delay of {} sec to targets {}" , rule . getName ( ) , delayInSec , targets ) ;
notificationCenter . processNotificationRequest ( rule . ge tT enantId( ) , notificationRequest ) ;
notificationCenter . processNotificationRequest ( tenantId , notificationRequest ) ;
} catch ( Exception e ) {
} catch ( Exception e ) {
log . error ( "Failed to process notification request for rule {}" , rule . getId ( ) , e ) ;
log . error ( "Failed to process notification request for rule {}" , rule . getId ( ) , e ) ;
}
}