From 48d9aca0abec1525e69ba500c91673c8efe56f66 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 11 Apr 2025 12:29:45 +0300 Subject: [PATCH] minor fixes --- .../org/thingsboard/server/actors/app/AppActor.java | 2 +- .../calculatedField/CalculatedFieldManagerActor.java | 6 +++--- .../CalculatedFieldManagerMessageProcessor.java | 12 +++++++----- .../server/actors/tenant/TenantActor.java | 3 ++- .../cf/DefaultCalculatedFieldInitService.java | 8 ++++---- .../DefaultTbCalculatedFieldConsumerService.java | 5 ++--- application/src/main/resources/thingsboard.yml | 4 ++++ .../queue/DefaultTbCoreConsumerServiceTest.java | 4 ++++ .../server/dao/cf/CalculatedFieldService.java | 2 -- .../org/thingsboard/server/common/msg/MsgType.java | 2 +- ...java => CalculatedFieldInitProfileEntityMsg.java} | 4 ++-- .../queue/discovery/event/PartitionChangeEvent.java | 1 - .../queue/provider/KafkaMonolithQueueFactory.java | 3 +++ .../provider/KafkaTbRuleEngineQueueFactory.java | 3 +++ .../server/dao/cf/BaseCalculatedFieldService.java | 6 ------ 15 files changed, 36 insertions(+), 29 deletions(-) rename common/message/src/main/java/org/thingsboard/server/common/msg/cf/{CalculatedFieldProfileEntityMsg.java => CalculatedFieldInitProfileEntityMsg.java} (88%) diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index fc9f344563..a79a182fa1 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -114,7 +114,7 @@ public class AppActor extends ContextAwareActor { ctx.broadcastToChildrenByType(msg, EntityType.TENANT); break; case CF_CACHE_INIT_MSG: - case CF_PROFILE_ENTITY_MSG: + case CF_INIT_PROFILE_ENTITY_MSG: case CF_INIT_MSG: case CF_LINK_INIT_MSG: case CF_STATE_RESTORE_MSG: diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java index d5a3da03e4..9f59a80e67 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java @@ -25,9 +25,9 @@ import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldCacheInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldInitProfileEntityMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; -import org.thingsboard.server.common.msg.cf.CalculatedFieldProfileEntityMsg; /** * Created by ashvayka on 15.03.18. @@ -70,8 +70,8 @@ public class CalculatedFieldManagerActor extends AbstractCalculatedFieldActor { case CF_CACHE_INIT_MSG: processor.onCacheInitMsg((CalculatedFieldCacheInitMsg) msg); break; - case CF_PROFILE_ENTITY_MSG: - processor.onProfileEntityMsg((CalculatedFieldProfileEntityMsg) msg); + case CF_INIT_PROFILE_ENTITY_MSG: + processor.onProfileEntityMsg((CalculatedFieldInitProfileEntityMsg) msg); break; case CF_INIT_MSG: processor.onFieldInitMsg((CalculatedFieldInitMsg) msg); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index e62c531b16..a896bbd74b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -38,9 +38,9 @@ import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.msg.cf.CalculatedFieldCacheInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldInitProfileEntityMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; -import org.thingsboard.server.common.msg.cf.CalculatedFieldProfileEntityMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -86,7 +86,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware protected TbActorCtx ctx; - @Value("${calculated_fields.init_fetch_pack_size:50000}") + @Value("${queue.calculated_fields.init_tenant_fetch_pack_size:1000}") @Getter private int initFetchPackSize; @@ -123,7 +123,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware msg.getCallback().onSuccess(); } - public void onProfileEntityMsg(CalculatedFieldProfileEntityMsg msg) { + public void onProfileEntityMsg(CalculatedFieldInitProfileEntityMsg msg) { log.debug("[{}] Processing profile entity message.", msg.getTenantId().getId()); entityProfileCache.add(msg.getProfileEntityId(), msg.getEntityId()); msg.getCallback().onSuccess(); @@ -538,7 +538,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware return switch (entityId.getEntityType()) { case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId(); case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId(); - default -> null; + default -> + throw new IllegalArgumentException("'" + entityId.getEntityType() + "' is not profile entity." + entityId); }; } @@ -567,10 +568,11 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware public void initCalculatedFields() { PageDataIterable cfs = new PageDataIterable<>(pageLink -> cfDaoService.findCalculatedFieldsByTenantId(tenantId, pageLink), initFetchPackSize); cfs.forEach(cf -> { + log.trace("Processing calculated field record: {}", cf); try { onFieldInitMsg(new CalculatedFieldInitMsg(cf.getTenantId(), cf)); } catch (CalculatedFieldException e) { - throw new RuntimeException(e); + log.error("Failed to process calculated field record: {}", cf, e); } }); calculatedFields.values().forEach(cf -> { diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index a5be701d38..2a039327f2 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -178,7 +178,7 @@ public class TenantActor extends RuleChainManagerActor { onRuleChainMsg((RuleChainAwareMsg) msg); break; case CF_CACHE_INIT_MSG: - case CF_PROFILE_ENTITY_MSG: + case CF_INIT_PROFILE_ENTITY_MSG: case CF_INIT_MSG: case CF_LINK_INIT_MSG: case CF_STATE_RESTORE_MSG: @@ -202,6 +202,7 @@ public class TenantActor extends RuleChainManagerActor { } else { log.debug("[{}] CF Actor is not initialized. ToCalculatedFieldSystemMsg: [{}]", tenantId, msg); } + msg.getCallback().onSuccess(); return; } if (priority) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java index 826a5243cf..79950f2e3f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java @@ -23,7 +23,7 @@ import org.springframework.stereotype.Service; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.ProfileEntityIdInfo; import org.thingsboard.server.common.data.page.PageDataIterable; -import org.thingsboard.server.common.msg.cf.CalculatedFieldProfileEntityMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldInitProfileEntityMsg; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.queue.util.AfterStartUp; @@ -39,7 +39,7 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer private final DeviceService deviceService; private final ActorSystemContext actorSystemContext; - @Value("${calculated_fields.init_fetch_pack_size:50000}") + @Value("${queue.calculated_fields.init_fetch_pack_size:50000}") @Getter private int initFetchPackSize; @@ -49,7 +49,7 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer for (ProfileEntityIdInfo idInfo : deviceIdInfos) { log.trace("Processing device record: {}", idInfo); try { - actorSystemContext.tell(new CalculatedFieldProfileEntityMsg(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId())); + actorSystemContext.tell(new CalculatedFieldInitProfileEntityMsg(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId())); } catch (Exception e) { log.error("Failed to process device record: {}", idInfo, e); } @@ -58,7 +58,7 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer for (ProfileEntityIdInfo idInfo : assetIdInfos) { log.trace("Processing asset record: {}", idInfo); try { - actorSystemContext.tell(new CalculatedFieldProfileEntityMsg(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId())); + actorSystemContext.tell(new CalculatedFieldInitProfileEntityMsg(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId())); } catch (Exception e) { log.error("Failed to process asset record: {}", idInfo, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index ea2b87e5e4..41bf76ff0e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -131,6 +131,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa stateService.restore(queueKey, partitions); } }); + // eventConsumer's partitions will be updated by stateService // Cleanup old entities after corresponding consumers are stopped. // Any periodic tasks need to check that the entity is still managed by the current server before processing. @@ -174,9 +175,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa packSubmitFuture.cancel(true); log.info("Timeout to process message: {}", pendingMsgHolder.getMsg()); } - if (log.isDebugEnabled()) { - ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue())); - } + ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue())); ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue())); } consumer.commit(); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 716797c2d2..14f0437402 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1848,6 +1848,10 @@ queue: pool_size: "${TB_QUEUE_CF_POOL_SIZE:8}" # RocksDB path for storing CF states rocks_db_path: "${TB_QUEUE_CF_ROCKS_DB_PATH:${user.home}/.rocksdb/cf_states}" + # The fetch size specifies how many rows will be returned + init_fetch_pack_size: "${TB_QUEUE_CF_FETCH_PACK_SIZE:50000}" + # The fetch size specifies how many rows will be returned + init_tenant_fetch_pack_size: "${TB_QUEUE_CF_TENANT_FETCH_PACK_SIZE:1000}" transport: # For high-priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java index 26832f6176..4a5ecef970 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; @@ -51,6 +52,8 @@ public class DefaultTbCoreConsumerServiceTest { private TbCoreConsumerStats statsMock; @Mock private RuleEngineCallService ruleEngineCallServiceMock; + @Mock + private Logger logMock; @Mock private TbCallback tbCallbackMock; @@ -69,6 +72,7 @@ public class DefaultTbCoreConsumerServiceTest { executor = MoreExecutors.newDirectExecutorService(); ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stateService", stateServiceMock); ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "deviceActivityEventsExecutor", executor); + ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "log", logMock); } @AfterEach diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java index b6f43cda67..5101d6d57e 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java @@ -55,8 +55,6 @@ public interface CalculatedFieldService extends EntityDaoService { List findAllCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId); - List findAllCalculatedFieldLinksByTenantId(TenantId tenantId); - PageData findAllCalculatedFieldLinksByTenantId(TenantId tenantId, PageLink pageLink); PageData findAllCalculatedFieldLinks(PageLink pageLink); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index 5bada73c8c..f1c404ce16 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -137,7 +137,7 @@ public enum MsgType { CF_CACHE_INIT_MSG, // Sent to init caches for CF actor; - CF_PROFILE_ENTITY_MSG, // Sent to init profile entities cache; + CF_INIT_PROFILE_ENTITY_MSG, // Sent to init profile entities cache; CF_INIT_MSG, // Sent to init particular calculated field; CF_LINK_INIT_MSG, // Sent to init particular calculated field; CF_STATE_RESTORE_MSG, // Sent to restore particular calculated field entity state; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldProfileEntityMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldInitProfileEntityMsg.java similarity index 88% rename from common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldProfileEntityMsg.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldInitProfileEntityMsg.java index 72447af677..66cd2ac441 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldProfileEntityMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldInitProfileEntityMsg.java @@ -22,7 +22,7 @@ import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; @Data -public class CalculatedFieldProfileEntityMsg implements ToCalculatedFieldSystemMsg { +public class CalculatedFieldInitProfileEntityMsg implements ToCalculatedFieldSystemMsg { private final TenantId tenantId; private final EntityId profileEntityId; @@ -30,7 +30,7 @@ public class CalculatedFieldProfileEntityMsg implements ToCalculatedFieldSystemM @Override public MsgType getMsgType() { - return MsgType.CF_PROFILE_ENTITY_MSG; + return MsgType.CF_INIT_PROFILE_ENTITY_MSG; } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java index 7c2d0b0240..32537edba5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java @@ -18,7 +18,6 @@ package org.thingsboard.server.queue.discovery.event; import lombok.Getter; import lombok.ToString; import org.thingsboard.server.common.data.DataConstants; -import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.discovery.QueueKey; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 87a9d6a697..0738f660df 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -517,6 +517,9 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi @Override public TbQueueConsumer> createToCalculatedFieldMsgConsumer(TopicPartitionInfo tpi) { String queueName = DataConstants.CF_QUEUE_NAME; + if (tpi == null) { + throw new IllegalArgumentException("TopicPartitionInfo is required."); + } TenantId tenantId = tpi.getTenantId().orElse(TenantId.SYS_TENANT_ID); Integer partitionId = tpi.getPartition().orElseThrow(() -> new IllegalArgumentException("PartitionId is required.")); String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index fdf2b2f3f7..b4884ae72c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -318,6 +318,9 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { @Override public TbQueueConsumer> createToCalculatedFieldMsgConsumer(TopicPartitionInfo tpi) { String queueName = DataConstants.CF_QUEUE_NAME; + if (tpi == null) { + throw new IllegalArgumentException("TopicPartitionInfo is required."); + } TenantId tenantId = tpi.getTenantId().orElse(TenantId.SYS_TENANT_ID); Integer partitionId = tpi.getPartition().orElseThrow(() -> new IllegalArgumentException("PartitionId is required.")); String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java index d3fea73849..0c5df18e80 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java @@ -182,12 +182,6 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements return calculatedFieldLinkDao.findCalculatedFieldLinksByEntityId(tenantId, entityId); } - @Override - public List findAllCalculatedFieldLinksByTenantId(TenantId tenantId) { - log.trace("Executing findAllCalculatedFieldLinksByTenantId, tenantId [{}]", tenantId); - return calculatedFieldLinkDao.findCalculatedFieldLinksByTenantId(tenantId); - } - @Override public PageData findAllCalculatedFieldLinksByTenantId(TenantId tenantId, PageLink pageLink) { log.trace("Executing findAllCalculatedFieldLinksByTenantId, tenantId[{}] pageLink [{}]", tenantId, pageLink);