Browse Source

Refactoring

pull/12681/head
Andrii Shvaika 1 year ago
parent
commit
5efb94dc7a
  1. 6
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 1
      application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
  3. 4
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java
  4. 20
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  5. 5
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java
  6. 15
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java
  7. 1
      application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
  8. 16
      application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java
  9. 37
      application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java
  10. 3
      application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java
  11. 2
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java
  12. 287
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java
  13. 238
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java
  14. 1
      application/src/main/java/org/thingsboard/server/service/cf/cache/CalculatedFieldEntityProfileCache.java
  15. 9
      application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java
  16. 25
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java
  17. 2
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java
  18. 22
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java
  19. 4
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java
  20. 12
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  21. 2
      common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
  22. 40
      common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldPartitionChangeMsg.java
  23. 5
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java
  24. 2
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java

6
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -106,9 +106,9 @@ import org.thingsboard.server.queue.discovery.DiscoveryService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService;
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
@ -526,7 +526,7 @@ public class ActorSystemContext {
@Lazy
@Autowired(required = false)
@Getter
private CalculatedFieldExecutionService calculatedFieldExecutionService;
private CalculatedFieldProcessingService calculatedFieldProcessingService;
@Lazy
@Autowired(required = false)

1
application/src/main/java/org/thingsboard/server/actors/app/AppActor.java

@ -88,6 +88,7 @@ public class AppActor extends ContextAwareActor {
case APP_INIT_MSG:
break;
case PARTITION_CHANGE_MSG:
case CF_PARTITIONS_CHANGE_MSG:
ctx.broadcastToChildren(msg, true);
break;
case COMPONENT_LIFE_CYCLE_MSG:

4
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java

@ -23,6 +23,7 @@ import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
@Slf4j
public class CalculatedFieldEntityActor extends ContextAwareActor {
@ -50,6 +51,9 @@ public class CalculatedFieldEntityActor extends ContextAwareActor {
@Override
protected boolean doProcess(TbActorMsg msg) {
switch (msg.getMsgType()) {
case CF_PARTITIONS_CHANGE_MSG:
processor.process((CalculatedFieldPartitionChangeMsg) msg);
break;
case CF_STATE_RESTORE_MSG:
processor.process((CalculatedFieldStateRestoreMsg) msg);
break;

20
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -30,15 +30,16 @@ import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService;
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
@ -67,8 +68,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
final TenantId tenantId;
final EntityId entityId;
final CalculatedFieldExecutionService cfService;
final CalculatedFieldProcessingService cfService;
final CalculatedFieldStateService cfStateService;
final int partition;
TbActorCtx ctx;
Map<CalculatedFieldId, CalculatedFieldState> states = new HashMap<>();
@ -77,14 +79,22 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
super(systemContext);
this.tenantId = tenantId;
this.entityId = entityId;
this.cfService = systemContext.getCalculatedFieldExecutionService();
this.cfService = systemContext.getCalculatedFieldProcessingService();
this.cfStateService = systemContext.getCalculatedFieldStateService();
this.partition = systemContext.getCalculatedFieldEntityProfileCache().getEntityIdPartition(tenantId, entityId);
}
void init(TbActorCtx ctx) {
this.ctx = ctx;
}
public void process(CalculatedFieldPartitionChangeMsg msg) {
if (!msg.getPartitions()[partition]) {
log.info("[{}][{}] Stopping entity actor due to change partition event.", partition, entityId);
ctx.stop(ctx.getSelf());
}
}
public void process(CalculatedFieldStateRestoreMsg msg) {
log.info("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), msg.getId().cfId());
states.put(msg.getId().cfId(), msg.getState());
@ -202,7 +212,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
if (state != null) {
return state;
} else {
ListenableFuture<CalculatedFieldState> stateFuture = systemContext.getCalculatedFieldExecutionService().fetchStateFromDb(ctx, entityId);
ListenableFuture<CalculatedFieldState> stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId);
// Ugly but necessary. We do not expect to often fetch data from DB. Only once per <Entity, CalculatedField> pair lifetime.
// This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low.
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,

5
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java

@ -25,6 +25,7 @@ import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
/**
* Created by ashvayka on 15.03.18.
@ -55,8 +56,8 @@ public class CalculatedFieldManagerActor extends ContextAwareActor {
@Override
protected boolean doProcess(TbActorMsg msg) {
switch (msg.getMsgType()) {
case PARTITION_CHANGE_MSG:
ctx.broadcastToChildren(msg, true); // TODO
case CF_PARTITIONS_CHANGE_MSG:
processor.onPartitionChange((CalculatedFieldPartitionChangeMsg) msg);
break;
case CF_INIT_MSG:
processor.onFieldInitMsg((CalculatedFieldInitMsg) msg);

15
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java

@ -35,14 +35,15 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.dao.cf.CalculatedFieldService;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto;
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService;
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
@ -52,6 +53,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -68,20 +70,20 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
private final Map<EntityId, List<CalculatedFieldCtx>> entityIdCalculatedFields = new ConcurrentHashMap<>();
private final ConcurrentMap<EntityId, List<CalculatedFieldLink>> entityIdCalculatedFieldLinks = new ConcurrentHashMap<>();
private final CalculatedFieldExecutionService cfExecService;
private final CalculatedFieldProcessingService cfExecService;
private final CalculatedFieldStateService cfStateService;
private final CalculatedFieldEntityProfileCache cfEntityCache;
private final CalculatedFieldService cfDaoService;
private final TbAssetProfileCache assetProfileCache;
private final TbDeviceProfileCache deviceProfileCache;
protected final TenantId tenantId;
protected TbActorCtx ctx;
final TenantId tenantId;
CalculatedFieldManagerMessageProcessor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext);
this.cfEntityCache = systemContext.getCalculatedFieldEntityProfileCache();
this.cfExecService = systemContext.getCalculatedFieldExecutionService();
this.cfExecService = systemContext.getCalculatedFieldProcessingService();
this.cfStateService = systemContext.getCalculatedFieldStateService();
this.cfDaoService = systemContext.getCalculatedFieldService();
this.assetProfileCache = systemContext.getAssetProfileCache();
@ -478,4 +480,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
oldLinks.forEach(link -> entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new ArrayList<>()).remove(link));
}
public void onPartitionChange(CalculatedFieldPartitionChangeMsg msg) {
ctx.broadcastToChildren(msg, true);
}
}

1
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

@ -170,6 +170,7 @@ public class TenantActor extends RuleChainManagerActor {
case CF_INIT_MSG:
case CF_LINK_INIT_MSG:
case CF_STATE_RESTORE_MSG:
case CF_PARTITIONS_CHANGE_MSG:
case CF_ENTITY_LIFECYCLE_MSG:
onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, true);
break;

16
application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java → application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java

@ -15,15 +15,11 @@
*/
package org.thingsboard.server.service.cf;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
@ -31,17 +27,7 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import java.util.List;
public interface CalculatedFieldExecutionService {
/**
* Filter CFs based on the request entity. Push to the queue if any matching CF exist;
*
* @param request - telemetry save request;
* @param callback
*/
void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback);
void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback);
public interface CalculatedFieldProcessingService {
ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId);

37
application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java

@ -0,0 +1,37 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf;
import com.google.common.util.concurrent.FutureCallback;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import java.util.List;
public interface CalculatedFieldQueueService {
/**
* Filter CFs based on the request entity. Push to the queue if any matching CF exist;
*
* @param request - telemetry save request;
* @param callback
*/
void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback);
void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback);
}

3
application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java → application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java

@ -13,9 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf.ctx;
package org.thingsboard.server.service.cf;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;

2
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java

@ -20,8 +20,6 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg;
import org.thingsboard.server.common.data.ProfileEntityIdInfo;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.dao.asset.AssetService;

287
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java → application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java

@ -17,7 +17,6 @@ package org.thingsboard.server.service.cf;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -32,25 +31,17 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
import org.thingsboard.server.actors.calculatedField.MultipleTbCallback;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.OutputType;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
@ -59,7 +50,6 @@ import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
@ -67,12 +57,9 @@ import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
@ -80,12 +67,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinke
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtx;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
@ -93,149 +79,51 @@ import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry;
import org.thingsboard.server.service.cf.telemetry.CalculatedFieldAttributeUpdateRequest;
import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTelemetryUpdateRequest;
import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTimeSeriesUpdateRequest;
import org.thingsboard.server.service.partition.AbstractPartitionBasedService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
import static org.thingsboard.server.common.util.ProtoUtils.toTsKvProto;
import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY;
@TbRuleEngineComponent
@Service
@Slf4j
@RequiredArgsConstructor
public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBasedService<CalculatedFieldId> implements CalculatedFieldExecutionService {
public class DefaultCalculatedFieldProcessingService implements CalculatedFieldProcessingService {
public static final TbQueueCallback DUMMY_TB_QUEUE_CALLBACK = new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
}
@Override
public void onFailure(Throwable t) {
}
};
private final TbAssetProfileCache assetProfileCache;
private final TbDeviceProfileCache deviceProfileCache;
private final CalculatedFieldCache calculatedFieldCache;
private final AttributesService attributesService;
private final TimeseriesService timeseriesService;
private final TbClusterService clusterService;
private final ApiLimitService apiLimitService;
private final PartitionService partitionService;
private ListeningExecutorService calculatedFieldExecutor;
private ListeningExecutorService calculatedFieldCallbackExecutor;
private final ConcurrentMap<CalculatedFieldEntityCtxId, CalculatedFieldEntityCtx> states = new ConcurrentHashMap<>();
private static final Set<EntityType> supportedReferencedEntities = EnumSet.of(
EntityType.DEVICE, EntityType.ASSET, EntityType.CUSTOMER, EntityType.TENANT
);
@Value("${calculatedField.initFetchPackSize:50000}")
@Getter
private int initFetchPackSize;
@PostConstruct
public void init() {
super.init();
calculatedFieldExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(
Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field"));
calculatedFieldCallbackExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(
Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field-callback"));
}
@PreDestroy
public void stop() {
super.stop();
if (calculatedFieldExecutor != null) {
calculatedFieldExecutor.shutdownNow();
}
if (calculatedFieldCallbackExecutor != null) {
calculatedFieldCallbackExecutor.shutdownNow();
}
}
@Override
protected String getServiceName() {
return "Calculated Field Execution";
}
@Override
protected String getSchedulerExecutorName() {
return "calculated-field-scheduled";
}
@Override
public void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback) {
var tenantId = request.getTenantId();
var entityId = request.getEntityId();
//TODO: 1. check that request entity has calculated fields for entity or profile. If yes - push to corresponding partitions;
//TODO: 2. check that request entity has calculated field links. If yes - push to corresponding partitions;
//TODO: in 1 and 2 we should do the check as quick as possible. Should we also check the field/link keys?;
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()),
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
}
@Override
public void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback) {
var tenantId = request.getTenantId();
var entityId = request.getEntityId();
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()),
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
}
private void checkEntityAndPushToQueue(TenantId tenantId, EntityId entityId,
Predicate<CalculatedFieldCtx> mainEntityFilter, Predicate<CalculatedFieldCtx> linkedEntityFilter,
Supplier<ToCalculatedFieldMsg> msg, FutureCallback<Void> callback) {
boolean send = checkEntityForCalculatedFields(tenantId, entityId, mainEntityFilter, linkedEntityFilter);
if (send) {
clusterService.pushMsgToCalculatedFields(tenantId, entityId, msg.get(), wrap(callback));
} else {
if (callback != null) {
callback.onSuccess(null);
}
}
}
private boolean checkEntityForCalculatedFields(TenantId tenantId, EntityId entityId, Predicate<CalculatedFieldCtx> filter, Predicate<CalculatedFieldCtx> linkedEntityFilter) {
boolean send = false;
if (supportedReferencedEntities.contains(entityId.getEntityType())) {
send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId).stream().anyMatch(filter);
if (!send) {
send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(getProfileId(tenantId, entityId)).stream().anyMatch(filter);
}
if (!send) {
send = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId).stream()
.map(CalculatedFieldLink::getCalculatedFieldId)
.map(calculatedFieldCache::getCalculatedFieldCtx)
.anyMatch(linkedEntityFilter);
}
}
return send;
}
@Override
public ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId) {
Map<String, ListenableFuture<ArgumentEntry>> argFutures = new HashMap<>();
@ -262,21 +150,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}, calculatedFieldCallbackExecutor);
}
@Override
protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
var result = new HashMap<TopicPartitionInfo, List<ListenableFuture<?>>>();
return result;
}
@Override
protected void cleanupEntityOnPartitionRemoval(CalculatedFieldId entityId) {
cleanupEntity(entityId);
}
private void cleanupEntity(CalculatedFieldId calculatedFieldId) {
states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId));
}
@Override
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculatedFieldResult, List<CalculatedFieldId> cfIds, TbCallback callback) {
try {
@ -369,30 +242,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
.build();
}
private ListenableFuture<Void> fetchArguments(TenantId tenantId, EntityId entityId, Map<String, Argument> necessaryArguments, Consumer<Map<String, ArgumentEntry>> onComplete) {
Map<String, ArgumentEntry> argumentValues = new HashMap<>();
List<ListenableFuture<ArgumentEntry>> futures = new ArrayList<>();
necessaryArguments.forEach((key, argument) -> {
futures.add(Futures.transform(fetchArgumentValue(tenantId, entityId, argument),
result -> {
argumentValues.put(key, result);
return result;
}, calculatedFieldCallbackExecutor));
});
return Futures.transform(Futures.allAsList(futures), results -> {
onComplete.accept(argumentValues);
return null;
}, calculatedFieldCallbackExecutor);
}
private ListenableFuture<ArgumentEntry> fetchArgumentValue(TenantId tenantId, EntityId targetEntityId, Argument argument) {
EntityId argumentEntityId = argument.getRefEntityId();
EntityId entityId = (argumentEntityId == null || isProfileEntity(argumentEntityId))
? targetEntityId
: argumentEntityId;
return fetchKvEntry(tenantId, entityId, argument);
}
private ListenableFuture<ArgumentEntry> fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) {
return switch (argument.getRefEntityKey().getType()) {
case TS_ROLLING -> fetchTsRolling(tenantId, entityId, argument);
@ -462,78 +311,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
};
}
private boolean isProfileEntity(EntityId entityId) {
return EntityType.DEVICE_PROFILE.equals(entityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(entityId.getEntityType());
}
private EntityId getProfileId(TenantId tenantId, EntityId entityId) {
return switch (entityId.getEntityType()) {
case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId();
case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId();
default -> null;
};
}
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) {
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType());
List<TsKvEntry> entries = request.getEntries();
List<Long> versions = result.getVersions();
for (int i = 0; i < entries.size(); i++) {
long tsVersion = versions.get(i);
TsKvProto tsProto = toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build();
telemetryMsg.addTsData(tsProto);
}
msg.setTelemetryMsg(telemetryMsg.build());
return msg.build();
}
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List<Long> versions) {
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType());
telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name()));
List<AttributeKvEntry> entries = request.getEntries();
for (int i = 0; i < entries.size(); i++) {
long attrVersion = versions.get(i);
AttributeValueProto attrProto = ProtoUtils.toProto(entries.get(i)).toBuilder().setVersion(attrVersion).build();
telemetryMsg.addAttrData(attrProto);
}
msg.setTelemetryMsg(telemetryMsg.build());
return msg.build();
}
private CalculatedFieldTelemetryMsgProto.Builder buildTelemetryMsgProto(TenantId tenantId, EntityId entityId, List<CalculatedFieldId> calculatedFieldIds, UUID tbMsgId, TbMsgType tbMsgType) {
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = CalculatedFieldTelemetryMsgProto.newBuilder();
telemetryMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
telemetryMsg.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
telemetryMsg.setEntityType(entityId.getEntityType().name());
telemetryMsg.setEntityIdMSB(entityId.getId().getMostSignificantBits());
telemetryMsg.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
if (calculatedFieldIds != null) {
for (CalculatedFieldId cfId : calculatedFieldIds) {
telemetryMsg.addPreviousCalculatedFields(toProto(cfId));
}
}
if (tbMsgId != null) {
telemetryMsg.setTbMsgIdMSB(tbMsgId.getMostSignificantBits());
telemetryMsg.setTbMsgIdLSB(tbMsgId.getLeastSignificantBits());
}
if (tbMsgType != null) {
telemetryMsg.setTbMsgType(tbMsgType.name());
}
return telemetryMsg;
}
private CalculatedFieldIdProto toProto(CalculatedFieldId cfId) {
return CalculatedFieldIdProto.newBuilder()
.setCalculatedFieldIdMSB(cfId.getId().getMostSignificantBits())
@ -541,60 +318,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
.build();
}
private CalculatedFieldTelemetryUpdateRequest fromProto(CalculatedFieldTelemetryMsgProto proto) {
TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
if (!proto.getTsDataList().isEmpty()) {
List<TsKvEntry> updatedTelemetry = proto.getTsDataList().stream()
.map(ProtoUtils::fromProto)
.toList();
return new CalculatedFieldTimeSeriesUpdateRequest(
tenantId, entityId, updatedTelemetry,
proto.getPreviousCalculatedFieldsList().stream()
.map(cfIdProto -> new CalculatedFieldId(
new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB())))
.toList());
} else {
AttributeScope scope = AttributeScope.valueOf(proto.getScope().name());
List<AttributeKvEntry> updatedTelemetry = proto.getAttrDataList().stream()
.map(ProtoUtils::fromProto)
.toList();
return new CalculatedFieldAttributeUpdateRequest(
tenantId, entityId, scope, updatedTelemetry,
proto.getPreviousCalculatedFieldsList().stream()
.map(cfIdProto -> new CalculatedFieldId(
new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB())))
.toList());
}
}
private static TbQueueCallback wrap(FutureCallback<Void> callback) {
if (callback != null) {
return new FutureCallbackWrapper(callback);
} else {
return DUMMY_TB_QUEUE_CALLBACK;
}
}
private static class FutureCallbackWrapper implements TbQueueCallback {
private final FutureCallback<Void> callback;
public FutureCallbackWrapper(FutureCallback<Void> callback) {
this.callback = callback;
}
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
callback.onSuccess(null);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
}
private static class TbCallbackWrapper implements TbQueueCallback {
private final TbCallback callback;

238
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java

@ -0,0 +1,238 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf;
import com.google.common.util.concurrent.FutureCallback;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.thingsboard.server.common.util.ProtoUtils.toTsKvProto;
@Service
@Slf4j
@RequiredArgsConstructor
public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueService {
public static final TbQueueCallback DUMMY_TB_QUEUE_CALLBACK = new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
}
@Override
public void onFailure(Throwable t) {
}
};
private final TbAssetProfileCache assetProfileCache;
private final TbDeviceProfileCache deviceProfileCache;
private final CalculatedFieldCache calculatedFieldCache;
private final TbClusterService clusterService;
private static final Set<EntityType> supportedReferencedEntities = EnumSet.of(
EntityType.DEVICE, EntityType.ASSET, EntityType.CUSTOMER, EntityType.TENANT
);
@Value("${calculatedField.initFetchPackSize:50000}")
@Getter
private int initFetchPackSize;
@Override
public void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback) {
var tenantId = request.getTenantId();
var entityId = request.getEntityId();
//TODO: 1. check that request entity has calculated fields for entity or profile. If yes - push to corresponding partitions;
//TODO: 2. check that request entity has calculated field links. If yes - push to corresponding partitions;
//TODO: in 1 and 2 we should do the check as quick as possible. Should we also check the field/link keys?;
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()),
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
}
@Override
public void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback) {
var tenantId = request.getTenantId();
var entityId = request.getEntityId();
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()),
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
}
private void checkEntityAndPushToQueue(TenantId tenantId, EntityId entityId,
Predicate<CalculatedFieldCtx> mainEntityFilter, Predicate<CalculatedFieldCtx> linkedEntityFilter,
Supplier<ToCalculatedFieldMsg> msg, FutureCallback<Void> callback) {
boolean send = checkEntityForCalculatedFields(tenantId, entityId, mainEntityFilter, linkedEntityFilter);
if (send) {
clusterService.pushMsgToCalculatedFields(tenantId, entityId, msg.get(), wrap(callback));
} else {
if (callback != null) {
callback.onSuccess(null);
}
}
}
private boolean checkEntityForCalculatedFields(TenantId tenantId, EntityId entityId, Predicate<CalculatedFieldCtx> filter, Predicate<CalculatedFieldCtx> linkedEntityFilter) {
boolean send = false;
if (supportedReferencedEntities.contains(entityId.getEntityType())) {
send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId).stream().anyMatch(filter);
if (!send) {
send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(getProfileId(tenantId, entityId)).stream().anyMatch(filter);
}
if (!send) {
send = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId).stream()
.map(CalculatedFieldLink::getCalculatedFieldId)
.map(calculatedFieldCache::getCalculatedFieldCtx)
.anyMatch(linkedEntityFilter);
}
}
return send;
}
private EntityId getProfileId(TenantId tenantId, EntityId entityId) {
return switch (entityId.getEntityType()) {
case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId();
case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId();
default -> null;
};
}
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) {
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType());
List<TsKvEntry> entries = request.getEntries();
List<Long> versions = result.getVersions();
for (int i = 0; i < entries.size(); i++) {
long tsVersion = versions.get(i);
TsKvProto tsProto = toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build();
telemetryMsg.addTsData(tsProto);
}
msg.setTelemetryMsg(telemetryMsg.build());
return msg.build();
}
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List<Long> versions) {
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType());
telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name()));
List<AttributeKvEntry> entries = request.getEntries();
for (int i = 0; i < entries.size(); i++) {
long attrVersion = versions.get(i);
AttributeValueProto attrProto = ProtoUtils.toProto(entries.get(i)).toBuilder().setVersion(attrVersion).build();
telemetryMsg.addAttrData(attrProto);
}
msg.setTelemetryMsg(telemetryMsg.build());
return msg.build();
}
private CalculatedFieldTelemetryMsgProto.Builder buildTelemetryMsgProto(TenantId tenantId, EntityId entityId, List<CalculatedFieldId> calculatedFieldIds, UUID tbMsgId, TbMsgType tbMsgType) {
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = CalculatedFieldTelemetryMsgProto.newBuilder();
telemetryMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
telemetryMsg.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
telemetryMsg.setEntityType(entityId.getEntityType().name());
telemetryMsg.setEntityIdMSB(entityId.getId().getMostSignificantBits());
telemetryMsg.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
if (calculatedFieldIds != null) {
for (CalculatedFieldId cfId : calculatedFieldIds) {
telemetryMsg.addPreviousCalculatedFields(toProto(cfId));
}
}
if (tbMsgId != null) {
telemetryMsg.setTbMsgIdMSB(tbMsgId.getMostSignificantBits());
telemetryMsg.setTbMsgIdLSB(tbMsgId.getLeastSignificantBits());
}
if (tbMsgType != null) {
telemetryMsg.setTbMsgType(tbMsgType.name());
}
return telemetryMsg;
}
private CalculatedFieldIdProto toProto(CalculatedFieldId cfId) {
return CalculatedFieldIdProto.newBuilder()
.setCalculatedFieldIdMSB(cfId.getId().getMostSignificantBits())
.setCalculatedFieldIdLSB(cfId.getId().getLeastSignificantBits())
.build();
}
private static TbQueueCallback wrap(FutureCallback<Void> callback) {
if (callback != null) {
return new FutureCallbackWrapper(callback);
} else {
return DUMMY_TB_QUEUE_CALLBACK;
}
}
private static class FutureCallbackWrapper implements TbQueueCallback {
private final FutureCallback<Void> callback;
public FutureCallbackWrapper(FutureCallback<Void> callback) {
this.callback = callback;
}
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
callback.onSuccess(null);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
}
}

