Browse Source

Minor refactoring

pull/8988/head
ViacheslavKlimov 3 years ago
parent
commit
d30f8a8352
  1. 31
      application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
  2. 10
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java
  3. 4
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java
  4. 2
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
  5. 2
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java
  6. 12
      ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts

31
application/src/main/java/org/thingsboard/server/actors/app/AppActor.java

@ -143,13 +143,9 @@ public class AppActor extends ContextAwareActor {
if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
} else {
if (!deletedTenants.contains(msg.getTenantId())) {
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(actor -> {
actor.tell(msg);
}, () -> msg.getMsg().getCallback().onSuccess());
} else {
msg.getMsg().getCallback().onSuccess();
}
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(actor -> {
actor.tell(msg);
}, () -> msg.getMsg().getCallback().onSuccess());
}
}
@ -182,22 +178,23 @@ public class AppActor extends ContextAwareActor {
}
private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
if (!deletedTenants.contains(msg.getTenantId())) {
getOrCreateTenantActor(msg.getTenantId()).ifPresent(tenantActor -> {
if (priority) {
tenantActor.tellWithHighPriority(msg);
} else {
tenantActor.tell(msg);
}
});
} else {
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(tenantActor -> {
if (priority) {
tenantActor.tellWithHighPriority(msg);
} else {
tenantActor.tell(msg);
}
}, () -> {
if (msg instanceof TransportToDeviceActorMsgWrapper) {
((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
}
}
});
}
private Optional<TbActorRef> getOrCreateTenantActor(TenantId tenantId) {
if (deletedTenants.contains(tenantId)) {
return Optional.empty();
}
return Optional.ofNullable(ctx.getOrCreateChildActor(new TbEntityActorId(tenantId),
() -> DefaultActorService.TENANT_DISPATCHER_NAME,
() -> new TenantActor.ActorCreator(systemContext, tenantId),

10
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java

@ -591,6 +591,11 @@ public class DefaultTbClusterService implements TbClusterService {
tbTransportServices.removeAll(tbCoreServices);
tbCoreServices.removeAll(tbRuleEngineServices);
for (String ruleEngineServiceId : tbRuleEngineServices) {
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngineServiceId);
producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), ruleEngineMsg), null);
toRuleEngineNfs.incrementAndGet();
}
for (String coreServiceId : tbCoreServices) {
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, coreServiceId);
producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), coreMsg), null);
@ -601,10 +606,5 @@ public class DefaultTbClusterService implements TbClusterService {
producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null);
toTransportNfs.incrementAndGet();
}
for (String ruleEngineServiceId : tbRuleEngineServices) {
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngineServiceId);
producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), ruleEngineMsg), null);
toRuleEngineNfs.incrementAndGet();
}
}
}

4
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java

@ -104,7 +104,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
@Value("${queue.rule-engine.prometheus-stats.enabled:false}")
boolean prometheusStatsEnabled;
@Value("${queue.rule-engine.topic-deletion-delay:30}")
private int topicDeletionDelay;
private int topicDeletionDelayInSec;
private final StatsFactory statsFactory;
private final TbRuleEngineSubmitStrategyFactory submitStrategyFactory;
@ -506,7 +506,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
}
private void processQueueDeletion(Queue queue, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer) {
long finishTs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(topicDeletionDelay);
long finishTs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(topicDeletionDelayInSec);
try {
int n = 0;
while (System.currentTimeMillis() <= finishTs) {

2
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java

@ -187,7 +187,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(configuration.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId("re-" + queueName + (!configuration.getTenantId().isSysTenantId() ? "-" + configuration.getTenantId() : "") + "-consumer");
consumerBuilder.groupId("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-" + configuration.getTenantId())) + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);
consumerBuilder.statsService(consumerStatsService);

2
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java

@ -166,7 +166,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(configuration.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId("re-" + queueName + (!configuration.getTenantId().isSysTenantId() ? "-" + configuration.getTenantId() : "") + "-consumer");
consumerBuilder.groupId("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-" + configuration.getTenantId())) + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);
consumerBuilder.statsService(consumerStatsService);

12
ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts

@ -58,8 +58,8 @@ export class TenantProfileComponent extends EntityComponent<TenantProfile> {
id: guid(),
consumerPerPartition: true,
name: 'Main',
packProcessingTimeout: 2000,
partitions: 2,
packProcessingTimeout: 10000,
partitions: 1,
pollInterval: 2000,
processingStrategy: {
failurePercentage: 0,
@ -83,9 +83,9 @@ export class TenantProfileComponent extends EntityComponent<TenantProfile> {
name: 'HighPriority',
topic: 'tb_rule_engine.hp',
pollInterval: 2000,
partitions: 2,
partitions: 1,
consumerPerPartition: true,
packProcessingTimeout: 2000,
packProcessingTimeout: 10000,
submitStrategy: {
type: 'BURST',
batchSize: 100
@ -107,9 +107,9 @@ export class TenantProfileComponent extends EntityComponent<TenantProfile> {
name: 'SequentialByOriginator',
topic: 'tb_rule_engine.sq',
pollInterval: 2000,
partitions: 2,
partitions: 1,
consumerPerPartition: true,
packProcessingTimeout: 2000,
packProcessingTimeout: 10000,
submitStrategy: {
type: 'SEQUENTIAL_BY_ORIGINATOR',
batchSize: 100

Loading…
Cancel
Save