diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 89a000594b..d4f04486e6 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -143,13 +143,9 @@ public class AppActor extends ContextAwareActor { if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) { msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!")); } else { - if (!deletedTenants.contains(msg.getTenantId())) { - getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(actor -> { - actor.tell(msg); - }, () -> msg.getMsg().getCallback().onSuccess()); - } else { - msg.getMsg().getCallback().onSuccess(); - } + getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(actor -> { + actor.tell(msg); + }, () -> msg.getMsg().getCallback().onSuccess()); } } @@ -182,22 +178,23 @@ public class AppActor extends ContextAwareActor { } private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) { - if (!deletedTenants.contains(msg.getTenantId())) { - getOrCreateTenantActor(msg.getTenantId()).ifPresent(tenantActor -> { - if (priority) { - tenantActor.tellWithHighPriority(msg); - } else { - tenantActor.tell(msg); - } - }); - } else { + getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(tenantActor -> { + if (priority) { + tenantActor.tellWithHighPriority(msg); + } else { + tenantActor.tell(msg); + } + }, () -> { if (msg instanceof TransportToDeviceActorMsgWrapper) { ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess(); } - } + }); } private Optional getOrCreateTenantActor(TenantId tenantId) { + if (deletedTenants.contains(tenantId)) { + return Optional.empty(); + } return Optional.ofNullable(ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), () -> DefaultActorService.TENANT_DISPATCHER_NAME, () -> new TenantActor.ActorCreator(systemContext, tenantId), diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 5a2db8019e..76631aaa95 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -591,6 +591,11 @@ public class DefaultTbClusterService implements TbClusterService { tbTransportServices.removeAll(tbCoreServices); tbCoreServices.removeAll(tbRuleEngineServices); + for (String ruleEngineServiceId : tbRuleEngineServices) { + TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngineServiceId); + producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), ruleEngineMsg), null); + toRuleEngineNfs.incrementAndGet(); + } for (String coreServiceId : tbCoreServices) { TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, coreServiceId); producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), coreMsg), null); @@ -601,10 +606,5 @@ public class DefaultTbClusterService implements TbClusterService { producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null); toTransportNfs.incrementAndGet(); } - for (String ruleEngineServiceId : tbRuleEngineServices) { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngineServiceId); - producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), ruleEngineMsg), null); - toRuleEngineNfs.incrementAndGet(); - } } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 19dea11665..f4716b925f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -104,7 +104,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @Value("${queue.rule-engine.prometheus-stats.enabled:false}") boolean prometheusStatsEnabled; @Value("${queue.rule-engine.topic-deletion-delay:30}") - private int topicDeletionDelay; + private int topicDeletionDelayInSec; private final StatsFactory statsFactory; private final TbRuleEngineSubmitStrategyFactory submitStrategyFactory; @@ -506,7 +506,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } private void processQueueDeletion(Queue queue, TbQueueConsumer> consumer) { - long finishTs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(topicDeletionDelay); + long finishTs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(topicDeletionDelayInSec); try { int n = 0; while (System.currentTimeMillis() <= finishTs) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 779bf9fae3..22a16de64f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -187,7 +187,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(configuration.getTopic()); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId("re-" + queueName + (!configuration.getTenantId().isSysTenantId() ? "-" + configuration.getTenantId() : "") + "-consumer"); + consumerBuilder.groupId("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-" + configuration.getTenantId())) + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index eb387a84f4..2e3bf784d7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -166,7 +166,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(configuration.getTopic()); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId("re-" + queueName + (!configuration.getTenantId().isSysTenantId() ? "-" + configuration.getTenantId() : "") + "-consumer"); + consumerBuilder.groupId("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-" + configuration.getTenantId())) + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts b/ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts index e9142fa1fa..26d6611a3c 100644 --- a/ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts +++ b/ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts @@ -58,8 +58,8 @@ export class TenantProfileComponent extends EntityComponent { id: guid(), consumerPerPartition: true, name: 'Main', - packProcessingTimeout: 2000, - partitions: 2, + packProcessingTimeout: 10000, + partitions: 1, pollInterval: 2000, processingStrategy: { failurePercentage: 0, @@ -83,9 +83,9 @@ export class TenantProfileComponent extends EntityComponent { name: 'HighPriority', topic: 'tb_rule_engine.hp', pollInterval: 2000, - partitions: 2, + partitions: 1, consumerPerPartition: true, - packProcessingTimeout: 2000, + packProcessingTimeout: 10000, submitStrategy: { type: 'BURST', batchSize: 100 @@ -107,9 +107,9 @@ export class TenantProfileComponent extends EntityComponent { name: 'SequentialByOriginator', topic: 'tb_rule_engine.sq', pollInterval: 2000, - partitions: 2, + partitions: 1, consumerPerPartition: true, - packProcessingTimeout: 2000, + packProcessingTimeout: 10000, submitStrategy: { type: 'SEQUENTIAL_BY_ORIGINATOR', batchSize: 100