1
application/src/main/java/org/thingsboard/server/service/cf/cache/CalculatedFieldEntityProfileCache.java

@ -32,4 +32,5 @@ public interface CalculatedFieldEntityProfileCache extends ApplicationListener<P
Collection<EntityId> getMyEntityIdsByProfileId(TenantId tenantId, EntityId profileId);
int getEntityIdPartition(TenantId tenantId, EntityId entityId);
}

9
application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java

@ -42,7 +42,7 @@ import java.util.stream.Collectors;
//TODO: remove and use TenantEntityProfileCache in each CalculatedFieldManagerMessageProcessor;
public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEventListener<PartitionChangeEvent> implements CalculatedFieldEntityProfileCache {
private static final Integer UNKNOWN = -1;
private static final Integer UNKNOWN = 0;
private final ConcurrentMap<TenantId, TenantEntityProfileCache> tenantCache = new ConcurrentHashMap<>();
private final PartitionService partitionService;
private volatile List<Integer> myPartitions = Collections.emptyList();
@ -84,4 +84,11 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
public Collection<EntityId> getMyEntityIdsByProfileId(TenantId tenantId, EntityId profileId) {
return tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()).getMyEntityIdsByProfileId(profileId);
}
@Override
public int getEntityIdPartition(TenantId tenantId, EntityId entityId) {
var tpi = partitionService.resolve(HashPartitionService.CALCULATED_FIELD_QUEUE_KEY, entityId);
return tpi.getPartition().orElse(UNKNOWN);
}
}

