|
|
|
@ -627,15 +627,14 @@ public class DefaultTbClusterService implements TbClusterService { |
|
|
|
// No need to push notifications twice
|
|
|
|
tbRuleEngineServices.removeAll(tbCoreServices); |
|
|
|
} |
|
|
|
if (entityType == EntityType.USER) { |
|
|
|
// No need to push user update notification to the rule engine
|
|
|
|
return; |
|
|
|
} |
|
|
|
for (String serviceId : tbRuleEngineServices) { |
|
|
|
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId); |
|
|
|
ToRuleEngineNotificationMsg toRuleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setComponentLifecycle(componentLifecycleMsgProto).build(); |
|
|
|
toRuleEngineProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toRuleEngineMsg), null); |
|
|
|
toRuleEngineNfs.incrementAndGet(); |
|
|
|
boolean toRuleEngine = entityType != EntityType.USER; |
|
|
|
if (toRuleEngine) { |
|
|
|
for (String serviceId : tbRuleEngineServices) { |
|
|
|
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId); |
|
|
|
ToRuleEngineNotificationMsg toRuleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setComponentLifecycle(componentLifecycleMsgProto).build(); |
|
|
|
toRuleEngineProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toRuleEngineMsg), null); |
|
|
|
toRuleEngineNfs.incrementAndGet(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|