diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 1ed919e922..b5fa1e6fbd 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -30,9 +30,9 @@ import org.springframework.context.annotation.Lazy; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; -import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.SmsService; import org.thingsboard.rule.engine.api.notification.SlackService; import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; @@ -110,7 +110,6 @@ import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.cf.CalculatedFieldStateService; -import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.edge.rpc.EdgeRpcService; @@ -546,11 +545,6 @@ public class ActorSystemContext { @Getter private CalculatedFieldQueueService calculatedFieldQueueService; - @Lazy - @Autowired(required = false) - @Getter - private CalculatedFieldEntityProfileCache calculatedFieldEntityProfileCache; - @Value("${actors.session.max_concurrent_sessions_per_device:1}") @Getter private int maxConcurrentSessionsPerDevice; diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java index 70ed2849e8..511449c2a9 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java @@ -22,10 +22,12 @@ import org.thingsboard.server.actors.TbActorException; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbActorStopReason; import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldActorInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; +import org.thingsboard.server.common.msg.cf.ProfileEntityMsg; /** * Created by ashvayka on 15.03.18. @@ -65,6 +67,12 @@ public class CalculatedFieldManagerActor extends AbstractCalculatedFieldActor { case CF_PARTITIONS_CHANGE_MSG: processor.onPartitionChange((CalculatedFieldPartitionChangeMsg) msg); break; + case CF_ACTOR_INIT_MSG: + processor.onActorInitMsg((CalculatedFieldActorInitMsg) msg); + break; + case CF_PROFILE_ENTITY_MSG: + processor.onProfileEntityMsg((ProfileEntityMsg) msg); + break; case CF_INIT_MSG: processor.onFieldInitMsg((CalculatedFieldInitMsg) msg); break; diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index 060893ec8d..9dad4e820e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.actors.calculatedField; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorRef; @@ -24,6 +26,7 @@ import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.ProfileEntityIdInfo; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.id.AssetId; @@ -31,17 +34,22 @@ import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageDataIterable; +import org.thingsboard.server.common.msg.cf.CalculatedFieldActorInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; +import org.thingsboard.server.common.msg.cf.ProfileEntityMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.cf.CalculatedFieldService; +import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.CalculatedFieldStateService; -import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache; +import org.thingsboard.server.service.cf.cache.TenantEntityProfileCache; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.profile.TbAssetProfileCache; @@ -68,22 +76,30 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private final CalculatedFieldProcessingService cfExecService; private final CalculatedFieldStateService cfStateService; - private final CalculatedFieldEntityProfileCache cfEntityCache; private final CalculatedFieldService cfDaoService; + private final DeviceService deviceService; + private final AssetService assetService; private final TbAssetProfileCache assetProfileCache; private final TbDeviceProfileCache deviceProfileCache; + private final TenantEntityProfileCache entityProfileCache; protected final TenantId tenantId; protected TbActorCtx ctx; + @Value("${calculated_fields.init_fetch_pack_size:50000}") + @Getter + private int initFetchPackSize; + CalculatedFieldManagerMessageProcessor(ActorSystemContext systemContext, TenantId tenantId) { super(systemContext); - this.cfEntityCache = systemContext.getCalculatedFieldEntityProfileCache(); this.cfExecService = systemContext.getCalculatedFieldProcessingService(); this.cfStateService = systemContext.getCalculatedFieldStateService(); this.cfDaoService = systemContext.getCalculatedFieldService(); + this.deviceService = systemContext.getDeviceService(); + this.assetService = systemContext.getAssetService(); this.assetProfileCache = systemContext.getAssetProfileCache(); this.deviceProfileCache = systemContext.getDeviceProfileCache(); + this.entityProfileCache = new TenantEntityProfileCache(); this.tenantId = tenantId; } @@ -100,6 +116,19 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware ctx.stop(ctx.getSelf()); } + public void onActorInitMsg(CalculatedFieldActorInitMsg msg) { + log.debug("[{}] Processing CF actor init message.", msg.getTenantId().getId()); + initEntityProfileCache(); + initCalculatedFields(); + msg.getCallback().onSuccess(); + } + + public void onProfileEntityMsg(ProfileEntityMsg msg) { + log.debug("[{}] Processing profile entity message.", msg.getTenantId().getId()); + entityProfileCache.add(msg.getProfileEntityId(), msg.getEntityId()); + msg.getCallback().onSuccess(); + } + public void onFieldInitMsg(CalculatedFieldInitMsg msg) throws CalculatedFieldException { log.debug("[{}] Processing CF init message.", msg.getCf().getId()); var cf = msg.getCf(); @@ -180,16 +209,35 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } break; } + case DEVICE_PROFILE: + case ASSET_PROFILE: { + switch (event) { + case DELETED: + onProfileDeleted(msg.getData(), msg.getCallback()); + break; + default: + msg.getCallback().onSuccess(); + break; + } + break; + } default: { msg.getCallback().onSuccess(); } } } + private void onProfileDeleted(ComponentLifecycleMsg msg, TbCallback callback) { + entityProfileCache.removeProfileId(msg.getEntityId()); + callback.onSuccess(); + } + private void onEntityCreated(ComponentLifecycleMsg msg, TbCallback callback) { EntityId entityId = msg.getEntityId(); EntityId profileId = getProfileId(tenantId, entityId); - cfEntityCache.add(tenantId, profileId, entityId); + if (profileId != null) { + entityProfileCache.add(profileId, entityId); + } if (!isMyPartition(entityId, callback)) { return; } @@ -207,7 +255,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private void onEntityUpdated(ComponentLifecycleMsg msg, TbCallback callback) { if (msg.getOldProfileId() != null && !msg.getOldProfileId().equals(msg.getProfileId())) { - cfEntityCache.update(tenantId, msg.getOldProfileId(), msg.getProfileId(), msg.getEntityId()); + entityProfileCache.update(msg.getOldProfileId(), msg.getProfileId(), msg.getEntityId()); if (!isMyPartition(msg.getEntityId(), callback)) { return; } @@ -226,7 +274,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } private void onEntityDeleted(ComponentLifecycleMsg msg, TbCallback callback) { - cfEntityCache.evict(tenantId, msg.getEntityId()); + entityProfileCache.removeEntityId(msg.getEntityId()); if (isMyPartition(msg.getEntityId(), callback)) { log.debug("Pushing entity lifecycle msg to specific actor [{}]", msg.getEntityId()); getOrCreateActor(msg.getEntityId()).tell(new CalculatedFieldEntityDeleteMsg(tenantId, msg.getEntityId(), callback)); @@ -322,7 +370,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware EntityId entityId = cfCtx.getEntityId(); EntityType entityType = cfCtx.getEntityId().getEntityType(); if (isProfileEntity(entityType)) { - var entityIds = cfEntityCache.getEntityIdsByProfileId(tenantId, entityId); + var entityIds = entityProfileCache.getEntityIdsByProfileId(entityId); if (!entityIds.isEmpty()) { //TODO: no need to do this if we cache all created actors and know which one belong to us; var multiCallback = new MultipleTbCallback(entityIds.size(), callback); @@ -383,7 +431,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware var cf = calculatedFields.get(link.cfId()); if (EntityType.DEVICE_PROFILE.equals(targetEntityType) || EntityType.ASSET_PROFILE.equals(targetEntityType)) { // iterate over all entities that belong to profile and push the message for corresponding CF - var entityIds = cfEntityCache.getEntityIdsByProfileId(tenantId, targetEntityId); + var entityIds = entityProfileCache.getEntityIdsByProfileId(targetEntityId); if (!entityIds.isEmpty()) { MultipleTbCallback multipleCallback = new MultipleTbCallback(entityIds.size(), callback); var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, multipleCallback); @@ -445,7 +493,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware EntityId entityId = cfCtx.getEntityId(); EntityType entityType = cfCtx.getEntityId().getEntityType(); if (isProfileEntity(entityType)) { - var entityIds = cfEntityCache.getEntityIdsByProfileId(tenantId, entityId); + var entityIds = entityProfileCache.getEntityIdsByProfileId(entityId); if (!entityIds.isEmpty()) { var multiCallback = new MultipleTbCallback(entityIds.size(), callback); entityIds.forEach(id -> { @@ -513,21 +561,46 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } public void onPartitionChange(CalculatedFieldPartitionChangeMsg msg) { - initCalculatedFields(); ctx.broadcastToChildren(msg, true); } public void initCalculatedFields() { - cfDaoService.findCalculatedFieldsByTenantId(tenantId).forEach(cf -> { + PageDataIterable cfs = new PageDataIterable<>(pageLink -> cfDaoService.findCalculatedFieldsByTenantId(tenantId, pageLink), initFetchPackSize); + cfs.forEach(cf -> { try { onFieldInitMsg(new CalculatedFieldInitMsg(cf.getTenantId(), cf)); } catch (CalculatedFieldException e) { throw new RuntimeException(e); } }); - cfDaoService.findAllCalculatedFieldLinksByTenantId(tenantId).forEach(cfLink -> { - onLinkInitMsg(new CalculatedFieldLinkInitMsg(cfLink.getTenantId(), cfLink)); + calculatedFields.values().forEach(cf -> { + entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cf); }); + PageDataIterable cfls = new PageDataIterable<>(pageLink -> cfDaoService.findAllCalculatedFieldLinksByTenantId(tenantId, pageLink), initFetchPackSize); + cfls.forEach(link -> { + onLinkInitMsg(new CalculatedFieldLinkInitMsg(link.getTenantId(), link)); + }); + } + + private void initEntityProfileCache() { + PageDataIterable deviceIdInfos = new PageDataIterable<>(pageLink -> deviceService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), initFetchPackSize); + for (ProfileEntityIdInfo idInfo : deviceIdInfos) { + log.trace("Processing device record: {}", idInfo); + try { + entityProfileCache.add(idInfo.getProfileId(), idInfo.getEntityId()); + } catch (Exception e) { + log.error("Failed to process device record: {}", idInfo, e); + } + } + PageDataIterable assetIdInfos = new PageDataIterable<>(pageLink -> assetService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), initFetchPackSize); + for (ProfileEntityIdInfo idInfo : assetIdInfos) { + log.trace("Processing asset record: {}", idInfo); + try { + entityProfileCache.add(idInfo.getProfileId(), idInfo.getEntityId()); + } catch (Exception e) { + log.error("Failed to process asset record: {}", idInfo, e); + } + } } } diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index b42d91a4de..4e04962dcb 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -50,6 +50,7 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldActorInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; @@ -176,6 +177,8 @@ public class TenantActor extends RuleChainManagerActor { case RULE_CHAIN_TO_RULE_CHAIN_MSG: onRuleChainMsg((RuleChainAwareMsg) msg); break; + case CF_ACTOR_INIT_MSG: + case CF_PROFILE_ENTITY_MSG: case CF_INIT_MSG: case CF_LINK_INIT_MSG: case CF_STATE_RESTORE_MSG: @@ -274,7 +277,7 @@ public class TenantActor extends RuleChainManagerActor { () -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME, () -> new CalculatedFieldManagerActorCreator(systemContext, tenantId), () -> true); - cfActor.tellWithHighPriority(msg); + cfActor.tellWithHighPriority(new CalculatedFieldActorInitMsg(tenantId)); } catch (Exception e) { log.info("[{}] Failed to init CF Actor.", tenantId, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java index 5f02be0cde..be27b5318b 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java @@ -20,14 +20,14 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.ProfileEntityIdInfo; import org.thingsboard.server.common.data.page.PageDataIterable; +import org.thingsboard.server.common.msg.cf.ProfileEntityMsg; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.device.DeviceService; -import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbRuleEngineComponent; -import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache; @Slf4j @Service @@ -35,10 +35,9 @@ import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache @RequiredArgsConstructor public class DefaultCalculatedFieldInitService implements CalculatedFieldInitService { - private final CalculatedFieldEntityProfileCache entityProfileCache; private final AssetService assetService; private final DeviceService deviceService; - private final PartitionService partitionService; + private final ActorSystemContext actorSystemContext; @Value("${calculated_fields.init_fetch_pack_size:50000}") @Getter @@ -50,9 +49,7 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer for (ProfileEntityIdInfo idInfo : deviceIdInfos) { log.trace("Processing device record: {}", idInfo); try { - if (partitionService.isManagedByCurrentService(idInfo.getTenantId())) { - entityProfileCache.add(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId()); - } + actorSystemContext.tell(new ProfileEntityMsg(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId())); } catch (Exception e) { log.error("Failed to process device record: {}", idInfo, e); } @@ -61,9 +58,7 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer for (ProfileEntityIdInfo idInfo : assetIdInfos) { log.trace("Processing asset record: {}", idInfo); try { - if (partitionService.isManagedByCurrentService(idInfo.getTenantId())) { - entityProfileCache.add(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId()); - } + actorSystemContext.tell(new ProfileEntityMsg(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId())); } catch (Exception e) { log.error("Failed to process asset record: {}", idInfo, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/cache/CalculatedFieldEntityProfileCache.java b/application/src/main/java/org/thingsboard/server/service/cf/cache/CalculatedFieldEntityProfileCache.java deleted file mode 100644 index df2dc88f6b..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/cf/cache/CalculatedFieldEntityProfileCache.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.cf.cache; - -import org.springframework.context.ApplicationListener; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; - -import java.util.Collection; - -public interface CalculatedFieldEntityProfileCache extends ApplicationListener { - - void add(TenantId tenantId, EntityId profileId, EntityId entityId); - - void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId); - - void evict(TenantId tenantId, EntityId entityId); - - void evictProfile(TenantId tenantId, EntityId profileId); - - void removeTenant(TenantId tenantId); - - Collection getEntityIdsByProfileId(TenantId tenantId, EntityId profileId); - - int getEntityIdPartition(TenantId tenantId, EntityId entityId); -} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java b/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java deleted file mode 100644 index cefdd012d8..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.cf.cache; - -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; -import org.thingsboard.server.common.data.DataConstants; -import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.asset.Asset; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageDataIterable; -import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.dao.asset.AssetService; -import org.thingsboard.server.dao.device.DeviceService; -import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.queue.discovery.TbApplicationEventListener; -import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; -import org.thingsboard.server.queue.util.TbRuleEngineComponent; - -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -@TbRuleEngineComponent -@Service -@Slf4j -@RequiredArgsConstructor -//TODO ashvayka: remove and use TenantEntityProfileCache in each CalculatedFieldManagerMessageProcessor; -public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEventListener implements CalculatedFieldEntityProfileCache { - - private static final Integer UNKNOWN = 0; - private final ConcurrentMap tenantCache = new ConcurrentHashMap<>(); - private final PartitionService partitionService; - private final AssetService assetService; - private final DeviceService deviceService; - - @Value("${calculated_fields.init_fetch_pack_size:50000}") - @Getter - private int initFetchPackSize; - - @Override - protected void onTbApplicationEvent(PartitionChangeEvent event) { - event.getCfPartitions().forEach(tpi -> tpi.getTenantId().ifPresent(this::initCacheForNewTenant)); - } - - @Override - public void add(TenantId tenantId, EntityId profileId, EntityId entityId) { - tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).add(profileId, entityId); - } - - @Override - public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) { - tenantCache.compute(tenantId, (id, cache) -> { - if (cache == null) { - cache = new TenantEntityProfileCache(); - } - cache.remove(oldProfileId, entityId); - cache.add(newProfileId, entityId); - return cache; - }); - } - - @Override - public void evict(TenantId tenantId, EntityId entityId) { - var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()); - cache.removeEntityId(entityId); - } - - @Override - public void evictProfile(TenantId tenantId, EntityId profileId) { - tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).removeProfileId(profileId); - } - - @Override - public void removeTenant(TenantId tenantId) { - tenantCache.remove(tenantId); - } - - @Override - public Collection getEntityIdsByProfileId(TenantId tenantId, EntityId profileId) { - return tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).getEntityIdsByProfileId(profileId); - } - - @Override - public int getEntityIdPartition(TenantId tenantId, EntityId entityId) { - var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); - return tpi.getPartition().orElse(UNKNOWN); - } - - private void initCacheForNewTenant(TenantId tenantId) { - PageDataIterable devices = new PageDataIterable<>(pageLink -> deviceService.findDevicesByTenantId(tenantId, pageLink), initFetchPackSize); - for (Device device : devices) { - log.trace("Processing device record: {}", device); - try { - if (partitionService.isManagedByCurrentService(device.getTenantId())) { - add(device.getTenantId(), device.getDeviceProfileId(), device.getId()); - } - } catch (Exception e) { - log.error("Failed to process device record: {}", device, e); - } - } - PageDataIterable assets = new PageDataIterable<>(pageLink -> assetService.findAssetsByTenantId(tenantId, pageLink), initFetchPackSize); - for (Asset asset : assets) { - log.trace("Processing asset record: {}", asset); - try { - if (partitionService.isManagedByCurrentService(asset.getTenantId())) { - add(asset.getTenantId(), asset.getAssetProfileId(), asset.getId()); - } - } catch (Exception e) { - log.error("Failed to process asset record: {}", asset, e); - } - } - } - -} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/cache/TenantEntityProfileCache.java b/application/src/main/java/org/thingsboard/server/service/cf/cache/TenantEntityProfileCache.java index 27c2bdd6d5..6435ffb14c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/cache/TenantEntityProfileCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/cache/TenantEntityProfileCache.java @@ -76,6 +76,11 @@ public class TenantEntityProfileCache { } } + public void update(EntityId oldProfileId, EntityId newProfileId, EntityId entityId) { + remove(oldProfileId, entityId); + add(newProfileId, entityId); + } + public Collection getEntityIdsByProfileId(EntityId profileId) { lock.readLock().lock(); try { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index f108949efa..ab03a56225 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -52,7 +52,6 @@ import org.thingsboard.server.queue.util.TbRuleEngineComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldCache; import org.thingsboard.server.service.cf.CalculatedFieldStateService; -import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.processing.AbstractPartitionBasedConsumerService; @@ -81,7 +80,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa private final TbRuleEngineQueueFactory queueFactory; private final CalculatedFieldStateService stateService; - private final CalculatedFieldEntityProfileCache entityProfileCache; public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory, ActorSystemContext actorContext, @@ -93,13 +91,11 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa ApplicationEventPublisher eventPublisher, JwtSettingsService jwtSettingsService, CalculatedFieldCache calculatedFieldCache, - CalculatedFieldStateService stateService, - CalculatedFieldEntityProfileCache entityProfileCache) { + CalculatedFieldStateService stateService) { super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService); this.queueFactory = tbQueueFactory; this.stateService = stateService; - this.entityProfileCache = entityProfileCache; } @Override @@ -230,8 +226,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa public void handleComponentLifecycleEvent(ComponentLifecycleMsg event) { if (event.getEntityId().getEntityType() == EntityType.TENANT) { if (event.getEvent() == ComponentLifecycleEvent.DELETED) { - entityProfileCache.removeTenant(event.getTenantId()); - Set partitions = stateService.getPartitions(); if (CollectionUtils.isEmpty(partitions)) { return; @@ -240,14 +234,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa .filter(tpi -> tpi.getTenantId().isPresent() && tpi.getTenantId().get().equals(event.getTenantId())) .collect(Collectors.toSet())); } - } else if (event.getEntityId().getEntityType() == EntityType.ASSET_PROFILE) { - if (event.getEvent() == ComponentLifecycleEvent.DELETED) { - entityProfileCache.evictProfile(event.getTenantId(), event.getEntityId()); - } - } else if (event.getEntityId().getEntityType() == EntityType.DEVICE_PROFILE) { - if (event.getEvent() == ComponentLifecycleEvent.DELETED) { - entityProfileCache.evictProfile(event.getTenantId(), event.getEntityId()); - } } } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/asset/AssetService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/asset/AssetService.java index 09bc8f1f93..c22c9c4140 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/asset/AssetService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/asset/AssetService.java @@ -66,6 +66,8 @@ public interface AssetService extends EntityDaoService { PageData findProfileEntityIdInfos(PageLink pageLink); + PageData findProfileEntityIdInfosByTenantId(TenantId tenantId, PageLink pageLink); + PageData findAssetIdsByTenantIdAndAssetProfileId(TenantId tenantId, AssetProfileId assetProfileId, PageLink pageLink); ListenableFuture> findAssetsByTenantIdAndIdsAsync(TenantId tenantId, List assetIds); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java index 8507ebbd42..b6f43cda67 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java @@ -39,7 +39,7 @@ public interface CalculatedFieldService extends EntityDaoService { PageData findAllCalculatedFields(PageLink pageLink); - List findCalculatedFieldsByTenantId(TenantId tenantId); + PageData findCalculatedFieldsByTenantId(TenantId tenantId, PageLink pageLink); PageData findAllCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink); @@ -57,6 +57,8 @@ public interface CalculatedFieldService extends EntityDaoService { List findAllCalculatedFieldLinksByTenantId(TenantId tenantId); + PageData findAllCalculatedFieldLinksByTenantId(TenantId tenantId, PageLink pageLink); + PageData findAllCalculatedFieldLinks(PageLink pageLink); boolean referencedInAnyCalculatedField(TenantId tenantId, EntityId referencedEntityId); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java index 4ef653855d..9eb258f182 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java @@ -76,6 +76,8 @@ public interface DeviceService extends EntityDaoService { PageData findProfileEntityIdInfos(PageLink pageLink); + PageData findProfileEntityIdInfosByTenantId(TenantId tenantId, PageLink pageLink); + PageData findDevicesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink); PageData findDeviceIdsByTenantIdAndDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId, PageLink pageLink); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index 178caf7961..bfbd5fb0d6 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -136,6 +136,8 @@ public enum MsgType { EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG, + CF_ACTOR_INIT_MSG, // Sent to init caches for CF actor; + CF_PROFILE_ENTITY_MSG, // Sent to init profile entities cache; CF_INIT_MSG, // Sent to init particular calculated field; CF_LINK_INIT_MSG, // Sent to init particular calculated field; CF_STATE_RESTORE_MSG, // Sent to restore particular calculated field entity state; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldActorInitMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldActorInitMsg.java new file mode 100644 index 0000000000..851c7709bb --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldActorInitMsg.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.cf; + +import lombok.Data; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; + +@Data +public class CalculatedFieldActorInitMsg implements ToCalculatedFieldSystemMsg { + + private final TenantId tenantId; + + @Override + public MsgType getMsgType() { + return MsgType.CF_ACTOR_INIT_MSG; + } + +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cf/ProfileEntityMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/ProfileEntityMsg.java new file mode 100644 index 0000000000..81196c150e --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/ProfileEntityMsg.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.cf; + +import lombok.Data; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; + +@Data +public class ProfileEntityMsg implements ToCalculatedFieldSystemMsg { + + private final TenantId tenantId; + private final EntityId profileEntityId; + private final EntityId entityId; + + @Override + public MsgType getMsgType() { + return MsgType.CF_PROFILE_ENTITY_MSG; + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 1c46a624c3..87a9d6a697 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -579,7 +579,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi .readFromBeginning(true) .stopWhenRead(true) .clientId("monolith-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()) - .groupId(topicService.buildTopicName("monolith-calculated-field-state-consumer")) + .groupId(null) // not using consumer group management .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), msg.getData() != null ? CalculatedFieldStateProto.parseFrom(msg.getData()) : null, msg.getHeaders())) .admin(cfStateAdmin) .statsService(consumerStatsService) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 112af9f646..fdf2b2f3f7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -379,7 +379,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { .readFromBeginning(true) .stopWhenRead(true) .clientId("tb-rule-engine-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()) - .groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-state-consumer")) + .groupId(null) // not using consumer group management .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), msg.getData() != null ? CalculatedFieldStateProto.parseFrom(msg.getData()) : null, msg.getHeaders())) .admin(cfStateAdmin) .statsService(consumerStatsService) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java index 46c29b867b..6a3aaa4a7f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java @@ -36,10 +36,11 @@ public @interface AfterStartUp { int STARTUP_SERVICE = 8; int ACTOR_SYSTEM = 9; - int REGULAR_SERVICE = 10; int CF_READ_PROFILE_ENTITIES_SERVICE = 10; - int CF_READ_CF_SERVICE = 11; + int CF_READ_CF_SERVICE = 10; + + int REGULAR_SERVICE = 11; int BEFORE_TRANSPORT_SERVICE = Integer.MAX_VALUE - 1001; int TRANSPORT_SERVICE = Integer.MAX_VALUE - 1000; diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/AssetDao.java b/dao/src/main/java/org/thingsboard/server/dao/asset/AssetDao.java index 36700ff59f..098dc4e83d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/asset/AssetDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/asset/AssetDao.java @@ -240,4 +240,6 @@ public interface AssetDao extends Dao, TenantEntityDao, Exportable PageData findProfileEntityIdInfos(PageLink pageLink); + PageData findProfileEntityIdInfosByTenantId(UUID tenantId, PageLink pageLink); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java index 21d5ea7f4e..fed201d403 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java @@ -293,6 +293,14 @@ public class BaseAssetService extends AbstractCachedEntityService findProfileEntityIdInfosByTenantId(TenantId tenantId, PageLink pageLink) { + log.trace("Executing findProfileEntityIdInfosByTenantId, tenantId[{}], pageLink [{}]", tenantId, pageLink); + validateId(tenantId, id -> INCORRECT_TENANT_ID + id); + validatePageLink(pageLink); + return assetDao.findProfileEntityIdInfosByTenantId(tenantId.getId(), pageLink); + } + @Override public PageData findAssetIdsByTenantIdAndAssetProfileId(TenantId tenantId, AssetProfileId assetProfileId, PageLink pageLink) { log.trace("Executing findAssetIdsByTenantIdAndAssetProfileId, tenantId [{}], assetProfileId [{}]", tenantId, assetProfileId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java index 464041869e..d3fea73849 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java @@ -105,10 +105,11 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements } @Override - public List findCalculatedFieldsByTenantId(TenantId tenantId) { - log.trace("Executing findAllByTenantId, tenantId [{}]", tenantId); + public PageData findCalculatedFieldsByTenantId(TenantId tenantId, PageLink pageLink) { + log.trace("Executing findAllByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink); validateId(tenantId, id -> INCORRECT_TENANT_ID + id); - return calculatedFieldDao.findAllByTenantId(tenantId); + validatePageLink(pageLink); + return calculatedFieldDao.findAllByTenantId(tenantId, pageLink); } @Override @@ -187,6 +188,14 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements return calculatedFieldLinkDao.findCalculatedFieldLinksByTenantId(tenantId); } + @Override + public PageData findAllCalculatedFieldLinksByTenantId(TenantId tenantId, PageLink pageLink) { + log.trace("Executing findAllCalculatedFieldLinksByTenantId, tenantId[{}] pageLink [{}]", tenantId, pageLink); + validateId(tenantId, id -> INCORRECT_TENANT_ID + id); + validatePageLink(pageLink); + return calculatedFieldLinkDao.findAllByTenantId(tenantId, pageLink); + } + @Override public PageData findAllCalculatedFieldLinks(PageLink pageLink) { log.trace("Executing findAllCalculatedFieldLinks, pageLink [{}]", pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldDao.java b/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldDao.java index a966977968..aadae93893 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldDao.java @@ -37,6 +37,8 @@ public interface CalculatedFieldDao extends Dao { PageData findAll(PageLink pageLink); + PageData findAllByTenantId(TenantId tenantId, PageLink pageLink); + PageData findAllByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink); List removeAllByEntityId(TenantId tenantId, EntityId entityId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldLinkDao.java b/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldLinkDao.java index cf168a9b3d..dd184289ed 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldLinkDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldLinkDao.java @@ -37,4 +37,6 @@ public interface CalculatedFieldLinkDao extends Dao { PageData findAll(PageLink pageLink); + PageData findAllByTenantId(TenantId tenantId, PageLink pageLink); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java index efc57119eb..6bc8903d20 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java @@ -23,9 +23,7 @@ import org.thingsboard.server.common.data.DeviceInfoFilter; import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.ProfileEntityIdInfo; -import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.page.PageData; @@ -233,6 +231,8 @@ public interface DeviceDao extends Dao, TenantEntityDao, Exporta PageData findProfileEntityIdInfos(PageLink pageLink); + PageData findProfileEntityIdInfosByTenantId(UUID tenantId, PageLink pageLink); + PageData findDeviceInfosByFilter(DeviceInfoFilter filter, PageLink pageLink); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java index 2c890082a0..6d993f3e3d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java @@ -394,6 +394,14 @@ public class DeviceServiceImpl extends CachedVersionedEntityService findProfileEntityIdInfosByTenantId(TenantId tenantId, PageLink pageLink) { + log.trace("Executing findProfileEntityIdInfosByTenantId, tenantId[{}], pageLink [{}]", tenantId, pageLink); + validateId(tenantId, id -> INCORRECT_TENANT_ID + id); + validatePageLink(pageLink); + return deviceDao.findProfileEntityIdInfosByTenantId(tenantId.getId(), pageLink); + } + @Override public PageData findDevicesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink) { log.trace("Executing findDevicesByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/asset/JpaAssetDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/asset/JpaAssetDao.java index c61d894de5..4e99fb57e4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/asset/JpaAssetDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/asset/JpaAssetDao.java @@ -39,14 +39,12 @@ import org.thingsboard.server.dao.model.sql.AssetEntity; import org.thingsboard.server.dao.model.sql.AssetInfoEntity; import org.thingsboard.server.dao.sql.JpaAbstractDao; import org.thingsboard.server.dao.sql.device.NativeAssetRepository; -import org.thingsboard.server.dao.sql.device.NativeDeviceRepository; import org.thingsboard.server.dao.util.SqlDao; import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; import static org.thingsboard.server.dao.DaoUtil.convertTenantEntityInfosToDto; @@ -262,10 +260,16 @@ public class JpaAssetDao extends JpaAbstractDao implements A @Override public PageData findProfileEntityIdInfos(PageLink pageLink) { - log.debug("Find profile device id infos by pageLink [{}]", pageLink); + log.debug("Find profile asset id infos by pageLink [{}]", pageLink); return nativeAssetRepository.findProfileEntityIdInfos(DaoUtil.toPageable(pageLink)); } + @Override + public PageData findProfileEntityIdInfosByTenantId(UUID tenantId, PageLink pageLink) { + log.debug("Find profile asset id infos by pageLink [{}]", pageLink); + return nativeAssetRepository.findProfileEntityIdInfosByTenantId(tenantId, DaoUtil.toPageable(pageLink)); + } + @Override public Long countByTenantId(TenantId tenantId) { return assetRepository.countByTenantId(tenantId.getId()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldLinkRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldLinkRepository.java index aeb8e1b04c..6f6a0775f3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldLinkRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldLinkRepository.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.dao.sql.cf; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; import org.thingsboard.server.dao.model.sql.CalculatedFieldLinkEntity; @@ -29,4 +31,6 @@ public interface CalculatedFieldLinkRepository extends JpaRepository findAllByTenantId(UUID tenantId); + Page findAllByTenantId(UUID tenantId, Pageable pageable); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java index 0f48f3b00d..be122816ba 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java @@ -32,6 +32,8 @@ public interface CalculatedFieldRepository extends JpaRepository findAllByTenantIdAndEntityId(UUID tenantId, UUID entityId); + Page findAllByTenantId(UUID tenantId, Pageable pageable); + Page findAllByTenantIdAndEntityId(UUID tenantId, UUID entityId, Pageable pageable); List findAllByTenantId(UUID tenantId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java index 8922eaca4e..4bb52c29db 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java @@ -71,6 +71,12 @@ public class JpaCalculatedFieldDao extends JpaAbstractDao findAllByTenantId(TenantId tenantId, PageLink pageLink) { + log.debug("Try to find calculated fields by tenantId[{}] and pageLink [{}]", tenantId, pageLink); + return DaoUtil.toPageData(calculatedFieldRepository.findAllByTenantId(tenantId.getId(), DaoUtil.toPageable(pageLink))); + } + @Override public PageData findAllByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink) { log.debug("Try to find calculated fields by entityId[{}] and pageLink [{}]", entityId, pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldLinkDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldLinkDao.java index 39b6e6b890..38871f2fb8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldLinkDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldLinkDao.java @@ -70,6 +70,12 @@ public class JpaCalculatedFieldLinkDao extends JpaAbstractDao findAllByTenantId(TenantId tenantId, PageLink pageLink) { + log.debug("Try to find calculated field links by tenantId [{}], pageLink [{}]", tenantId, pageLink); + return DaoUtil.toPageData(calculatedFieldLinkRepository.findAllByTenantId(tenantId.getId(), DaoUtil.toPageable(pageLink))); + } + @Override protected Class getEntityClass() { return CalculatedFieldLinkEntity.class; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DefaultNativeAssetRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DefaultNativeAssetRepository.java index 43d66e2ff0..6c2a8dc506 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DefaultNativeAssetRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DefaultNativeAssetRepository.java @@ -20,12 +20,9 @@ import org.springframework.data.domain.Pageable; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.stereotype.Repository; import org.springframework.transaction.support.TransactionTemplate; -import org.thingsboard.server.common.data.DeviceIdInfo; import org.thingsboard.server.common.data.ProfileEntityIdInfo; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; @@ -43,8 +40,8 @@ public class DefaultNativeAssetRepository extends AbstractNativeRepository imple @Override public PageData findProfileEntityIdInfos(Pageable pageable) { - String PROFILE_DEVICE_ID_INFO_QUERY = "SELECT tenant_id as tenantId, asset_profile_id as profileId, id as id FROM asset ORDER BY created_time ASC LIMIT %s OFFSET %s"; - return find(COUNT_QUERY, PROFILE_DEVICE_ID_INFO_QUERY, pageable, row -> { + String PROFILE_ASSET_ID_INFO_QUERY = "SELECT tenant_id as tenantId, asset_profile_id as profileId, id as id FROM asset ORDER BY created_time ASC LIMIT %s OFFSET %s"; + return find(COUNT_QUERY, PROFILE_ASSET_ID_INFO_QUERY, pageable, row -> { AssetId id = new AssetId((UUID) row.get("id")); AssetProfileId profileId = new AssetProfileId((UUID) row.get("profileId")); var tenantIdObj = row.get("tenantId"); @@ -52,4 +49,14 @@ public class DefaultNativeAssetRepository extends AbstractNativeRepository imple }); } + @Override + public PageData findProfileEntityIdInfosByTenantId(UUID tenantId, Pageable pageable) { + String PROFILE_ASSET_ID_INFO_QUERY = String.format("SELECT tenant_id as tenantId, asset_profile_id as profileId, id as id FROM asset WHERE tenant_id = %s ORDER BY created_time ASC LIMIT %%s OFFSET %%s", tenantId); + return find(COUNT_QUERY, PROFILE_ASSET_ID_INFO_QUERY, pageable, row -> { + AssetId id = new AssetId((UUID) row.get("id")); + AssetProfileId profileId = new AssetProfileId((UUID) row.get("profileId")); + var tenantIdObj = row.get("tenantId"); + return ProfileEntityIdInfo.create(tenantIdObj != null ? (UUID) tenantIdObj : TenantId.SYS_TENANT_ID.getId(), profileId, id); + }); + } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DefaultNativeDeviceRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DefaultNativeDeviceRepository.java index 776dedc2d5..1648bb2255 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DefaultNativeDeviceRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DefaultNativeDeviceRepository.java @@ -61,4 +61,15 @@ public class DefaultNativeDeviceRepository extends AbstractNativeRepository impl }); } + @Override + public PageData findProfileEntityIdInfosByTenantId(UUID tenantId, Pageable pageable) { + String PROFILE_DEVICE_ID_INFO_QUERY = String.format("SELECT tenant_id as tenantId, device_profile_id as profileId, id as id FROM device WHERE tenant_id = %s ORDER BY created_time ASC LIMIT %%s OFFSET %%s", tenantId); + return find(COUNT_QUERY, PROFILE_DEVICE_ID_INFO_QUERY, pageable, row -> { + DeviceId id = new DeviceId((UUID) row.get("id")); + DeviceProfileId profileId = new DeviceProfileId((UUID) row.get("profileId")); + var tenantIdObj = row.get("tenantId"); + return ProfileEntityIdInfo.create(tenantIdObj != null ? (UUID) tenantIdObj : TenantId.SYS_TENANT_ID.getId(), profileId, id); + }); + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java index 34835f52f1..9c79637e39 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java @@ -179,11 +179,11 @@ public class JpaDeviceDao extends JpaAbstractDao implement @Override public PageData findDeviceIdsByTenantIdAndDeviceProfileId(UUID tenantId, UUID deviceProfileId, PageLink pageLink) { return DaoUtil.pageToPageData( - deviceRepository.findIdsByTenantIdAndDeviceProfileId( - tenantId, - deviceProfileId, - pageLink.getTextSearch(), - DaoUtil.toPageable(pageLink))) + deviceRepository.findIdsByTenantIdAndDeviceProfileId( + tenantId, + deviceProfileId, + pageLink.getTextSearch(), + DaoUtil.toPageable(pageLink))) .mapData(DeviceId::new); } @@ -281,6 +281,12 @@ public class JpaDeviceDao extends JpaAbstractDao implement return nativeDeviceRepository.findProfileEntityIdInfos(DaoUtil.toPageable(pageLink)); } + @Override + public PageData findProfileEntityIdInfosByTenantId(UUID tenantId, PageLink pageLink) { + log.debug("Find profile device id infos by tenantId[{}], pageLink [{}]", tenantId, pageLink); + return nativeDeviceRepository.findProfileEntityIdInfosByTenantId(tenantId, DaoUtil.toPageable(pageLink)); + } + @Override public Device findByTenantIdAndExternalId(UUID tenantId, UUID externalId) { return DaoUtil.getData(deviceRepository.findByTenantIdAndExternalId(tenantId, externalId)); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/NativeProfileEntityRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/NativeProfileEntityRepository.java index 750f0c8787..3568f01851 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/NativeProfileEntityRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/NativeProfileEntityRepository.java @@ -19,8 +19,12 @@ import org.springframework.data.domain.Pageable; import org.thingsboard.server.common.data.ProfileEntityIdInfo; import org.thingsboard.server.common.data.page.PageData; +import java.util.UUID; + public interface NativeProfileEntityRepository { PageData findProfileEntityIdInfos(Pageable pageable); + PageData findProfileEntityIdInfosByTenantId(UUID tenantId, Pageable pageable); + }