25
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java

@ -17,34 +17,11 @@ package org.thingsboard.server.service.cf.ctx.state;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicKvEntry;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.util.KvProtoUtil;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.SingleValueArgumentProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsValueListProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsValueProto;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.service.cf.RocksDBService;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import java.util.stream.Collectors;
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
@Service
@RequiredArgsConstructor

2
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java

@ -37,7 +37,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TsValueProto;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.service.cf.RocksDBService;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService;
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
import java.util.Map;
import java.util.Optional;

22
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java

@ -21,6 +21,7 @@ import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
@ -32,8 +33,10 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
@ -49,7 +52,6 @@ import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.consumer.MainQueueConsumerManager;
@ -58,6 +60,7 @@ import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -134,8 +137,23 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
log.debug("Subscribing to partitions: {}", event.getCalculatedFieldsPartitions());
var partitions = event.getCalculatedFieldsPartitions();
log.info("Subscribing to partitions: {}", partitions);
// TODO: @vklimov - before update of the main consumer, we should read the state topics and use
// CalculatedFieldStateService (KafkaCalculatedFieldStateService) to restore the states for entities that belong to new partitions.
// Cleanup entities that do not belong to current partition;
mainConsumer.update(event.getCalculatedFieldsPartitions());
// Cleanup old entities after corresponding consumers are stopped.
// Any periodic tasks need to check that the entity is still managed by the current server before processing.
actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions)));
}
private boolean[] partitionsToBooleanIndexArray(Set<TopicPartitionInfo> partitions) {
boolean[] myPartitions = new boolean[partitionService.getTotalCalculatedFieldPartitions()];
for(var tpi : partitions) {
tpi.getPartition().ifPresent(partition -> myPartitions[partition] = true);
}
return myPartitions;
}
private void processMsgs(List<TbProtoQueueMsg<ToCalculatedFieldMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumer, CalculatedFieldQueueConfig config) throws Exception {

4
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java

@ -96,7 +96,7 @@ import org.thingsboard.server.queue.common.TbRuleEngineProducerService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
import org.thingsboard.server.service.gateway_device.GatewayNotificationsService;
import org.thingsboard.server.service.ota.OtaPackageStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
@ -148,7 +148,7 @@ public class DefaultTbClusterService implements TbClusterService {
@Autowired
@Lazy
private CalculatedFieldExecutionService calculatedFieldExecutionService;
private CalculatedFieldProcessingService calculatedFieldProcessingService;
private final TopicService topicService;
private final TbDeviceProfileCache deviceProfileCache;

12
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -50,7 +50,7 @@ import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.util.KvUtils;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
import org.thingsboard.server.service.cf.CalculatedFieldQueueService;
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
@ -77,7 +77,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
private final TbEntityViewService tbEntityViewService;
private final TbApiUsageReportClient apiUsageClient;
private final TbApiUsageStateService apiUsageStateService;
private final CalculatedFieldExecutionService calculatedFieldExecutionService;
private final CalculatedFieldQueueService calculatedFieldQueueService;
private ExecutorService tsCallBackExecutor;
@ -89,13 +89,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Lazy TbEntityViewService tbEntityViewService,
TbApiUsageReportClient apiUsageClient,
TbApiUsageStateService apiUsageStateService,
CalculatedFieldExecutionService calculatedFieldExecutionService) {
CalculatedFieldQueueService calculatedFieldQueueService) {
this.attrService = attrService;
this.tsService = tsService;
this.tbEntityViewService = tbEntityViewService;
this.apiUsageClient = apiUsageClient;
this.apiUsageStateService = apiUsageStateService;
this.calculatedFieldExecutionService = calculatedFieldExecutionService;
this.calculatedFieldQueueService = calculatedFieldQueueService;
}
@PostConstruct
@ -147,7 +147,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
resultFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl());
}
DonAsynchron.withCallback(resultFuture, result -> {
calculatedFieldExecutionService.pushRequestToQueue(request, result, request.getCallback());
calculatedFieldQueueService.pushRequestToQueue(request, result, request.getCallback());
}, safeCallback(request.getCallback()), tsCallBackExecutor);
addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
if (request.isSaveLatest() && !request.isOnlyLatest()) {
@ -167,7 +167,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
log.trace("Executing saveInternal [{}]", request);
ListenableFuture<List<Long>> saveFuture = attrService.save(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries());
DonAsynchron.withCallback(saveFuture, result -> {
calculatedFieldExecutionService.pushRequestToQueue(request, result, request.getCallback());
calculatedFieldQueueService.pushRequestToQueue(request, result, request.getCallback());
}, safeCallback(request.getCallback()), tsCallBackExecutor);
addWsCallback(saveFuture, success -> onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice()));
}

