|
Before Width: | Height: | Size: 15 KiB After Width: | Height: | Size: 15 KiB |
|
Before Width: | Height: | Size: 14 KiB After Width: | Height: | Size: 14 KiB |
|
Before Width: | Height: | Size: 85 KiB After Width: | Height: | Size: 85 KiB |
|
Before Width: | Height: | Size: 17 KiB After Width: | Height: | Size: 17 KiB |
|
Before Width: | Height: | Size: 30 KiB After Width: | Height: | Size: 30 KiB |
|
Before Width: | Height: | Size: 30 KiB After Width: | Height: | Size: 30 KiB |
|
Before Width: | Height: | Size: 13 KiB After Width: | Height: | Size: 13 KiB |
|
Before Width: | Height: | Size: 12 KiB After Width: | Height: | Size: 12 KiB |
|
Before Width: | Height: | Size: 30 KiB After Width: | Height: | Size: 30 KiB |
|
Before Width: | Height: | Size: 13 KiB After Width: | Height: | Size: 13 KiB |
|
Before Width: | Height: | Size: 13 KiB After Width: | Height: | Size: 13 KiB |
|
Before Width: | Height: | Size: 26 KiB After Width: | Height: | Size: 26 KiB |
|
Before Width: | Height: | Size: 49 KiB After Width: | Height: | Size: 49 KiB |
|
Before Width: | Height: | Size: 14 KiB After Width: | Height: | Size: 14 KiB |
|
Before Width: | Height: | Size: 14 KiB After Width: | Height: | Size: 14 KiB |
|
Before Width: | Height: | Size: 13 KiB After Width: | Height: | Size: 13 KiB |
|
Before Width: | Height: | Size: 68 KiB After Width: | Height: | Size: 68 KiB |
|
Before Width: | Height: | Size: 118 KiB After Width: | Height: | Size: 118 KiB |
|
Before Width: | Height: | Size: 119 KiB After Width: | Height: | Size: 119 KiB |
|
Before Width: | Height: | Size: 13 KiB After Width: | Height: | Size: 13 KiB |
|
Before Width: | Height: | Size: 6.0 KiB After Width: | Height: | Size: 6.0 KiB |
|
Before Width: | Height: | Size: 11 KiB After Width: | Height: | Size: 11 KiB |
|
Before Width: | Height: | Size: 58 KiB After Width: | Height: | Size: 58 KiB |
|
Before Width: | Height: | Size: 16 KiB After Width: | Height: | Size: 16 KiB |
|
Before Width: | Height: | Size: 48 KiB After Width: | Height: | Size: 48 KiB |
|
Before Width: | Height: | Size: 73 KiB After Width: | Height: | Size: 73 KiB |
|
Before Width: | Height: | Size: 14 KiB After Width: | Height: | Size: 14 KiB |
|
Before Width: | Height: | Size: 50 KiB After Width: | Height: | Size: 50 KiB |
|
Before Width: | Height: | Size: 27 KiB After Width: | Height: | Size: 27 KiB |
|
Before Width: | Height: | Size: 27 KiB After Width: | Height: | Size: 27 KiB |
|
Before Width: | Height: | Size: 99 KiB After Width: | Height: | Size: 99 KiB |
|
Before Width: | Height: | Size: 13 KiB After Width: | Height: | Size: 13 KiB |
|
Before Width: | Height: | Size: 45 KiB After Width: | Height: | Size: 45 KiB |
|
Before Width: | Height: | Size: 13 KiB After Width: | Height: | Size: 13 KiB |
|
Before Width: | Height: | Size: 13 KiB After Width: | Height: | Size: 13 KiB |
|
Before Width: | Height: | Size: 117 KiB After Width: | Height: | Size: 117 KiB |
|
Before Width: | Height: | Size: 118 KiB After Width: | Height: | Size: 118 KiB |
|
Before Width: | Height: | Size: 107 KiB After Width: | Height: | Size: 107 KiB |
|
Before Width: | Height: | Size: 14 KiB After Width: | Height: | Size: 14 KiB |
|
Before Width: | Height: | Size: 15 KiB After Width: | Height: | Size: 15 KiB |
|
Before Width: | Height: | Size: 15 KiB After Width: | Height: | Size: 15 KiB |
|
Before Width: | Height: | Size: 50 KiB After Width: | Height: | Size: 50 KiB |
|
Before Width: | Height: | Size: 21 KiB After Width: | Height: | Size: 21 KiB |
|
Before Width: | Height: | Size: 19 KiB After Width: | Height: | Size: 19 KiB |
|
Before Width: | Height: | Size: 15 KiB After Width: | Height: | Size: 15 KiB |
@ -1,67 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.cf; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.ProfileEntityIdInfo; |
|||
import org.thingsboard.server.common.data.page.PageDataIterable; |
|||
import org.thingsboard.server.dao.asset.AssetService; |
|||
import org.thingsboard.server.dao.device.DeviceService; |
|||
import org.thingsboard.server.queue.util.AfterStartUp; |
|||
import org.thingsboard.server.queue.util.TbRuleEngineComponent; |
|||
import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache; |
|||
|
|||
@Slf4j |
|||
@Service |
|||
@TbRuleEngineComponent |
|||
@RequiredArgsConstructor |
|||
public class DefaultCalculatedFieldInitService implements CalculatedFieldInitService { |
|||
|
|||
private final CalculatedFieldEntityProfileCache entityProfileCache; |
|||
private final AssetService assetService; |
|||
private final DeviceService deviceService; |
|||
|
|||
@Value("${calculated_fields.init_fetch_pack_size:50000}") |
|||
@Getter |
|||
private int initFetchPackSize; |
|||
|
|||
@AfterStartUp(order = AfterStartUp.CF_READ_PROFILE_ENTITIES_SERVICE) |
|||
public void initCalculatedFieldDefinitions() { |
|||
PageDataIterable<ProfileEntityIdInfo> deviceIdInfos = new PageDataIterable<>(deviceService::findProfileEntityIdInfos, initFetchPackSize); |
|||
for (ProfileEntityIdInfo idInfo : deviceIdInfos) { |
|||
log.trace("Processing device record: {}", idInfo); |
|||
try { |
|||
entityProfileCache.add(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId()); |
|||
} catch (Exception e) { |
|||
log.error("Failed to process device record: {}", idInfo, e); |
|||
} |
|||
} |
|||
PageDataIterable<ProfileEntityIdInfo> assetIdInfos = new PageDataIterable<>(assetService::findProfileEntityIdInfos, initFetchPackSize); |
|||
for (ProfileEntityIdInfo idInfo : assetIdInfos) { |
|||
log.trace("Processing asset record: {}", idInfo); |
|||
try { |
|||
entityProfileCache.add(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId()); |
|||
} catch (Exception e) { |
|||
log.error("Failed to process asset record: {}", idInfo, e); |
|||
} |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -1,95 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.cf.cache; |
|||
|
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.DataConstants; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.queue.ServiceType; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.queue.discovery.PartitionService; |
|||
import org.thingsboard.server.queue.discovery.QueueKey; |
|||
import org.thingsboard.server.queue.discovery.TbApplicationEventListener; |
|||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; |
|||
import org.thingsboard.server.queue.util.TbRuleEngineComponent; |
|||
|
|||
import java.util.Collection; |
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.concurrent.ConcurrentMap; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@TbRuleEngineComponent |
|||
@Service |
|||
@Slf4j |
|||
@RequiredArgsConstructor |
|||
//TODO ashvayka: remove and use TenantEntityProfileCache in each CalculatedFieldManagerMessageProcessor;
|
|||
public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEventListener<PartitionChangeEvent> implements CalculatedFieldEntityProfileCache { |
|||
|
|||
private static final Integer UNKNOWN = 0; |
|||
private final ConcurrentMap<TenantId, TenantEntityProfileCache> tenantCache = new ConcurrentHashMap<>(); |
|||
private final PartitionService partitionService; |
|||
private volatile List<Integer> myPartitions = Collections.emptyList(); |
|||
|
|||
@Override |
|||
protected void onTbApplicationEvent(PartitionChangeEvent event) { |
|||
myPartitions = event.getCfPartitions().stream() |
|||
.filter(TopicPartitionInfo::isMyPartition) |
|||
.map(tpi -> tpi.getPartition().orElse(UNKNOWN)).collect(Collectors.toList()); |
|||
//Naive approach that need to be improved.
|
|||
tenantCache.values().forEach(cache -> cache.setMyPartitions(myPartitions)); |
|||
} |
|||
|
|||
@Override |
|||
public void add(TenantId tenantId, EntityId profileId, EntityId entityId) { |
|||
var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); |
|||
var partition = tpi.getPartition().orElse(UNKNOWN); |
|||
tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()) |
|||
.add(profileId, entityId, partition, tpi.isMyPartition()); |
|||
} |
|||
|
|||
@Override |
|||
public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) { |
|||
var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); |
|||
var partition = tpi.getPartition().orElse(UNKNOWN); |
|||
var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()); |
|||
//TODO: make this method atomic;
|
|||
cache.remove(oldProfileId, entityId); |
|||
cache.add(newProfileId, entityId, partition, tpi.isMyPartition()); |
|||
} |
|||
|
|||
@Override |
|||
public void evict(TenantId tenantId, EntityId entityId) { |
|||
var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()); |
|||
cache.removeEntityId(entityId); |
|||
} |
|||
|
|||
@Override |
|||
public Collection<EntityId> getMyEntityIdsByProfileId(TenantId tenantId, EntityId profileId) { |
|||
return tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).getMyEntityIdsByProfileId(profileId); |
|||
} |
|||
|
|||
@Override |
|||
public int getEntityIdPartition(TenantId tenantId, EntityId entityId) { |
|||
var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); |
|||
return tpi.getPartition().orElse(UNKNOWN); |
|||
} |
|||
|
|||
} |
|||