diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index c714ed7dab..c787fc26af 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -106,9 +106,9 @@ import org.thingsboard.server.queue.discovery.DiscoveryService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; -import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; +import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache; -import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService; +import org.thingsboard.server.service.cf.CalculatedFieldStateService; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.edge.rpc.EdgeRpcService; @@ -526,7 +526,7 @@ public class ActorSystemContext { @Lazy @Autowired(required = false) @Getter - private CalculatedFieldExecutionService calculatedFieldExecutionService; + private CalculatedFieldProcessingService calculatedFieldProcessingService; @Lazy @Autowired(required = false) diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 8a6907c107..8c70eaf617 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -88,6 +88,7 @@ public class AppActor extends ContextAwareActor { case APP_INIT_MSG: break; case PARTITION_CHANGE_MSG: + case CF_PARTITIONS_CHANGE_MSG: ctx.broadcastToChildren(msg, true); break; case COMPONENT_LIFE_CYCLE_MSG: diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java index cebe6b6a60..5e4aded41f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java @@ -23,6 +23,7 @@ import org.thingsboard.server.actors.service.ContextAwareActor; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; @Slf4j public class CalculatedFieldEntityActor extends ContextAwareActor { @@ -50,6 +51,9 @@ public class CalculatedFieldEntityActor extends ContextAwareActor { @Override protected boolean doProcess(TbActorMsg msg) { switch (msg.getMsgType()) { + case CF_PARTITIONS_CHANGE_MSG: + processor.process((CalculatedFieldPartitionChangeMsg) msg); + break; case CF_STATE_RESTORE_MSG: processor.process((CalculatedFieldStateRestoreMsg) msg); break; diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 00913ec66d..61f7abb297 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -30,15 +30,16 @@ import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; -import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; +import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.CalculatedFieldResult; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; -import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService; +import org.thingsboard.server.service.cf.CalculatedFieldStateService; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; @@ -67,8 +68,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM final TenantId tenantId; final EntityId entityId; - final CalculatedFieldExecutionService cfService; + final CalculatedFieldProcessingService cfService; final CalculatedFieldStateService cfStateService; + final int partition; TbActorCtx ctx; Map states = new HashMap<>(); @@ -77,14 +79,22 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM super(systemContext); this.tenantId = tenantId; this.entityId = entityId; - this.cfService = systemContext.getCalculatedFieldExecutionService(); + this.cfService = systemContext.getCalculatedFieldProcessingService(); this.cfStateService = systemContext.getCalculatedFieldStateService(); + this.partition = systemContext.getCalculatedFieldEntityProfileCache().getEntityIdPartition(tenantId, entityId); } void init(TbActorCtx ctx) { this.ctx = ctx; } + public void process(CalculatedFieldPartitionChangeMsg msg) { + if (!msg.getPartitions()[partition]) { + log.info("[{}][{}] Stopping entity actor due to change partition event.", partition, entityId); + ctx.stop(ctx.getSelf()); + } + } + public void process(CalculatedFieldStateRestoreMsg msg) { log.info("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), msg.getId().cfId()); states.put(msg.getId().cfId(), msg.getState()); @@ -202,7 +212,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (state != null) { return state; } else { - ListenableFuture stateFuture = systemContext.getCalculatedFieldExecutionService().fetchStateFromDb(ctx, entityId); + ListenableFuture stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId); // Ugly but necessary. We do not expect to often fetch data from DB. Only once per pair lifetime. // This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low. // Alternatively, we can fetch the state outside the actor system and push separate command to create this actor, diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java index 22909dd5af..497602c93b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; /** * Created by ashvayka on 15.03.18. @@ -55,8 +56,8 @@ public class CalculatedFieldManagerActor extends ContextAwareActor { @Override protected boolean doProcess(TbActorMsg msg) { switch (msg.getMsgType()) { - case PARTITION_CHANGE_MSG: - ctx.broadcastToChildren(msg, true); // TODO + case CF_PARTITIONS_CHANGE_MSG: + processor.onPartitionChange((CalculatedFieldPartitionChangeMsg) msg); break; case CF_INIT_MSG: processor.onFieldInitMsg((CalculatedFieldInitMsg) msg); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index cdc31ed93c..22492f3c4f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -35,14 +35,15 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.dao.cf.CalculatedFieldService; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; -import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; +import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; -import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService; +import org.thingsboard.server.service.cf.CalculatedFieldStateService; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; @@ -52,6 +53,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -68,20 +70,20 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private final Map> entityIdCalculatedFields = new ConcurrentHashMap<>(); private final ConcurrentMap> entityIdCalculatedFieldLinks = new ConcurrentHashMap<>(); - private final CalculatedFieldExecutionService cfExecService; + private final CalculatedFieldProcessingService cfExecService; private final CalculatedFieldStateService cfStateService; private final CalculatedFieldEntityProfileCache cfEntityCache; private final CalculatedFieldService cfDaoService; private final TbAssetProfileCache assetProfileCache; private final TbDeviceProfileCache deviceProfileCache; + protected final TenantId tenantId; protected TbActorCtx ctx; - final TenantId tenantId; CalculatedFieldManagerMessageProcessor(ActorSystemContext systemContext, TenantId tenantId) { super(systemContext); this.cfEntityCache = systemContext.getCalculatedFieldEntityProfileCache(); - this.cfExecService = systemContext.getCalculatedFieldExecutionService(); + this.cfExecService = systemContext.getCalculatedFieldProcessingService(); this.cfStateService = systemContext.getCalculatedFieldStateService(); this.cfDaoService = systemContext.getCalculatedFieldService(); this.assetProfileCache = systemContext.getAssetProfileCache(); @@ -478,4 +480,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware oldLinks.forEach(link -> entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new ArrayList<>()).remove(link)); } + public void onPartitionChange(CalculatedFieldPartitionChangeMsg msg) { + ctx.broadcastToChildren(msg, true); + } } diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 1b008f462a..16ea2c1398 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -170,6 +170,7 @@ public class TenantActor extends RuleChainManagerActor { case CF_INIT_MSG: case CF_LINK_INIT_MSG: case CF_STATE_RESTORE_MSG: + case CF_PARTITIONS_CHANGE_MSG: case CF_ENTITY_LIFECYCLE_MSG: onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, true); break; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java similarity index 70% rename from application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java rename to application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java index 47fd560b4d..24b428593a 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java @@ -15,15 +15,11 @@ */ package org.thingsboard.server.service.cf; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.rule.engine.api.AttributesSaveRequest; -import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; @@ -31,17 +27,7 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import java.util.List; -public interface CalculatedFieldExecutionService { - - /** - * Filter CFs based on the request entity. Push to the queue if any matching CF exist; - * - * @param request - telemetry save request; - * @param callback - */ - void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback); - - void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback); +public interface CalculatedFieldProcessingService { ListenableFuture fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java new file mode 100644 index 0000000000..b84b54af81 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf; + +import com.google.common.util.concurrent.FutureCallback; +import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; + +import java.util.List; + +public interface CalculatedFieldQueueService { + + /** + * Filter CFs based on the request entity. Push to the queue if any matching CF exist; + * + * @param request - telemetry save request; + * @param callback + */ + void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback); + + void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java similarity index 90% rename from application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java rename to application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java index ce1562c735..37211c66c5 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java @@ -13,9 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.cf.ctx; +package org.thingsboard.server.service.cf; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java index 2afd6d8238..f8eb7e852e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java @@ -20,8 +20,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import org.thingsboard.server.actors.ActorSystemContext; -import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg; import org.thingsboard.server.common.data.ProfileEntityIdInfo; import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.dao.asset.AssetService; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java similarity index 55% rename from application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java rename to application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index e84e785a94..6b47053f2d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -17,7 +17,6 @@ package org.thingsboard.server.service.cf; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -32,25 +31,17 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; -import org.thingsboard.rule.engine.api.AttributesSaveRequest; -import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; import org.thingsboard.server.actors.calculatedField.MultipleTbCallback; import org.thingsboard.server.cluster.TbClusterService; -import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; -import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.OutputType; -import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CalculatedFieldId; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; @@ -59,7 +50,6 @@ import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.StringDataEntry; -import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; @@ -67,12 +57,9 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.usagerecord.ApiLimitService; -import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; -import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; @@ -80,12 +67,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinke import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; -import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; -import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtx; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.util.TbRuleEngineComponent; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; -import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; @@ -93,149 +79,51 @@ import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry; -import org.thingsboard.server.service.cf.telemetry.CalculatedFieldAttributeUpdateRequest; -import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTelemetryUpdateRequest; -import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTimeSeriesUpdateRequest; -import org.thingsboard.server.service.partition.AbstractPartitionBasedService; -import org.thingsboard.server.service.profile.TbAssetProfileCache; -import org.thingsboard.server.service.profile.TbDeviceProfileCache; import java.util.ArrayList; -import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; -import java.util.function.Consumer; -import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.DataConstants.SCOPE; -import static org.thingsboard.server.common.util.ProtoUtils.toTsKvProto; import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY; +@TbRuleEngineComponent @Service @Slf4j @RequiredArgsConstructor -public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBasedService implements CalculatedFieldExecutionService { +public class DefaultCalculatedFieldProcessingService implements CalculatedFieldProcessingService { - public static final TbQueueCallback DUMMY_TB_QUEUE_CALLBACK = new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - } - - @Override - public void onFailure(Throwable t) { - } - }; - - private final TbAssetProfileCache assetProfileCache; - private final TbDeviceProfileCache deviceProfileCache; - private final CalculatedFieldCache calculatedFieldCache; private final AttributesService attributesService; private final TimeseriesService timeseriesService; private final TbClusterService clusterService; private final ApiLimitService apiLimitService; + private final PartitionService partitionService; - private ListeningExecutorService calculatedFieldExecutor; private ListeningExecutorService calculatedFieldCallbackExecutor; - private final ConcurrentMap states = new ConcurrentHashMap<>(); - - private static final Set supportedReferencedEntities = EnumSet.of( - EntityType.DEVICE, EntityType.ASSET, EntityType.CUSTOMER, EntityType.TENANT - ); - @Value("${calculatedField.initFetchPackSize:50000}") @Getter private int initFetchPackSize; @PostConstruct public void init() { - super.init(); - calculatedFieldExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool( - Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field")); calculatedFieldCallbackExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool( Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field-callback")); } @PreDestroy public void stop() { - super.stop(); - if (calculatedFieldExecutor != null) { - calculatedFieldExecutor.shutdownNow(); - } if (calculatedFieldCallbackExecutor != null) { calculatedFieldCallbackExecutor.shutdownNow(); } } - @Override - protected String getServiceName() { - return "Calculated Field Execution"; - } - - @Override - protected String getSchedulerExecutorName() { - return "calculated-field-scheduled"; - } - - @Override - public void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback) { - var tenantId = request.getTenantId(); - var entityId = request.getEntityId(); - //TODO: 1. check that request entity has calculated fields for entity or profile. If yes - push to corresponding partitions; - //TODO: 2. check that request entity has calculated field links. If yes - push to corresponding partitions; - //TODO: in 1 and 2 we should do the check as quick as possible. Should we also check the field/link keys?; - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()), - () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); - } - - @Override - public void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback) { - var tenantId = request.getTenantId(); - var entityId = request.getEntityId(); - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()), - () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); - } - - private void checkEntityAndPushToQueue(TenantId tenantId, EntityId entityId, - Predicate mainEntityFilter, Predicate linkedEntityFilter, - Supplier msg, FutureCallback callback) { - boolean send = checkEntityForCalculatedFields(tenantId, entityId, mainEntityFilter, linkedEntityFilter); - if (send) { - clusterService.pushMsgToCalculatedFields(tenantId, entityId, msg.get(), wrap(callback)); - } else { - if (callback != null) { - callback.onSuccess(null); - } - } - } - - private boolean checkEntityForCalculatedFields(TenantId tenantId, EntityId entityId, Predicate filter, Predicate linkedEntityFilter) { - boolean send = false; - if (supportedReferencedEntities.contains(entityId.getEntityType())) { - send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId).stream().anyMatch(filter); - if (!send) { - send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(getProfileId(tenantId, entityId)).stream().anyMatch(filter); - } - if (!send) { - send = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId).stream() - .map(CalculatedFieldLink::getCalculatedFieldId) - .map(calculatedFieldCache::getCalculatedFieldCtx) - .anyMatch(linkedEntityFilter); - } - } - return send; - } - @Override public ListenableFuture fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId) { Map> argFutures = new HashMap<>(); @@ -262,21 +150,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }, calculatedFieldCallbackExecutor); } - @Override - protected Map>> onAddedPartitions(Set addedPartitions) { - var result = new HashMap>>(); - return result; - } - - @Override - protected void cleanupEntityOnPartitionRemoval(CalculatedFieldId entityId) { - cleanupEntity(entityId); - } - - private void cleanupEntity(CalculatedFieldId calculatedFieldId) { - states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId)); - } - @Override public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculatedFieldResult, List cfIds, TbCallback callback) { try { @@ -369,30 +242,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas .build(); } - private ListenableFuture fetchArguments(TenantId tenantId, EntityId entityId, Map necessaryArguments, Consumer> onComplete) { - Map argumentValues = new HashMap<>(); - List> futures = new ArrayList<>(); - necessaryArguments.forEach((key, argument) -> { - futures.add(Futures.transform(fetchArgumentValue(tenantId, entityId, argument), - result -> { - argumentValues.put(key, result); - return result; - }, calculatedFieldCallbackExecutor)); - }); - return Futures.transform(Futures.allAsList(futures), results -> { - onComplete.accept(argumentValues); - return null; - }, calculatedFieldCallbackExecutor); - } - - private ListenableFuture fetchArgumentValue(TenantId tenantId, EntityId targetEntityId, Argument argument) { - EntityId argumentEntityId = argument.getRefEntityId(); - EntityId entityId = (argumentEntityId == null || isProfileEntity(argumentEntityId)) - ? targetEntityId - : argumentEntityId; - return fetchKvEntry(tenantId, entityId, argument); - } - private ListenableFuture fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) { return switch (argument.getRefEntityKey().getType()) { case TS_ROLLING -> fetchTsRolling(tenantId, entityId, argument); @@ -462,78 +311,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }; } - private boolean isProfileEntity(EntityId entityId) { - return EntityType.DEVICE_PROFILE.equals(entityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(entityId.getEntityType()); - } - - private EntityId getProfileId(TenantId tenantId, EntityId entityId) { - return switch (entityId.getEntityType()) { - case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId(); - case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId(); - default -> null; - }; - } - - private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) { - ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); - - CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); - List entries = request.getEntries(); - List versions = result.getVersions(); - for (int i = 0; i < entries.size(); i++) { - long tsVersion = versions.get(i); - TsKvProto tsProto = toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build(); - telemetryMsg.addTsData(tsProto); - } - msg.setTelemetryMsg(telemetryMsg.build()); - - return msg.build(); - } - - private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List versions) { - ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); - - CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); - telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name())); - List entries = request.getEntries(); - for (int i = 0; i < entries.size(); i++) { - long attrVersion = versions.get(i); - AttributeValueProto attrProto = ProtoUtils.toProto(entries.get(i)).toBuilder().setVersion(attrVersion).build(); - telemetryMsg.addAttrData(attrProto); - } - msg.setTelemetryMsg(telemetryMsg.build()); - - return msg.build(); - } - - private CalculatedFieldTelemetryMsgProto.Builder buildTelemetryMsgProto(TenantId tenantId, EntityId entityId, List calculatedFieldIds, UUID tbMsgId, TbMsgType tbMsgType) { - CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = CalculatedFieldTelemetryMsgProto.newBuilder(); - - telemetryMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); - telemetryMsg.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); - - telemetryMsg.setEntityType(entityId.getEntityType().name()); - telemetryMsg.setEntityIdMSB(entityId.getId().getMostSignificantBits()); - telemetryMsg.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); - - if (calculatedFieldIds != null) { - for (CalculatedFieldId cfId : calculatedFieldIds) { - telemetryMsg.addPreviousCalculatedFields(toProto(cfId)); - } - } - - if (tbMsgId != null) { - telemetryMsg.setTbMsgIdMSB(tbMsgId.getMostSignificantBits()); - telemetryMsg.setTbMsgIdLSB(tbMsgId.getLeastSignificantBits()); - } - - if (tbMsgType != null) { - telemetryMsg.setTbMsgType(tbMsgType.name()); - } - - return telemetryMsg; - } - private CalculatedFieldIdProto toProto(CalculatedFieldId cfId) { return CalculatedFieldIdProto.newBuilder() .setCalculatedFieldIdMSB(cfId.getId().getMostSignificantBits()) @@ -541,60 +318,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas .build(); } - private CalculatedFieldTelemetryUpdateRequest fromProto(CalculatedFieldTelemetryMsgProto proto) { - TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); - EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); - - if (!proto.getTsDataList().isEmpty()) { - List updatedTelemetry = proto.getTsDataList().stream() - .map(ProtoUtils::fromProto) - .toList(); - return new CalculatedFieldTimeSeriesUpdateRequest( - tenantId, entityId, updatedTelemetry, - proto.getPreviousCalculatedFieldsList().stream() - .map(cfIdProto -> new CalculatedFieldId( - new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB()))) - .toList()); - } else { - AttributeScope scope = AttributeScope.valueOf(proto.getScope().name()); - List updatedTelemetry = proto.getAttrDataList().stream() - .map(ProtoUtils::fromProto) - .toList(); - return new CalculatedFieldAttributeUpdateRequest( - tenantId, entityId, scope, updatedTelemetry, - proto.getPreviousCalculatedFieldsList().stream() - .map(cfIdProto -> new CalculatedFieldId( - new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB()))) - .toList()); - } - } - - private static TbQueueCallback wrap(FutureCallback callback) { - if (callback != null) { - return new FutureCallbackWrapper(callback); - } else { - return DUMMY_TB_QUEUE_CALLBACK; - } - } - - private static class FutureCallbackWrapper implements TbQueueCallback { - private final FutureCallback callback; - - public FutureCallbackWrapper(FutureCallback callback) { - this.callback = callback; - } - - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - callback.onSuccess(null); - } - - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); - } - } - private static class TbCallbackWrapper implements TbQueueCallback { private final TbCallback callback; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java new file mode 100644 index 0000000000..9d1f9b6db5 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -0,0 +1,238 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf; + +import com.google.common.util.concurrent.FutureCallback; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.cf.CalculatedFieldLink; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.util.ProtoUtils; +import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; +import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; +import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueMsgMetadata; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; +import org.thingsboard.server.service.profile.TbAssetProfileCache; +import org.thingsboard.server.service.profile.TbDeviceProfileCache; + +import java.util.EnumSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import static org.thingsboard.server.common.util.ProtoUtils.toTsKvProto; + +@Service +@Slf4j +@RequiredArgsConstructor +public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueService { + + public static final TbQueueCallback DUMMY_TB_QUEUE_CALLBACK = new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + } + + @Override + public void onFailure(Throwable t) { + } + }; + + private final TbAssetProfileCache assetProfileCache; + private final TbDeviceProfileCache deviceProfileCache; + private final CalculatedFieldCache calculatedFieldCache; + private final TbClusterService clusterService; + + private static final Set supportedReferencedEntities = EnumSet.of( + EntityType.DEVICE, EntityType.ASSET, EntityType.CUSTOMER, EntityType.TENANT + ); + + @Value("${calculatedField.initFetchPackSize:50000}") + @Getter + private int initFetchPackSize; + + @Override + public void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback) { + var tenantId = request.getTenantId(); + var entityId = request.getEntityId(); + //TODO: 1. check that request entity has calculated fields for entity or profile. If yes - push to corresponding partitions; + //TODO: 2. check that request entity has calculated field links. If yes - push to corresponding partitions; + //TODO: in 1 and 2 we should do the check as quick as possible. Should we also check the field/link keys?; + checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()), + () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); + } + + @Override + public void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback) { + var tenantId = request.getTenantId(); + var entityId = request.getEntityId(); + checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()), + () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); + } + + private void checkEntityAndPushToQueue(TenantId tenantId, EntityId entityId, + Predicate mainEntityFilter, Predicate linkedEntityFilter, + Supplier msg, FutureCallback callback) { + boolean send = checkEntityForCalculatedFields(tenantId, entityId, mainEntityFilter, linkedEntityFilter); + if (send) { + clusterService.pushMsgToCalculatedFields(tenantId, entityId, msg.get(), wrap(callback)); + } else { + if (callback != null) { + callback.onSuccess(null); + } + } + } + + private boolean checkEntityForCalculatedFields(TenantId tenantId, EntityId entityId, Predicate filter, Predicate linkedEntityFilter) { + boolean send = false; + if (supportedReferencedEntities.contains(entityId.getEntityType())) { + send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId).stream().anyMatch(filter); + if (!send) { + send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(getProfileId(tenantId, entityId)).stream().anyMatch(filter); + } + if (!send) { + send = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId).stream() + .map(CalculatedFieldLink::getCalculatedFieldId) + .map(calculatedFieldCache::getCalculatedFieldCtx) + .anyMatch(linkedEntityFilter); + } + } + return send; + } + + private EntityId getProfileId(TenantId tenantId, EntityId entityId) { + return switch (entityId.getEntityType()) { + case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId(); + case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId(); + default -> null; + }; + } + + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) { + ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); + + CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); + List entries = request.getEntries(); + List versions = result.getVersions(); + for (int i = 0; i < entries.size(); i++) { + long tsVersion = versions.get(i); + TsKvProto tsProto = toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build(); + telemetryMsg.addTsData(tsProto); + } + msg.setTelemetryMsg(telemetryMsg.build()); + + return msg.build(); + } + + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List versions) { + ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); + + CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); + telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name())); + List entries = request.getEntries(); + for (int i = 0; i < entries.size(); i++) { + long attrVersion = versions.get(i); + AttributeValueProto attrProto = ProtoUtils.toProto(entries.get(i)).toBuilder().setVersion(attrVersion).build(); + telemetryMsg.addAttrData(attrProto); + } + msg.setTelemetryMsg(telemetryMsg.build()); + + return msg.build(); + } + + private CalculatedFieldTelemetryMsgProto.Builder buildTelemetryMsgProto(TenantId tenantId, EntityId entityId, List calculatedFieldIds, UUID tbMsgId, TbMsgType tbMsgType) { + CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = CalculatedFieldTelemetryMsgProto.newBuilder(); + + telemetryMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); + telemetryMsg.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); + + telemetryMsg.setEntityType(entityId.getEntityType().name()); + telemetryMsg.setEntityIdMSB(entityId.getId().getMostSignificantBits()); + telemetryMsg.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); + + if (calculatedFieldIds != null) { + for (CalculatedFieldId cfId : calculatedFieldIds) { + telemetryMsg.addPreviousCalculatedFields(toProto(cfId)); + } + } + + if (tbMsgId != null) { + telemetryMsg.setTbMsgIdMSB(tbMsgId.getMostSignificantBits()); + telemetryMsg.setTbMsgIdLSB(tbMsgId.getLeastSignificantBits()); + } + + if (tbMsgType != null) { + telemetryMsg.setTbMsgType(tbMsgType.name()); + } + + return telemetryMsg; + } + + private CalculatedFieldIdProto toProto(CalculatedFieldId cfId) { + return CalculatedFieldIdProto.newBuilder() + .setCalculatedFieldIdMSB(cfId.getId().getMostSignificantBits()) + .setCalculatedFieldIdLSB(cfId.getId().getLeastSignificantBits()) + .build(); + } + + private static TbQueueCallback wrap(FutureCallback callback) { + if (callback != null) { + return new FutureCallbackWrapper(callback); + } else { + return DUMMY_TB_QUEUE_CALLBACK; + } + } + + private static class FutureCallbackWrapper implements TbQueueCallback { + private final FutureCallback callback; + + public FutureCallbackWrapper(FutureCallback callback) { + this.callback = callback; + } + + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + callback.onSuccess(null); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/cache/CalculatedFieldEntityProfileCache.java b/application/src/main/java/org/thingsboard/server/service/cf/cache/CalculatedFieldEntityProfileCache.java index 6b0718eb84..fec8c8ba12 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/cache/CalculatedFieldEntityProfileCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/cache/CalculatedFieldEntityProfileCache.java @@ -32,4 +32,5 @@ public interface CalculatedFieldEntityProfileCache extends ApplicationListener

getMyEntityIdsByProfileId(TenantId tenantId, EntityId profileId); + int getEntityIdPartition(TenantId tenantId, EntityId entityId); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java b/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java index 866cc86f24..9a56d35ac4 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java @@ -42,7 +42,7 @@ import java.util.stream.Collectors; //TODO: remove and use TenantEntityProfileCache in each CalculatedFieldManagerMessageProcessor; public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEventListener implements CalculatedFieldEntityProfileCache { - private static final Integer UNKNOWN = -1; + private static final Integer UNKNOWN = 0; private final ConcurrentMap tenantCache = new ConcurrentHashMap<>(); private final PartitionService partitionService; private volatile List myPartitions = Collections.emptyList(); @@ -84,4 +84,11 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent public Collection getMyEntityIdsByProfileId(TenantId tenantId, EntityId profileId) { return tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).getMyEntityIdsByProfileId(profileId); } + + @Override + public int getEntityIdPartition(TenantId tenantId, EntityId entityId) { + var tpi = partitionService.resolve(HashPartitionService.CALCULATED_FIELD_QUEUE_KEY, entityId); + return tpi.getPartition().orElse(UNKNOWN); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 9d45532eec..a1999d438c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -17,34 +17,11 @@ package org.thingsboard.server.service.cf.ctx.state; import lombok.RequiredArgsConstructor; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; -import org.thingsboard.server.actors.ActorSystemContext; -import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg; -import org.thingsboard.server.common.data.StringUtils; -import org.thingsboard.server.common.data.cf.CalculatedFieldType; -import org.thingsboard.server.common.data.id.CalculatedFieldId; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityIdFactory; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.BasicKvEntry; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.common.util.KvProtoUtil; -import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; -import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; -import org.thingsboard.server.gen.transport.TransportProtos.SingleValueArgumentProto; -import org.thingsboard.server.gen.transport.TransportProtos.TsValueListProto; -import org.thingsboard.server.gen.transport.TransportProtos.TsValueProto; import org.thingsboard.server.queue.util.AfterStartUp; -import org.thingsboard.server.service.cf.RocksDBService; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; -import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService; - -import java.util.Map; -import java.util.Optional; -import java.util.TreeMap; -import java.util.UUID; -import java.util.stream.Collectors; +import org.thingsboard.server.service.cf.CalculatedFieldStateService; @Service @RequiredArgsConstructor diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java index 86556d7adc..48e17954ea 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java @@ -37,7 +37,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TsValueProto; import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.service.cf.RocksDBService; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; -import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService; +import org.thingsboard.server.service.cf.CalculatedFieldStateService; import java.util.Map; import java.util.Optional; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index 3d8bbddc80..f3e976084b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -21,6 +21,7 @@ import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; @@ -32,8 +33,10 @@ import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; @@ -49,7 +52,6 @@ import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; import org.thingsboard.server.queue.util.TbRuleEngineComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldCache; -import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.consumer.MainQueueConsumerManager; @@ -58,6 +60,7 @@ import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -134,8 +137,23 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer @Override protected void onTbApplicationEvent(PartitionChangeEvent event) { - log.debug("Subscribing to partitions: {}", event.getCalculatedFieldsPartitions()); + var partitions = event.getCalculatedFieldsPartitions(); + log.info("Subscribing to partitions: {}", partitions); + // TODO: @vklimov - before update of the main consumer, we should read the state topics and use + // CalculatedFieldStateService (KafkaCalculatedFieldStateService) to restore the states for entities that belong to new partitions. + // Cleanup entities that do not belong to current partition; mainConsumer.update(event.getCalculatedFieldsPartitions()); + // Cleanup old entities after corresponding consumers are stopped. + // Any periodic tasks need to check that the entity is still managed by the current server before processing. + actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions))); + } + + private boolean[] partitionsToBooleanIndexArray(Set partitions) { + boolean[] myPartitions = new boolean[partitionService.getTotalCalculatedFieldPartitions()]; + for(var tpi : partitions) { + tpi.getPartition().ifPresent(partition -> myPartitions[partition] = true); + } + return myPartitions; } private void processMsgs(List> msgs, TbQueueConsumer> consumer, CalculatedFieldQueueConfig config) throws Exception { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 2a090fe701..48d26895b9 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -96,7 +96,7 @@ import org.thingsboard.server.queue.common.TbRuleEngineProducerService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; -import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; +import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; @@ -148,7 +148,7 @@ public class DefaultTbClusterService implements TbClusterService { @Autowired @Lazy - private CalculatedFieldExecutionService calculatedFieldExecutionService; + private CalculatedFieldProcessingService calculatedFieldProcessingService; private final TopicService topicService; private final TbDeviceProfileCache deviceProfileCache; diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 3f7f9c0b7d..b338a8382c 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -50,7 +50,7 @@ import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.util.KvUtils; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; -import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; +import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; @@ -77,7 +77,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer private final TbEntityViewService tbEntityViewService; private final TbApiUsageReportClient apiUsageClient; private final TbApiUsageStateService apiUsageStateService; - private final CalculatedFieldExecutionService calculatedFieldExecutionService; + private final CalculatedFieldQueueService calculatedFieldQueueService; private ExecutorService tsCallBackExecutor; @@ -89,13 +89,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Lazy TbEntityViewService tbEntityViewService, TbApiUsageReportClient apiUsageClient, TbApiUsageStateService apiUsageStateService, - CalculatedFieldExecutionService calculatedFieldExecutionService) { + CalculatedFieldQueueService calculatedFieldQueueService) { this.attrService = attrService; this.tsService = tsService; this.tbEntityViewService = tbEntityViewService; this.apiUsageClient = apiUsageClient; this.apiUsageStateService = apiUsageStateService; - this.calculatedFieldExecutionService = calculatedFieldExecutionService; + this.calculatedFieldQueueService = calculatedFieldQueueService; } @PostConstruct @@ -147,7 +147,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer resultFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl()); } DonAsynchron.withCallback(resultFuture, result -> { - calculatedFieldExecutionService.pushRequestToQueue(request, result, request.getCallback()); + calculatedFieldQueueService.pushRequestToQueue(request, result, request.getCallback()); }, safeCallback(request.getCallback()), tsCallBackExecutor); addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); if (request.isSaveLatest() && !request.isOnlyLatest()) { @@ -167,7 +167,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer log.trace("Executing saveInternal [{}]", request); ListenableFuture> saveFuture = attrService.save(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries()); DonAsynchron.withCallback(saveFuture, result -> { - calculatedFieldExecutionService.pushRequestToQueue(request, result, request.getCallback()); + calculatedFieldQueueService.pushRequestToQueue(request, result, request.getCallback()); }, safeCallback(request.getCallback()), tsCallBackExecutor); addWsCallback(saveFuture, success -> onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice())); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index e13e2d2ecb..9e5f00abf2 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -139,6 +139,8 @@ public enum MsgType { CF_INIT_MSG, // Sent to init particular calculated field; CF_LINK_INIT_MSG, // Sent to init particular calculated field; CF_STATE_RESTORE_MSG, // Sent to restore particular calculated field entity state; + CF_PARTITIONS_CHANGE_MSG, // Sent when cluster event occures; + CF_ENTITY_LIFECYCLE_MSG, // Sent on CF/Device/Asset create/update/delete; CF_TELEMETRY_MSG, // Sent from queue to actor system; CF_LINKED_TELEMETRY_MSG, // Sent from queue to actor system; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldPartitionChangeMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldPartitionChangeMsg.java new file mode 100644 index 0000000000..5ab4b0ad9f --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldPartitionChangeMsg.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.cf; + +import lombok.Data; +import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; + +import java.util.Set; + +@Data +public class CalculatedFieldPartitionChangeMsg implements ToCalculatedFieldSystemMsg { + + private final boolean[] partitions; + + @Override + public TenantId getTenantId() { + return TenantId.SYS_TENANT_ID; + } + + @Override + public MsgType getMsgType() { + return MsgType.CF_PARTITIONS_CHANGE_MSG; + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 5c6bf545b5..f2f1ccd19c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -526,6 +526,11 @@ public class HashPartitionService implements PartitionService { return list == null ? 0 : list.size(); } + @Override + public int getTotalCalculatedFieldPartitions() { + return cfPartitions; + } + private Map> getServiceKeyListMap(List services) { final Map> currentMap = new HashMap<>(); services.forEach(serviceInfo -> { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index 8e2152830e..b0e1229f97 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -77,4 +77,6 @@ public interface PartitionService { boolean isManagedByCurrentService(TenantId tenantId); + int getTotalCalculatedFieldPartitions(); + }