2
common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java

@ -139,6 +139,8 @@ public enum MsgType {
CF_INIT_MSG, // Sent to init particular calculated field;
CF_LINK_INIT_MSG, // Sent to init particular calculated field;
CF_STATE_RESTORE_MSG, // Sent to restore particular calculated field entity state;
CF_PARTITIONS_CHANGE_MSG, // Sent when cluster event occures;
CF_ENTITY_LIFECYCLE_MSG, // Sent on CF/Device/Asset create/update/delete;
CF_TELEMETRY_MSG, // Sent from queue to actor system;
CF_LINKED_TELEMETRY_MSG, // Sent from queue to actor system;

40
common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldPartitionChangeMsg.java

@ -0,0 +1,40 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.cf;
import lombok.Data;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import java.util.Set;
@Data
public class CalculatedFieldPartitionChangeMsg implements ToCalculatedFieldSystemMsg {
private final boolean[] partitions;
@Override
public TenantId getTenantId() {
return TenantId.SYS_TENANT_ID;
}
@Override
public MsgType getMsgType() {
return MsgType.CF_PARTITIONS_CHANGE_MSG;
}
}

5
common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java

@ -526,6 +526,11 @@ public class HashPartitionService implements PartitionService {
return list == null ? 0 : list.size();
}
@Override
public int getTotalCalculatedFieldPartitions() {
return cfPartitions;
}
private Map<QueueKey, List<ServiceInfo>> getServiceKeyListMap(List<ServiceInfo> services) {
final Map<QueueKey, List<ServiceInfo>> currentMap = new HashMap<>();
services.forEach(serviceInfo -> {

2
common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java

@ -77,4 +77,6 @@ public interface PartitionService {
boolean isManagedByCurrentService(TenantId tenantId);
int getTotalCalculatedFieldPartitions();
}

Loading…
Cancel
Save