|
Before Width: | Height: | Size: 2.2 KiB After Width: | Height: | Size: 9.0 KiB |
|
Before Width: | Height: | Size: 2.7 KiB After Width: | Height: | Size: 17 KiB |
|
Before Width: | Height: | Size: 2.9 KiB After Width: | Height: | Size: 21 KiB |
|
Before Width: | Height: | Size: 113 KiB After Width: | Height: | Size: 111 KiB |
|
Before Width: | Height: | Size: 139 KiB After Width: | Height: | Size: 137 KiB |
|
Before Width: | Height: | Size: 5.3 KiB After Width: | Height: | Size: 10 KiB |
|
Before Width: | Height: | Size: 50 KiB After Width: | Height: | Size: 49 KiB |
|
Before Width: | Height: | Size: 115 KiB After Width: | Height: | Size: 113 KiB |
|
Before Width: | Height: | Size: 113 KiB After Width: | Height: | Size: 111 KiB |
|
Before Width: | Height: | Size: 120 KiB After Width: | Height: | Size: 118 KiB |
|
Before Width: | Height: | Size: 121 KiB After Width: | Height: | Size: 119 KiB |
|
Before Width: | Height: | Size: 114 KiB After Width: | Height: | Size: 112 KiB |
|
Before Width: | Height: | Size: 2.2 KiB After Width: | Height: | Size: 9.0 KiB |
|
Before Width: | Height: | Size: 2.7 KiB After Width: | Height: | Size: 17 KiB |
|
Before Width: | Height: | Size: 2.2 KiB After Width: | Height: | Size: 9.0 KiB |
|
Before Width: | Height: | Size: 6.9 KiB After Width: | Height: | Size: 6.9 KiB |
|
Before Width: | Height: | Size: 5.3 KiB After Width: | Height: | Size: 10 KiB |
|
Before Width: | Height: | Size: 7.0 KiB After Width: | Height: | Size: 7.0 KiB |
|
Before Width: | Height: | Size: 5.3 KiB After Width: | Height: | Size: 10 KiB |
|
Before Width: | Height: | Size: 48 KiB After Width: | Height: | Size: 48 KiB |
|
Before Width: | Height: | Size: 2.7 KiB After Width: | Height: | Size: 17 KiB |
|
Before Width: | Height: | Size: 7.1 KiB After Width: | Height: | Size: 7.1 KiB |
|
Before Width: | Height: | Size: 51 KiB After Width: | Height: | Size: 50 KiB |
|
Before Width: | Height: | Size: 101 KiB After Width: | Height: | Size: 99 KiB |
|
Before Width: | Height: | Size: 105 KiB After Width: | Height: | Size: 104 KiB |
|
Before Width: | Height: | Size: 118 KiB After Width: | Height: | Size: 117 KiB |
|
Before Width: | Height: | Size: 120 KiB After Width: | Height: | Size: 118 KiB |
|
Before Width: | Height: | Size: 122 KiB After Width: | Height: | Size: 120 KiB |
|
Before Width: | Height: | Size: 108 KiB After Width: | Height: | Size: 107 KiB |
|
Before Width: | Height: | Size: 120 KiB After Width: | Height: | Size: 119 KiB |
|
Before Width: | Height: | Size: 2.2 KiB After Width: | Height: | Size: 9.0 KiB |
|
Before Width: | Height: | Size: 2.7 KiB After Width: | Height: | Size: 17 KiB |
|
Before Width: | Height: | Size: 5.3 KiB After Width: | Height: | Size: 10 KiB |
|
Before Width: | Height: | Size: 101 KiB After Width: | Height: | Size: 100 KiB |
|
Before Width: | Height: | Size: 50 KiB After Width: | Height: | Size: 50 KiB |
|
Before Width: | Height: | Size: 113 KiB After Width: | Height: | Size: 112 KiB |
@ -0,0 +1,70 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.common.util.DebugModeUtil; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.service.ContextAwareActor; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; |
|||
|
|||
@Slf4j |
|||
public abstract class AbstractCalculatedFieldActor extends ContextAwareActor { |
|||
|
|||
protected final TenantId tenantId; |
|||
|
|||
public AbstractCalculatedFieldActor(ActorSystemContext systemContext, TenantId tenantId) { |
|||
super(systemContext); |
|||
this.tenantId = tenantId; |
|||
} |
|||
|
|||
@Override |
|||
protected boolean doProcess(TbActorMsg msg) { |
|||
if (msg instanceof ToCalculatedFieldSystemMsg cfm) { |
|||
Exception cause; |
|||
try { |
|||
return doProcessCfMsg(cfm); |
|||
} catch (CalculatedFieldException cfe) { |
|||
if (DebugModeUtil.isDebugFailuresAvailable(cfe.getCtx().getCalculatedField())) { |
|||
String message; |
|||
if (cfe.getErrorMessage() != null) { |
|||
message = cfe.getErrorMessage(); |
|||
} else if (cfe.getCause() != null) { |
|||
message = cfe.getCause().getMessage(); |
|||
} else { |
|||
message = "N/A"; |
|||
} |
|||
systemContext.persistCalculatedFieldDebugEvent(tenantId, cfe.getCtx().getCfId(), cfe.getEventEntity(), cfe.getArguments(), cfe.getMsgId(), cfe.getMsgType(), null, message); |
|||
} |
|||
cause = cfe.getCause(); |
|||
} catch (Exception e) { |
|||
logProcessingException(e); |
|||
cause = e; |
|||
} |
|||
cfm.getCallback().onFailure(cause); |
|||
return true; |
|||
} else { |
|||
return false; |
|||
} |
|||
} |
|||
|
|||
abstract void logProcessingException(Exception e); |
|||
|
|||
abstract boolean doProcessCfMsg(ToCalculatedFieldSystemMsg msg) throws CalculatedFieldException; |
|||
|
|||
} |
|||
@ -0,0 +1,81 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.TbActorCtx; |
|||
import org.thingsboard.server.actors.TbActorException; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; |
|||
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; |
|||
|
|||
@Slf4j |
|||
public class CalculatedFieldEntityActor extends AbstractCalculatedFieldActor { |
|||
|
|||
private final CalculatedFieldEntityMessageProcessor processor; |
|||
|
|||
CalculatedFieldEntityActor(ActorSystemContext systemContext, TenantId tenantId, EntityId entityId) { |
|||
super(systemContext, tenantId); |
|||
this.processor = new CalculatedFieldEntityMessageProcessor(systemContext, tenantId, entityId); |
|||
} |
|||
|
|||
@Override |
|||
public void init(TbActorCtx ctx) throws TbActorException { |
|||
super.init(ctx); |
|||
log.debug("[{}][{}] Starting CF entity actor.", processor.tenantId, processor.entityId); |
|||
try { |
|||
processor.init(ctx); |
|||
log.debug("[{}][{}] CF entity actor started.", processor.tenantId, processor.entityId); |
|||
} catch (Exception e) { |
|||
log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.entityId, e); |
|||
throw new TbActorException("Failed to initialize CF entity actor", e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
protected boolean doProcessCfMsg(ToCalculatedFieldSystemMsg msg) throws CalculatedFieldException { |
|||
switch (msg.getMsgType()) { |
|||
case CF_PARTITIONS_CHANGE_MSG: |
|||
processor.process((CalculatedFieldPartitionChangeMsg) msg); |
|||
break; |
|||
case CF_STATE_RESTORE_MSG: |
|||
processor.process((CalculatedFieldStateRestoreMsg) msg); |
|||
break; |
|||
case CF_ENTITY_INIT_CF_MSG: |
|||
processor.process((EntityInitCalculatedFieldMsg) msg); |
|||
break; |
|||
case CF_ENTITY_DELETE_MSG: |
|||
processor.process((CalculatedFieldEntityDeleteMsg) msg); |
|||
break; |
|||
case CF_ENTITY_TELEMETRY_MSG: |
|||
processor.process((EntityCalculatedFieldTelemetryMsg) msg); |
|||
break; |
|||
case CF_LINKED_TELEMETRY_MSG: |
|||
processor.process((EntityCalculatedFieldLinkedTelemetryMsg) msg); |
|||
break; |
|||
default: |
|||
return false; |
|||
} |
|||
return true; |
|||
} |
|||
|
|||
@Override |
|||
void logProcessingException(Exception e) { |
|||
log.warn("[{}][{}] Processing failure", tenantId, processor.entityId, e); |
|||
} |
|||
} |
|||
@ -0,0 +1,50 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.TbActor; |
|||
import org.thingsboard.server.actors.TbActorId; |
|||
import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId; |
|||
import org.thingsboard.server.actors.TbEntityActorId; |
|||
import org.thingsboard.server.actors.device.DeviceActor; |
|||
import org.thingsboard.server.actors.service.ContextBasedCreator; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
|
|||
public class CalculatedFieldEntityActorCreator extends ContextBasedCreator { |
|||
|
|||
private final TenantId tenantId; |
|||
private final EntityId entityId; |
|||
|
|||
public CalculatedFieldEntityActorCreator(ActorSystemContext context, TenantId tenantId, EntityId entityId) { |
|||
super(context); |
|||
this.tenantId = tenantId; |
|||
this.entityId = entityId; |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId createActorId() { |
|||
return new TbCalculatedFieldEntityActorId(entityId); |
|||
} |
|||
|
|||
@Override |
|||
public TbActor createActor() { |
|||
return new CalculatedFieldEntityActor(context, tenantId, entityId); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,44 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
|
|||
@Data |
|||
public class CalculatedFieldEntityDeleteMsg implements ToCalculatedFieldSystemMsg { |
|||
|
|||
private final TenantId tenantId; |
|||
private final EntityId entityId; |
|||
private final TbCallback callback; |
|||
|
|||
public CalculatedFieldEntityDeleteMsg(TenantId tenantId, |
|||
EntityId entityId, |
|||
TbCallback callback) { |
|||
this.tenantId = tenantId; |
|||
this.entityId = entityId; |
|||
this.callback = callback; |
|||
} |
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.CF_ENTITY_DELETE_MSG; |
|||
} |
|||
} |
|||
@ -0,0 +1,448 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import lombok.SneakyThrows; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.common.util.DebugModeUtil; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.TbActorCtx; |
|||
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; |
|||
import org.thingsboard.server.common.data.AttributeScope; |
|||
import org.thingsboard.server.common.data.DataConstants; |
|||
import org.thingsboard.server.common.data.StringUtils; |
|||
import org.thingsboard.server.common.data.cf.configuration.Argument; |
|||
import org.thingsboard.server.common.data.cf.configuration.ArgumentType; |
|||
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; |
|||
import org.thingsboard.server.common.data.id.CalculatedFieldId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.kv.StringDataEntry; |
|||
import org.thingsboard.server.common.data.msg.TbMsgType; |
|||
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; |
|||
import org.thingsboard.server.common.msg.queue.ServiceType; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; |
|||
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; |
|||
import org.thingsboard.server.service.cf.CalculatedFieldResult; |
|||
import org.thingsboard.server.service.cf.CalculatedFieldStateService; |
|||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; |
|||
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; |
|||
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Collection; |
|||
import java.util.Collections; |
|||
import java.util.HashMap; |
|||
import java.util.HashSet; |
|||
import java.util.LinkedList; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.Set; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.stream.Collectors; |
|||
|
|||
|
|||
/** |
|||
* @author Andrew Shvayka |
|||
*/ |
|||
@Slf4j |
|||
public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareMsgProcessor { |
|||
// (1 for result persistence + 1 for the state persistence )
|
|||
public static final int CALLBACKS_PER_CF = 2; |
|||
|
|||
final TenantId tenantId; |
|||
final EntityId entityId; |
|||
final CalculatedFieldProcessingService cfService; |
|||
final CalculatedFieldStateService cfStateService; |
|||
|
|||
TbActorCtx ctx; |
|||
Map<CalculatedFieldId, CalculatedFieldState> states = new HashMap<>(); |
|||
|
|||
CalculatedFieldEntityMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, EntityId entityId) { |
|||
super(systemContext); |
|||
this.tenantId = tenantId; |
|||
this.entityId = entityId; |
|||
this.cfService = systemContext.getCalculatedFieldProcessingService(); |
|||
this.cfStateService = systemContext.getCalculatedFieldStateService(); |
|||
} |
|||
|
|||
void init(TbActorCtx ctx) { |
|||
this.ctx = ctx; |
|||
} |
|||
|
|||
public void process(CalculatedFieldPartitionChangeMsg msg) { |
|||
if (!systemContext.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId).isMyPartition()) { |
|||
log.info("[{}] Stopping entity actor due to change partition event.", entityId); |
|||
ctx.stop(ctx.getSelf()); |
|||
} |
|||
} |
|||
|
|||
public void process(CalculatedFieldStateRestoreMsg msg) { |
|||
CalculatedFieldId cfId = msg.getId().cfId(); |
|||
log.info("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId); |
|||
if (msg.getState() != null) { |
|||
states.put(cfId, msg.getState()); |
|||
} else { |
|||
states.remove(cfId); |
|||
} |
|||
} |
|||
|
|||
public void process(EntityInitCalculatedFieldMsg msg) throws CalculatedFieldException { |
|||
log.info("[{}] Processing entity init CF msg.", msg.getCtx().getCfId()); |
|||
var ctx = msg.getCtx(); |
|||
if (msg.isForceReinit()) { |
|||
log.info("Force reinitialization of CF: [{}].", ctx.getCfId()); |
|||
states.remove(ctx.getCfId()); |
|||
} |
|||
try { |
|||
var state = getOrInitState(ctx); |
|||
if (state.isSizeOk()) { |
|||
processStateIfReady(ctx, Collections.singletonList(ctx.getCfId()), state, null, null, msg.getCallback()); |
|||
} else { |
|||
throw new RuntimeException(ctx.getSizeExceedsLimitMessage()); |
|||
} |
|||
} catch (Exception e) { |
|||
if (e instanceof CalculatedFieldException cfe) { |
|||
throw cfe; |
|||
} |
|||
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); |
|||
} |
|||
} |
|||
|
|||
public void process(CalculatedFieldEntityDeleteMsg msg) { |
|||
log.info("[{}] Processing CF entity delete msg.", msg.getEntityId()); |
|||
if (this.entityId.equals(msg.getEntityId())) { |
|||
if (states.isEmpty()) { |
|||
msg.getCallback().onSuccess(); |
|||
} else { |
|||
MultipleTbCallback multipleTbCallback = new MultipleTbCallback(states.size(), msg.getCallback()); |
|||
states.forEach((cfId, state) -> cfStateService.removeState(new CalculatedFieldEntityCtxId(tenantId, cfId, entityId), multipleTbCallback)); |
|||
ctx.stop(ctx.getSelf()); |
|||
} |
|||
} else { |
|||
var cfId = new CalculatedFieldId(msg.getEntityId().getId()); |
|||
var state = states.remove(cfId); |
|||
if (state != null) { |
|||
cfStateService.removeState(new CalculatedFieldEntityCtxId(tenantId, cfId, entityId), msg.getCallback()); |
|||
} else { |
|||
msg.getCallback().onSuccess(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void process(EntityCalculatedFieldTelemetryMsg msg) throws CalculatedFieldException { |
|||
log.debug("[{}] Processing CF telemetry msg.", msg.getEntityId()); |
|||
var proto = msg.getProto(); |
|||
var numberOfCallbacks = CALLBACKS_PER_CF * (msg.getEntityIdFields().size() + msg.getProfileIdFields().size()); |
|||
MultipleTbCallback callback = new MultipleTbCallback(numberOfCallbacks, msg.getCallback()); |
|||
List<CalculatedFieldId> cfIdList = getCalculatedFieldIds(proto); |
|||
Set<CalculatedFieldId> cfIdSet = new HashSet<>(cfIdList); |
|||
for (var ctx : msg.getEntityIdFields()) { |
|||
process(ctx, proto, cfIdSet, cfIdList, callback); |
|||
} |
|||
for (var ctx : msg.getProfileIdFields()) { |
|||
process(ctx, proto, cfIdSet, cfIdList, callback); |
|||
} |
|||
} |
|||
|
|||
public void process(EntityCalculatedFieldLinkedTelemetryMsg msg) throws CalculatedFieldException { |
|||
log.debug("[{}] Processing CF link telemetry msg.", msg.getEntityId()); |
|||
var proto = msg.getProto(); |
|||
var ctx = msg.getCtx(); |
|||
var callback = new MultipleTbCallback(CALLBACKS_PER_CF, msg.getCallback()); |
|||
try { |
|||
List<CalculatedFieldId> cfIds = getCalculatedFieldIds(proto); |
|||
if (cfIds.contains(ctx.getCfId())) { |
|||
callback.onSuccess(CALLBACKS_PER_CF); |
|||
} else { |
|||
if (proto.getTsDataCount() > 0) { |
|||
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); |
|||
} else if (proto.getAttrDataCount() > 0) { |
|||
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); |
|||
} else if (proto.getRemovedTsKeysCount() > 0) { |
|||
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithFetchedValue(ctx, proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto)); |
|||
} else if (proto.getRemovedAttrKeysCount() > 0) { |
|||
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithDefaultValue(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto)); |
|||
} else { |
|||
callback.onSuccess(CALLBACKS_PER_CF); |
|||
} |
|||
} |
|||
} catch (Exception e) { |
|||
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); |
|||
} |
|||
} |
|||
|
|||
private void process(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, Collection<CalculatedFieldId> cfIds, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { |
|||
try { |
|||
if (cfIds.contains(ctx.getCfId())) { |
|||
callback.onSuccess(CALLBACKS_PER_CF); |
|||
} else { |
|||
if (proto.getTsDataCount() > 0) { |
|||
processTelemetry(ctx, proto, cfIdList, callback); |
|||
} else if (proto.getAttrDataCount() > 0) { |
|||
processAttributes(ctx, proto, cfIdList, callback); |
|||
} else if (proto.getRemovedTsKeysCount() > 0) { |
|||
processRemovedTelemetry(ctx, proto, cfIdList, callback); |
|||
} else if (proto.getRemovedAttrKeysCount() > 0) { |
|||
processRemovedAttributes(ctx, proto, cfIdList, callback); |
|||
} else { |
|||
callback.onSuccess(CALLBACKS_PER_CF); |
|||
} |
|||
} |
|||
} catch (Exception e) { |
|||
if (e instanceof CalculatedFieldException cfe) { |
|||
throw cfe; |
|||
} |
|||
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); |
|||
} |
|||
} |
|||
|
|||
private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { |
|||
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); |
|||
} |
|||
|
|||
private void processAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { |
|||
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); |
|||
} |
|||
|
|||
private void processRemovedTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { |
|||
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithFetchedValue(ctx, proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto)); |
|||
} |
|||
|
|||
private void processRemovedAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { |
|||
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithDefaultValue(ctx, proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto)); |
|||
} |
|||
|
|||
private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback, |
|||
Map<String, ArgumentEntry> newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException { |
|||
if (newArgValues.isEmpty()) { |
|||
log.info("[{}] No new argument values to process for CF.", ctx.getCfId()); |
|||
callback.onSuccess(CALLBACKS_PER_CF); |
|||
} |
|||
CalculatedFieldState state = states.get(ctx.getCfId()); |
|||
boolean justRestored = false; |
|||
if (state == null) { |
|||
state = getOrInitState(ctx); |
|||
justRestored = true; |
|||
} |
|||
if (state.isSizeOk()) { |
|||
if (state.updateState(ctx, newArgValues) || justRestored) { |
|||
cfIdList = new ArrayList<>(cfIdList); |
|||
cfIdList.add(ctx.getCfId()); |
|||
processStateIfReady(ctx, cfIdList, state, tbMsgId, tbMsgType, callback); |
|||
} else { |
|||
callback.onSuccess(CALLBACKS_PER_CF); |
|||
} |
|||
} else { |
|||
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build(); |
|||
} |
|||
} |
|||
|
|||
@SneakyThrows |
|||
private CalculatedFieldState getOrInitState(CalculatedFieldCtx ctx) { |
|||
CalculatedFieldState state = states.get(ctx.getCfId()); |
|||
if (state != null) { |
|||
return state; |
|||
} else { |
|||
ListenableFuture<CalculatedFieldState> stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId); |
|||
// Ugly but necessary. We do not expect to often fetch data from DB. Only once per <Entity, CalculatedField> pair lifetime.
|
|||
// This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low.
|
|||
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
|
|||
// but this will significantly complicate the code.
|
|||
state = stateFuture.get(1, TimeUnit.MINUTES); |
|||
state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize()); |
|||
states.put(ctx.getCfId(), state); |
|||
} |
|||
return state; |
|||
} |
|||
|
|||
private void processStateIfReady(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) throws CalculatedFieldException { |
|||
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId); |
|||
boolean stateSizeChecked = false; |
|||
try { |
|||
if (ctx.isInitialized() && state.isReady()) { |
|||
CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(systemContext.getCfCalculationResultTimeout(), TimeUnit.SECONDS); |
|||
state.checkStateSize(ctxId, ctx.getMaxStateSize()); |
|||
stateSizeChecked = true; |
|||
if (state.isSizeOk()) { |
|||
if (!calculationResult.isEmpty()) { |
|||
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback); |
|||
} else { |
|||
callback.onSuccess(); |
|||
} |
|||
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { |
|||
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null); |
|||
} |
|||
} |
|||
} |
|||
} catch (Exception e) { |
|||
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).msgId(tbMsgId).msgType(tbMsgType).arguments(state.getArguments()).cause(e).build(); |
|||
} finally { |
|||
if (!stateSizeChecked) { |
|||
state.checkStateSize(ctxId, ctx.getMaxStateSize()); |
|||
} |
|||
if (state.isSizeOk()) { |
|||
cfStateService.persistState(ctxId, state, callback); |
|||
} else { |
|||
removeStateAndRaiseSizeException(ctxId, CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build(), callback); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void removeStateAndRaiseSizeException(CalculatedFieldEntityCtxId ctxId, CalculatedFieldException ex, TbCallback callback) throws CalculatedFieldException { |
|||
// We remove the state, but remember that it is over-sized in a local map.
|
|||
cfStateService.removeState(ctxId, new TbCallback() { |
|||
@Override |
|||
public void onSuccess() { |
|||
callback.onFailure(ex); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
callback.onFailure(ex); |
|||
} |
|||
}); |
|||
throw ex; |
|||
} |
|||
|
|||
private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, List<TsKvProto> data) { |
|||
return mapToArguments(ctx.getMainEntityArguments(), data); |
|||
} |
|||
|
|||
private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, EntityId entityId, List<TsKvProto> data) { |
|||
var argNames = ctx.getLinkedEntityArguments().get(entityId); |
|||
if (argNames.isEmpty()) { |
|||
return Collections.emptyMap(); |
|||
} |
|||
return mapToArguments(argNames, data); |
|||
} |
|||
|
|||
private Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, String> argNames, List<TsKvProto> data) { |
|||
if (argNames.isEmpty()) { |
|||
return Collections.emptyMap(); |
|||
} |
|||
Map<String, ArgumentEntry> arguments = new HashMap<>(); |
|||
for (TsKvProto item : data) { |
|||
ReferencedEntityKey key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_LATEST, null); |
|||
String argName = argNames.get(key); |
|||
if (argName != null) { |
|||
arguments.put(argName, new SingleValueArgumentEntry(item)); |
|||
} |
|||
key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_ROLLING, null); |
|||
argName = argNames.get(key); |
|||
if (argName != null) { |
|||
arguments.put(argName, new SingleValueArgumentEntry(item)); |
|||
} |
|||
} |
|||
return arguments; |
|||
} |
|||
|
|||
private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, AttributeScopeProto scope, List<AttributeValueProto> attrDataList) { |
|||
return mapToArguments(ctx.getMainEntityArguments(), scope, attrDataList); |
|||
} |
|||
|
|||
private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List<AttributeValueProto> attrDataList) { |
|||
var argNames = ctx.getLinkedEntityArguments().get(entityId); |
|||
if (argNames.isEmpty()) { |
|||
return Collections.emptyMap(); |
|||
} |
|||
return mapToArguments(argNames, scope, attrDataList); |
|||
} |
|||
|
|||
private Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, String> argNames, AttributeScopeProto scope, List<AttributeValueProto> attrDataList) { |
|||
Map<String, ArgumentEntry> arguments = new HashMap<>(); |
|||
for (AttributeValueProto item : attrDataList) { |
|||
ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name())); |
|||
String argName = argNames.get(key); |
|||
if (argName != null) { |
|||
arguments.put(argName, new SingleValueArgumentEntry(item)); |
|||
} |
|||
} |
|||
return arguments; |
|||
} |
|||
|
|||
private Map<String, ArgumentEntry> mapToArgumentsWithDefaultValue(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List<String> removedAttrKeys) { |
|||
var argNames = ctx.getLinkedEntityArguments().get(entityId); |
|||
if (argNames.isEmpty()) { |
|||
return Collections.emptyMap(); |
|||
} |
|||
return mapToArgumentsWithDefaultValue(argNames, ctx.getArguments(), scope, removedAttrKeys); |
|||
} |
|||
|
|||
private Map<String, ArgumentEntry> mapToArgumentsWithDefaultValue(CalculatedFieldCtx ctx, AttributeScopeProto scope, List<String> removedAttrKeys) { |
|||
return mapToArgumentsWithDefaultValue(ctx.getMainEntityArguments(), ctx.getArguments(), scope, removedAttrKeys); |
|||
} |
|||
|
|||
private Map<String, ArgumentEntry> mapToArgumentsWithDefaultValue(Map<ReferencedEntityKey, String> argNames, Map<String, Argument> configArguments, AttributeScopeProto scope, List<String> removedAttrKeys) { |
|||
Map<String, ArgumentEntry> arguments = new HashMap<>(); |
|||
for (String removedKey : removedAttrKeys) { |
|||
ReferencedEntityKey key = new ReferencedEntityKey(removedKey, ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name())); |
|||
String argName = argNames.get(key); |
|||
if (argName != null) { |
|||
Argument argument = configArguments.get(argName); |
|||
String defaultValue = (argument != null) ? argument.getDefaultValue() : null; |
|||
arguments.put(argName, StringUtils.isNotEmpty(defaultValue) |
|||
? new SingleValueArgumentEntry(System.currentTimeMillis(), new StringDataEntry(removedKey, defaultValue), null) |
|||
: new SingleValueArgumentEntry()); |
|||
|
|||
} |
|||
} |
|||
return arguments; |
|||
} |
|||
|
|||
private Map<String, ArgumentEntry> mapToArgumentsWithFetchedValue(CalculatedFieldCtx ctx, List<String> removedTelemetryKeys) { |
|||
Map<String, Argument> deletedArguments = ctx.getArguments().entrySet().stream() |
|||
.filter(entry -> removedTelemetryKeys.contains(entry.getKey())) |
|||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); |
|||
|
|||
Map<String, ArgumentEntry> fetchedArgs = cfService.fetchArgsFromDb(tenantId, entityId, deletedArguments); |
|||
|
|||
fetchedArgs.values().forEach(arg -> arg.setForceResetPrevious(true)); |
|||
return fetchedArgs; |
|||
} |
|||
|
|||
private static List<CalculatedFieldId> getCalculatedFieldIds(CalculatedFieldTelemetryMsgProto proto) { |
|||
List<CalculatedFieldId> cfIds = new LinkedList<>(); |
|||
for (var cfId : proto.getPreviousCalculatedFieldsList()) { |
|||
cfIds.add(new CalculatedFieldId(new UUID(cfId.getCalculatedFieldIdMSB(), cfId.getCalculatedFieldIdLSB()))); |
|||
} |
|||
return cfIds; |
|||
} |
|||
|
|||
private UUID toTbMsgId(CalculatedFieldTelemetryMsgProto proto) { |
|||
if (proto.getTbMsgIdMSB() != 0 && proto.getTbMsgIdLSB() != 0) { |
|||
return new UUID(proto.getTbMsgIdMSB(), proto.getTbMsgIdLSB()); |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
private TbMsgType toTbMsgType(CalculatedFieldTelemetryMsgProto proto) { |
|||
if (!proto.getTbMsgType().isEmpty()) { |
|||
return TbMsgType.valueOf(proto.getTbMsgType()); |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,40 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import lombok.Builder; |
|||
import lombok.Getter; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.msg.TbMsgType; |
|||
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; |
|||
|
|||
import java.util.Map; |
|||
import java.util.UUID; |
|||
|
|||
@Getter |
|||
@Builder |
|||
public class CalculatedFieldException extends Exception { |
|||
|
|||
private final CalculatedFieldCtx ctx; |
|||
private final EntityId eventEntity; |
|||
private final UUID msgId; |
|||
private final TbMsgType msgType; |
|||
private Map<String, ArgumentEntry> arguments; |
|||
private String errorMessage; |
|||
private Exception cause; |
|||
|
|||
} |
|||
@ -0,0 +1,40 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; |
|||
|
|||
@Data |
|||
public class CalculatedFieldLinkedTelemetryMsg implements ToCalculatedFieldSystemMsg { |
|||
|
|||
private final TenantId tenantId; |
|||
private final EntityId entityId; |
|||
private final CalculatedFieldLinkedTelemetryMsgProto proto; |
|||
private final TbCallback callback; |
|||
|
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.CF_LINKED_TELEMETRY_MSG; |
|||
} |
|||
} |
|||
@ -0,0 +1,90 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.TbActorCtx; |
|||
import org.thingsboard.server.actors.TbActorException; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; |
|||
import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; |
|||
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; |
|||
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; |
|||
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; |
|||
|
|||
/** |
|||
* Created by ashvayka on 15.03.18. |
|||
*/ |
|||
@Slf4j |
|||
public class CalculatedFieldManagerActor extends AbstractCalculatedFieldActor { |
|||
|
|||
private final CalculatedFieldManagerMessageProcessor processor; |
|||
|
|||
public CalculatedFieldManagerActor(ActorSystemContext systemContext, TenantId tenantId) { |
|||
super(systemContext, tenantId); |
|||
this.processor = new CalculatedFieldManagerMessageProcessor(systemContext, tenantId); |
|||
} |
|||
|
|||
@Override |
|||
public void init(TbActorCtx ctx) throws TbActorException { |
|||
super.init(ctx); |
|||
log.debug("[{}] Starting CF manager actor.", processor.tenantId); |
|||
try { |
|||
processor.init(ctx); |
|||
log.debug("[{}] CF manager actor started.", processor.tenantId); |
|||
} catch (Exception e) { |
|||
log.warn("[{}] Unknown failure", processor.tenantId, e); |
|||
throw new TbActorException("Failed to initialize manager actor", e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
protected boolean doProcessCfMsg(ToCalculatedFieldSystemMsg msg) throws CalculatedFieldException { |
|||
switch (msg.getMsgType()) { |
|||
case CF_PARTITIONS_CHANGE_MSG: |
|||
processor.onPartitionChange((CalculatedFieldPartitionChangeMsg) msg); |
|||
break; |
|||
case CF_INIT_MSG: |
|||
processor.onFieldInitMsg((CalculatedFieldInitMsg) msg); |
|||
break; |
|||
case CF_LINK_INIT_MSG: |
|||
processor.onLinkInitMsg((CalculatedFieldLinkInitMsg) msg); |
|||
break; |
|||
case CF_STATE_RESTORE_MSG: |
|||
processor.onStateRestoreMsg((CalculatedFieldStateRestoreMsg) msg); |
|||
break; |
|||
case CF_ENTITY_LIFECYCLE_MSG: |
|||
processor.onEntityLifecycleMsg((CalculatedFieldEntityLifecycleMsg) msg); |
|||
break; |
|||
case CF_TELEMETRY_MSG: |
|||
processor.onTelemetryMsg((CalculatedFieldTelemetryMsg) msg); |
|||
break; |
|||
case CF_LINKED_TELEMETRY_MSG: |
|||
processor.onLinkedTelemetryMsg((CalculatedFieldLinkedTelemetryMsg) msg); |
|||
break; |
|||
default: |
|||
return false; |
|||
} |
|||
return true; |
|||
} |
|||
|
|||
@Override |
|||
void logProcessingException(Exception e) { |
|||
log.warn("[{}] Processing failure", tenantId, e); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,46 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.TbActor; |
|||
import org.thingsboard.server.actors.TbActorId; |
|||
import org.thingsboard.server.actors.TbEntityActorId; |
|||
import org.thingsboard.server.actors.TbStringActorId; |
|||
import org.thingsboard.server.actors.service.ContextBasedCreator; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
|
|||
public class CalculatedFieldManagerActorCreator extends ContextBasedCreator { |
|||
|
|||
private final TenantId tenantId; |
|||
|
|||
public CalculatedFieldManagerActorCreator(ActorSystemContext context, TenantId tenantId) { |
|||
super(context); |
|||
this.tenantId = tenantId; |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId createActorId() { |
|||
return new TbStringActorId("CFM|" + tenantId); |
|||
} |
|||
|
|||
@Override |
|||
public TbActor createActor() { |
|||
return new CalculatedFieldManagerActor(context, tenantId); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,491 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.TbActorCtx; |
|||
import org.thingsboard.server.actors.TbActorRef; |
|||
import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId; |
|||
import org.thingsboard.server.actors.service.DefaultActorService; |
|||
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; |
|||
import org.thingsboard.server.common.data.DataConstants; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.cf.CalculatedField; |
|||
import org.thingsboard.server.common.data.cf.CalculatedFieldLink; |
|||
import org.thingsboard.server.common.data.id.AssetId; |
|||
import org.thingsboard.server.common.data.id.CalculatedFieldId; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; |
|||
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; |
|||
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; |
|||
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; |
|||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; |
|||
import org.thingsboard.server.common.msg.queue.ServiceType; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.dao.cf.CalculatedFieldService; |
|||
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; |
|||
import org.thingsboard.server.service.cf.CalculatedFieldStateService; |
|||
import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache; |
|||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; |
|||
import org.thingsboard.server.service.profile.TbAssetProfileCache; |
|||
import org.thingsboard.server.service.profile.TbDeviceProfileCache; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Collections; |
|||
import java.util.HashMap; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.concurrent.CopyOnWriteArrayList; |
|||
|
|||
import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto; |
|||
|
|||
|
|||
/** |
|||
* @author Andrew Shvayka |
|||
*/ |
|||
@Slf4j |
|||
public class CalculatedFieldManagerMessageProcessor extends AbstractContextAwareMsgProcessor { |
|||
|
|||
private final Map<CalculatedFieldId, CalculatedFieldCtx> calculatedFields = new HashMap<>(); |
|||
private final Map<EntityId, List<CalculatedFieldCtx>> entityIdCalculatedFields = new HashMap<>(); |
|||
private final Map<EntityId, List<CalculatedFieldLink>> entityIdCalculatedFieldLinks = new HashMap<>(); |
|||
|
|||
private final CalculatedFieldProcessingService cfExecService; |
|||
private final CalculatedFieldStateService cfStateService; |
|||
private final CalculatedFieldEntityProfileCache cfEntityCache; |
|||
private final CalculatedFieldService cfDaoService; |
|||
private final TbAssetProfileCache assetProfileCache; |
|||
private final TbDeviceProfileCache deviceProfileCache; |
|||
protected final TenantId tenantId; |
|||
|
|||
protected TbActorCtx ctx; |
|||
|
|||
CalculatedFieldManagerMessageProcessor(ActorSystemContext systemContext, TenantId tenantId) { |
|||
super(systemContext); |
|||
this.cfEntityCache = systemContext.getCalculatedFieldEntityProfileCache(); |
|||
this.cfExecService = systemContext.getCalculatedFieldProcessingService(); |
|||
this.cfStateService = systemContext.getCalculatedFieldStateService(); |
|||
this.cfDaoService = systemContext.getCalculatedFieldService(); |
|||
this.assetProfileCache = systemContext.getAssetProfileCache(); |
|||
this.deviceProfileCache = systemContext.getDeviceProfileCache(); |
|||
this.tenantId = tenantId; |
|||
} |
|||
|
|||
void init(TbActorCtx ctx) { |
|||
this.ctx = ctx; |
|||
} |
|||
|
|||
public void onFieldInitMsg(CalculatedFieldInitMsg msg) throws CalculatedFieldException { |
|||
log.info("[{}] Processing CF init message.", msg.getCf().getId()); |
|||
var cf = msg.getCf(); |
|||
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); |
|||
try { |
|||
cfCtx.init(); |
|||
} catch (Exception e) { |
|||
throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(cf.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build(); |
|||
} |
|||
calculatedFields.put(cf.getId(), cfCtx); |
|||
// We use copy on write lists to safely pass the reference to another actor for the iteration.
|
|||
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
|||
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx); |
|||
msg.getCallback().onSuccess(); |
|||
} |
|||
|
|||
public void onLinkInitMsg(CalculatedFieldLinkInitMsg msg) { |
|||
log.info("[{}] Processing CF link init message for entity [{}].", msg.getLink().getCalculatedFieldId(), msg.getLink().getEntityId()); |
|||
var link = msg.getLink(); |
|||
// We use copy on write lists to safely pass the reference to another actor for the iteration.
|
|||
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
|||
entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(link); |
|||
msg.getCallback().onSuccess(); |
|||
} |
|||
|
|||
public void onStateRestoreMsg(CalculatedFieldStateRestoreMsg msg) { |
|||
var cfId = msg.getId().cfId(); |
|||
var calculatedField = calculatedFields.get(cfId); |
|||
|
|||
if (calculatedField != null) { |
|||
msg.getState().setRequiredArguments(calculatedField.getArgNames()); |
|||
log.debug("Pushing CF state restore msg to specific actor [{}]", msg.getId().entityId()); |
|||
getOrCreateActor(msg.getId().entityId()).tell(msg); |
|||
} else { |
|||
cfStateService.removeState(msg.getId(), msg.getCallback()); |
|||
} |
|||
} |
|||
|
|||
public void onEntityLifecycleMsg(CalculatedFieldEntityLifecycleMsg msg) throws CalculatedFieldException { |
|||
log.info("Processing entity lifecycle event: [{}] for entity: [{}]", msg.getData().getEvent(), msg.getData().getEntityId()); |
|||
var entityType = msg.getData().getEntityId().getEntityType(); |
|||
var event = msg.getData().getEvent(); |
|||
switch (entityType) { |
|||
case CALCULATED_FIELD: { |
|||
switch (event) { |
|||
case CREATED: |
|||
onCfCreated(msg.getData(), msg.getCallback()); |
|||
break; |
|||
case UPDATED: |
|||
onCfUpdated(msg.getData(), msg.getCallback()); |
|||
break; |
|||
case DELETED: |
|||
onCfDeleted(msg.getData(), msg.getCallback()); |
|||
break; |
|||
default: |
|||
msg.getCallback().onSuccess(); |
|||
break; |
|||
} |
|||
break; |
|||
} |
|||
case DEVICE: |
|||
case ASSET: { |
|||
switch (event) { |
|||
case CREATED: |
|||
onEntityCreated(msg.getData(), msg.getCallback()); |
|||
break; |
|||
case UPDATED: |
|||
onEntityUpdated(msg.getData(), msg.getCallback()); |
|||
break; |
|||
case DELETED: |
|||
onEntityDeleted(msg.getData(), msg.getCallback()); |
|||
break; |
|||
default: |
|||
msg.getCallback().onSuccess(); |
|||
break; |
|||
} |
|||
break; |
|||
} |
|||
default: { |
|||
msg.getCallback().onSuccess(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void onEntityCreated(ComponentLifecycleMsg msg, TbCallback callback) { |
|||
EntityId entityId = msg.getEntityId(); |
|||
EntityId profileId = getProfileId(tenantId, entityId); |
|||
cfEntityCache.add(tenantId, profileId, entityId); |
|||
if (!isMyPartition(entityId, callback)) { |
|||
return; |
|||
} |
|||
var entityIdFields = getCalculatedFieldsByEntityId(entityId); |
|||
var profileIdFields = getCalculatedFieldsByEntityId(profileId); |
|||
var fieldsCount = entityIdFields.size() + profileIdFields.size(); |
|||
if (fieldsCount > 0) { |
|||
MultipleTbCallback multiCallback = new MultipleTbCallback(fieldsCount, callback); |
|||
entityIdFields.forEach(ctx -> initCfForEntity(entityId, ctx, true, multiCallback)); |
|||
profileIdFields.forEach(ctx -> initCfForEntity(entityId, ctx, true, multiCallback)); |
|||
} else { |
|||
callback.onSuccess(); |
|||
} |
|||
} |
|||
|
|||
private void onEntityUpdated(ComponentLifecycleMsg msg, TbCallback callback) { |
|||
if (msg.getOldProfileId() != null && !msg.getOldProfileId().equals(msg.getProfileId())) { |
|||
cfEntityCache.update(tenantId, msg.getOldProfileId(), msg.getProfileId(), msg.getEntityId()); |
|||
if (!isMyPartition(msg.getEntityId(), callback)) { |
|||
return; |
|||
} |
|||
var oldProfileCfs = getCalculatedFieldsByEntityId(msg.getOldProfileId()); |
|||
var newProfileCfs = getCalculatedFieldsByEntityId(msg.getProfileId()); |
|||
var fieldsCount = oldProfileCfs.size() + newProfileCfs.size(); |
|||
if (fieldsCount > 0) { |
|||
MultipleTbCallback multiCallback = new MultipleTbCallback(fieldsCount, callback); |
|||
var entityId = msg.getEntityId(); |
|||
oldProfileCfs.forEach(ctx -> deleteCfForEntity(entityId, ctx.getCfId(), multiCallback)); |
|||
newProfileCfs.forEach(ctx -> initCfForEntity(entityId, ctx, true, multiCallback)); |
|||
} else { |
|||
callback.onSuccess(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void onEntityDeleted(ComponentLifecycleMsg msg, TbCallback callback) { |
|||
cfEntityCache.evict(tenantId, msg.getEntityId()); |
|||
if (isMyPartition(msg.getEntityId(), callback)) { |
|||
log.debug("Pushing entity lifecycle msg to specific actor [{}]", msg.getEntityId()); |
|||
getOrCreateActor(msg.getEntityId()).tell(new CalculatedFieldEntityDeleteMsg(tenantId, msg.getEntityId(), callback)); |
|||
} |
|||
} |
|||
|
|||
private void onCfCreated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException { |
|||
var cfId = new CalculatedFieldId(msg.getEntityId().getId()); |
|||
if (calculatedFields.containsKey(cfId)) { |
|||
log.warn("[{}] CF was already initialized [{}]", tenantId, cfId); |
|||
callback.onSuccess(); |
|||
} else { |
|||
var cf = cfDaoService.findById(msg.getTenantId(), cfId); |
|||
if (cf == null) { |
|||
log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); |
|||
callback.onSuccess(); |
|||
} else { |
|||
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); |
|||
try { |
|||
cfCtx.init(); |
|||
} catch (Exception e) { |
|||
throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(cf.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build(); |
|||
} |
|||
calculatedFields.put(cf.getId(), cfCtx); |
|||
// We use copy on write lists to safely pass the reference to another actor for the iteration.
|
|||
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
|||
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx); |
|||
addLinks(cf); |
|||
initCf(cfCtx, callback, false); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void onCfUpdated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException { |
|||
var cfId = new CalculatedFieldId(msg.getEntityId().getId()); |
|||
var oldCfCtx = calculatedFields.get(cfId); |
|||
if (oldCfCtx == null) { |
|||
onCfCreated(msg, callback); |
|||
} else { |
|||
var newCf = cfDaoService.findById(msg.getTenantId(), cfId); |
|||
if (newCf == null) { |
|||
log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); |
|||
callback.onSuccess(); |
|||
} else { |
|||
var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); |
|||
try { |
|||
newCfCtx.init(); |
|||
} catch (Exception e) { |
|||
throw CalculatedFieldException.builder().ctx(newCfCtx).eventEntity(newCfCtx.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build(); |
|||
} |
|||
calculatedFields.put(newCf.getId(), newCfCtx); |
|||
List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getEntityId()); |
|||
List<CalculatedFieldCtx> newCfList = new CopyOnWriteArrayList<>(); |
|||
boolean found = false; |
|||
for (CalculatedFieldCtx oldCtx : oldCfList) { |
|||
if (oldCtx.getCfId().equals(newCf.getId())) { |
|||
newCfList.add(newCfCtx); |
|||
found = true; |
|||
} else { |
|||
newCfList.add(oldCtx); |
|||
} |
|||
} |
|||
if (!found) { |
|||
newCfList.add(newCfCtx); |
|||
} |
|||
entityIdCalculatedFields.put(newCf.getEntityId(), newCfList); |
|||
|
|||
deleteLinks(oldCfCtx); |
|||
addLinks(newCf); |
|||
|
|||
// We use copy on write lists to safely pass the reference to another actor for the iteration.
|
|||
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
|||
var stateChanges = newCfCtx.hasStateChanges(oldCfCtx); |
|||
if (stateChanges || newCfCtx.hasOtherSignificantChanges(oldCfCtx)) { |
|||
initCf(newCfCtx, callback, stateChanges); |
|||
} else { |
|||
callback.onSuccess(); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void onCfDeleted(ComponentLifecycleMsg msg, TbCallback callback) { |
|||
var cfId = new CalculatedFieldId(msg.getEntityId().getId()); |
|||
var cfCtx = calculatedFields.remove(cfId); |
|||
if (cfCtx == null) { |
|||
log.warn("[{}] CF was already deleted [{}]", tenantId, cfId); |
|||
callback.onSuccess(); |
|||
} else { |
|||
entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx); |
|||
deleteLinks(cfCtx); |
|||
|
|||
EntityId entityId = cfCtx.getEntityId(); |
|||
EntityType entityType = cfCtx.getEntityId().getEntityType(); |
|||
if (isProfileEntity(entityType)) { |
|||
var entityIds = cfEntityCache.getMyEntityIdsByProfileId(tenantId, entityId); |
|||
if (!entityIds.isEmpty()) { |
|||
//TODO: no need to do this if we cache all created actors and know which one belong to us;
|
|||
var multiCallback = new MultipleTbCallback(entityIds.size(), callback); |
|||
entityIds.forEach(id -> deleteCfForEntity(id, cfId, multiCallback)); |
|||
} else { |
|||
callback.onSuccess(); |
|||
} |
|||
} else { |
|||
if (isMyPartition(entityId, callback)) { |
|||
deleteCfForEntity(entityId, cfId, callback); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) { |
|||
EntityId entityId = msg.getEntityId(); |
|||
log.debug("Received telemetry msg from entity [{}]", entityId); |
|||
// 2 = 1 for CF processing + 1 for links processing
|
|||
MultipleTbCallback callback = new MultipleTbCallback(2, msg.getCallback()); |
|||
// process all cfs related to entity, or it's profile;
|
|||
var entityIdFields = getCalculatedFieldsByEntityId(entityId); |
|||
var profileIdFields = getCalculatedFieldsByEntityId(getProfileId(tenantId, entityId)); |
|||
if (!entityIdFields.isEmpty() || !profileIdFields.isEmpty()) { |
|||
log.debug("Pushing telemetry msg to specific actor [{}]", entityId); |
|||
getOrCreateActor(entityId).tell(new EntityCalculatedFieldTelemetryMsg(msg, entityIdFields, profileIdFields, callback)); |
|||
} else { |
|||
callback.onSuccess(); |
|||
} |
|||
// process all links (if any);
|
|||
List<CalculatedFieldEntityCtxId> linkedCalculatedFields = filterCalculatedFieldLinks(msg); |
|||
var linksSize = linkedCalculatedFields.size(); |
|||
if (linksSize > 0) { |
|||
cfExecService.pushMsgToLinks(msg, linkedCalculatedFields, callback); |
|||
} else { |
|||
callback.onSuccess(); |
|||
} |
|||
} |
|||
|
|||
public void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsg msg) { |
|||
EntityId sourceEntityId = msg.getEntityId(); |
|||
log.debug("Received linked telemetry msg from entity [{}]", sourceEntityId); |
|||
var proto = msg.getProto(); |
|||
var linksList = proto.getLinksList(); |
|||
for (var linkProto : linksList) { |
|||
var link = fromProto(linkProto); |
|||
var targetEntityId = link.entityId(); |
|||
var targetEntityType = targetEntityId.getEntityType(); |
|||
var cf = calculatedFields.get(link.cfId()); |
|||
if (EntityType.DEVICE_PROFILE.equals(targetEntityType) || EntityType.ASSET_PROFILE.equals(targetEntityType)) { |
|||
// iterate over all entities that belong to profile and push the message for corresponding CF
|
|||
var entityIds = cfEntityCache.getMyEntityIdsByProfileId(tenantId, targetEntityId); |
|||
if (!entityIds.isEmpty()) { |
|||
MultipleTbCallback callback = new MultipleTbCallback(entityIds.size(), msg.getCallback()); |
|||
var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, callback); |
|||
entityIds.forEach(entityId -> { |
|||
log.debug("Pushing linked telemetry msg to specific actor [{}]", entityId); |
|||
getOrCreateActor(entityId).tell(newMsg); |
|||
}); |
|||
} else { |
|||
msg.getCallback().onSuccess(); |
|||
} |
|||
} else { |
|||
log.debug("Pushing linked telemetry msg to specific actor [{}]", targetEntityId); |
|||
var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, msg.getCallback()); |
|||
getOrCreateActor(targetEntityId).tell(newMsg); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private List<CalculatedFieldEntityCtxId> filterCalculatedFieldLinks(CalculatedFieldTelemetryMsg msg) { |
|||
EntityId entityId = msg.getEntityId(); |
|||
var proto = msg.getProto(); |
|||
List<CalculatedFieldEntityCtxId> result = new ArrayList<>(); |
|||
for (var link : getCalculatedFieldLinksByEntityId(entityId)) { |
|||
CalculatedFieldCtx ctx = calculatedFields.get(link.getCalculatedFieldId()); |
|||
if (ctx.linkMatches(entityId, proto)) { |
|||
result.add(ctx.toCalculatedFieldEntityCtxId()); |
|||
} |
|||
} |
|||
return result; |
|||
} |
|||
|
|||
private List<CalculatedFieldCtx> getCalculatedFieldsByEntityId(EntityId entityId) { |
|||
if (entityId == null) { |
|||
return Collections.emptyList(); |
|||
} |
|||
var result = entityIdCalculatedFields.get(entityId); |
|||
if (result == null) { |
|||
result = Collections.emptyList(); |
|||
} |
|||
return result; |
|||
} |
|||
|
|||
private List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(EntityId entityId) { |
|||
if (entityId == null) { |
|||
return Collections.emptyList(); |
|||
} |
|||
var result = entityIdCalculatedFieldLinks.get(entityId); |
|||
if (result == null) { |
|||
result = Collections.emptyList(); |
|||
} |
|||
return result; |
|||
} |
|||
|
|||
private void initCf(CalculatedFieldCtx cfCtx, TbCallback callback, boolean forceStateReinit) { |
|||
EntityId entityId = cfCtx.getEntityId(); |
|||
EntityType entityType = cfCtx.getEntityId().getEntityType(); |
|||
if (isProfileEntity(entityType)) { |
|||
var entityIds = cfEntityCache.getMyEntityIdsByProfileId(tenantId, entityId); |
|||
if (!entityIds.isEmpty()) { |
|||
var multiCallback = new MultipleTbCallback(entityIds.size(), callback); |
|||
entityIds.forEach(id -> initCfForEntity(id, cfCtx, forceStateReinit, multiCallback)); |
|||
} else { |
|||
callback.onSuccess(); |
|||
} |
|||
} else { |
|||
if (isMyPartition(entityId, callback)) { |
|||
initCfForEntity(entityId, cfCtx, forceStateReinit, callback); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void deleteCfForEntity(EntityId entityId, CalculatedFieldId cfId, TbCallback callback) { |
|||
log.debug("Pushing delete CF msg to specific actor [{}]", entityId); |
|||
getOrCreateActor(entityId).tell(new CalculatedFieldEntityDeleteMsg(tenantId, cfId, callback)); |
|||
} |
|||
|
|||
private void initCfForEntity(EntityId entityId, CalculatedFieldCtx cfCtx, boolean forceStateReinit, TbCallback callback) { |
|||
log.debug("Pushing entity init CF msg to specific actor [{}]", entityId); |
|||
getOrCreateActor(entityId).tell(new EntityInitCalculatedFieldMsg(tenantId, cfCtx, callback, forceStateReinit)); |
|||
} |
|||
|
|||
private boolean isMyPartition(EntityId entityId, TbCallback callback) { |
|||
if (!systemContext.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId).isMyPartition()) { |
|||
log.debug("[{}] Entity belongs to external partition.", entityId); |
|||
callback.onSuccess(); |
|||
return false; |
|||
} |
|||
return true; |
|||
} |
|||
|
|||
private static boolean isProfileEntity(EntityType entityType) { |
|||
return EntityType.DEVICE_PROFILE.equals(entityType) || EntityType.ASSET_PROFILE.equals(entityType); |
|||
} |
|||
|
|||
private EntityId getProfileId(TenantId tenantId, EntityId entityId) { |
|||
return switch (entityId.getEntityType()) { |
|||
case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId(); |
|||
case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId(); |
|||
default -> null; |
|||
}; |
|||
} |
|||
|
|||
private TbActorRef getOrCreateActor(EntityId entityId) { |
|||
return ctx.getOrCreateChildActor(new TbCalculatedFieldEntityActorId(entityId), |
|||
() -> DefaultActorService.CF_ENTITY_DISPATCHER_NAME, |
|||
() -> new CalculatedFieldEntityActorCreator(systemContext, tenantId, entityId), |
|||
() -> true); |
|||
} |
|||
|
|||
private void addLinks(CalculatedField newCf) { |
|||
var newLinks = newCf.getConfiguration().buildCalculatedFieldLinks(tenantId, newCf.getEntityId(), newCf.getId()); |
|||
newLinks.forEach(link -> entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(link)); |
|||
} |
|||
|
|||
private void deleteLinks(CalculatedFieldCtx cfCtx) { |
|||
var oldCf = cfCtx.getCalculatedField(); |
|||
var oldLinks = oldCf.getConfiguration().buildCalculatedFieldLinks(tenantId, oldCf.getEntityId(), oldCf.getId()); |
|||
oldLinks.forEach(link -> entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new CopyOnWriteArrayList<>()).remove(link)); |
|||
} |
|||
|
|||
public void onPartitionChange(CalculatedFieldPartitionChangeMsg msg) { |
|||
ctx.broadcastToChildren(msg, true); |
|||
} |
|||
} |
|||
@ -0,0 +1,40 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; |
|||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; |
|||
|
|||
@Data |
|||
public class CalculatedFieldStateRestoreMsg implements ToCalculatedFieldSystemMsg { |
|||
|
|||
private final CalculatedFieldEntityCtxId id; |
|||
private final CalculatedFieldState state; |
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.CF_STATE_RESTORE_MSG; |
|||
} |
|||
|
|||
@Override |
|||
public TenantId getTenantId() { |
|||
return id.tenantId(); |
|||
} |
|||
} |
|||
@ -0,0 +1,39 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; |
|||
|
|||
@Data |
|||
public class CalculatedFieldTelemetryMsg implements ToCalculatedFieldSystemMsg { |
|||
|
|||
private final TenantId tenantId; |
|||
private final EntityId entityId; |
|||
private final CalculatedFieldTelemetryMsgProto proto; |
|||
private final TbCallback callback; |
|||
|
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.CF_TELEMETRY_MSG; |
|||
} |
|||
} |
|||
@ -0,0 +1,42 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; |
|||
|
|||
import java.util.List; |
|||
|
|||
@Data |
|||
public class EntityCalculatedFieldLinkedTelemetryMsg implements ToCalculatedFieldSystemMsg { |
|||
|
|||
private final TenantId tenantId; |
|||
private final EntityId entityId; |
|||
private final CalculatedFieldTelemetryMsgProto proto; |
|||
private final CalculatedFieldCtx ctx; |
|||
private final TbCallback callback; |
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.CF_LINKED_TELEMETRY_MSG; |
|||
} |
|||
} |
|||
@ -0,0 +1,56 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; |
|||
|
|||
import java.util.List; |
|||
|
|||
@Data |
|||
public class EntityCalculatedFieldTelemetryMsg implements ToCalculatedFieldSystemMsg { |
|||
|
|||
private final TenantId tenantId; |
|||
private final EntityId entityId; |
|||
private final CalculatedFieldTelemetryMsgProto proto; |
|||
// Both lists are effectively immutable in CalculatedFieldManagerMessageProcessor and must stay so.
|
|||
private final List<CalculatedFieldCtx> entityIdFields; |
|||
private final List<CalculatedFieldCtx> profileIdFields; |
|||
private final TbCallback callback; |
|||
|
|||
public EntityCalculatedFieldTelemetryMsg(CalculatedFieldTelemetryMsg msg, |
|||
List<CalculatedFieldCtx> entityIdFields, |
|||
List<CalculatedFieldCtx> profileIdFields, |
|||
TbCallback callback) { |
|||
this.tenantId = msg.getTenantId(); |
|||
this.entityId = msg.getEntityId(); |
|||
this.proto = msg.getProto(); |
|||
this.entityIdFields = entityIdFields; |
|||
this.profileIdFields = profileIdFields; |
|||
this.callback = callback; |
|||
} |
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.CF_ENTITY_TELEMETRY_MSG; |
|||
} |
|||
} |
|||
@ -0,0 +1,41 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; |
|||
|
|||
import java.util.List; |
|||
|
|||
@Data |
|||
public class EntityInitCalculatedFieldMsg implements ToCalculatedFieldSystemMsg { |
|||
|
|||
private final TenantId tenantId; |
|||
private final CalculatedFieldCtx ctx; |
|||
private final TbCallback callback; |
|||
private final boolean forceReinit; |
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.CF_ENTITY_INIT_CF_MSG; |
|||
} |
|||
} |
|||
@ -0,0 +1,56 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.calculatedField; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
|
|||
import java.util.UUID; |
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
|
|||
@Slf4j |
|||
public class MultipleTbCallback implements TbCallback { |
|||
@Getter |
|||
private final UUID id; |
|||
private final AtomicInteger counter; |
|||
private final TbCallback callback; |
|||
|
|||
public MultipleTbCallback(int count, TbCallback callback) { |
|||
id = UUID.randomUUID(); |
|||
this.counter = new AtomicInteger(count); |
|||
this.callback = callback; |
|||
} |
|||
|
|||
@Override |
|||
public void onSuccess() { |
|||
onSuccess(1); |
|||
} |
|||
|
|||
public void onSuccess(int number) { |
|||
log.trace("[{}][{}] onSuccess({})", id, callback.getId(), number); |
|||
if (counter.addAndGet(-number) <= 0) { |
|||
log.trace("[{}][{}] Done.", id, callback.getId()); |
|||
callback.onSuccess(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
log.warn("[{}][{}] onFailure.", id, callback.getId()); |
|||
callback.onFailure(t); |
|||
} |
|||
} |
|||
@ -0,0 +1,283 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller; |
|||
|
|||
import com.fasterxml.jackson.core.type.TypeReference; |
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import com.fasterxml.jackson.databind.node.ObjectNode; |
|||
import io.swagger.v3.oas.annotations.Parameter; |
|||
import io.swagger.v3.oas.annotations.media.Schema; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.http.HttpStatus; |
|||
import org.springframework.security.access.prepost.PreAuthorize; |
|||
import org.springframework.web.bind.annotation.PathVariable; |
|||
import org.springframework.web.bind.annotation.RequestBody; |
|||
import org.springframework.web.bind.annotation.RequestMapping; |
|||
import org.springframework.web.bind.annotation.RequestMethod; |
|||
import org.springframework.web.bind.annotation.RequestParam; |
|||
import org.springframework.web.bind.annotation.ResponseBody; |
|||
import org.springframework.web.bind.annotation.ResponseStatus; |
|||
import org.springframework.web.bind.annotation.RestController; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.script.api.tbel.TbelCfArg; |
|||
import org.thingsboard.script.api.tbel.TbelCfCtx; |
|||
import org.thingsboard.script.api.tbel.TbelCfSingleValueArg; |
|||
import org.thingsboard.script.api.tbel.TbelInvokeService; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.EventInfo; |
|||
import org.thingsboard.server.common.data.HasTenantId; |
|||
import org.thingsboard.server.common.data.cf.CalculatedField; |
|||
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; |
|||
import org.thingsboard.server.common.data.event.EventType; |
|||
import org.thingsboard.server.common.data.exception.ThingsboardException; |
|||
import org.thingsboard.server.common.data.id.CalculatedFieldId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.EntityIdFactory; |
|||
import org.thingsboard.server.common.data.id.HasId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.config.annotations.ApiOperation; |
|||
import org.thingsboard.server.dao.event.EventService; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldScriptEngine; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldTbelScriptEngine; |
|||
import org.thingsboard.server.service.entitiy.cf.TbCalculatedFieldService; |
|||
import org.thingsboard.server.service.security.model.SecurityUser; |
|||
import org.thingsboard.server.service.security.permission.Operation; |
|||
import org.thingsboard.server.service.security.permission.Resource; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.Objects; |
|||
import java.util.Optional; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
import static org.thingsboard.server.controller.ControllerConstants.CF_TEXT_SEARCH_DESCRIPTION; |
|||
import static org.thingsboard.server.controller.ControllerConstants.ENTITY_ID_PARAM_DESCRIPTION; |
|||
import static org.thingsboard.server.controller.ControllerConstants.ENTITY_TYPE_PARAM_DESCRIPTION; |
|||
import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_END; |
|||
import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_START; |
|||
import static org.thingsboard.server.controller.ControllerConstants.PAGE_NUMBER_DESCRIPTION; |
|||
import static org.thingsboard.server.controller.ControllerConstants.PAGE_SIZE_DESCRIPTION; |
|||
import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_DESCRIPTION; |
|||
import static org.thingsboard.server.controller.ControllerConstants.SORT_PROPERTY_DESCRIPTION; |
|||
import static org.thingsboard.server.controller.ControllerConstants.TENANT_AUTHORITY_PARAGRAPH; |
|||
import static org.thingsboard.server.controller.ControllerConstants.TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH; |
|||
import static org.thingsboard.server.controller.ControllerConstants.UUID_WIKI_LINK; |
|||
|
|||
@RestController |
|||
@TbCoreComponent |
|||
@RequestMapping("/api") |
|||
@RequiredArgsConstructor |
|||
@Slf4j |
|||
public class CalculatedFieldController extends BaseController { |
|||
|
|||
private final TbCalculatedFieldService tbCalculatedFieldService; |
|||
private final EventService eventService; |
|||
private final TbelInvokeService tbelInvokeService; |
|||
|
|||
public static final String CALCULATED_FIELD_ID = "calculatedFieldId"; |
|||
|
|||
public static final int TIMEOUT = 20; |
|||
|
|||
private static final String TEST_SCRIPT_EXPRESSION = "Execute the Script expression and return the result. The format of request: \n\n" |
|||
+ MARKDOWN_CODE_BLOCK_START |
|||
+ "{\n" + |
|||
" \"expression\": \"var temp = 0; foreach(element: temperature.values) {temp += element.value;} var avgTemperature = temp / temperature.values.size(); var adjustedTemperature = avgTemperature + 0.1 * humidity.value; return {\\\"adjustedTemperature\\\": adjustedTemperature};\",\n" + |
|||
" \"arguments\": {\n" + |
|||
" \"temperature\": {\n" + |
|||
" \"type\": \"TS_ROLLING\",\n" + |
|||
" \"timeWindow\": {\n" + |
|||
" \"startTs\": 1739775630002,\n" + |
|||
" \"endTs\": 65432211,\n" + |
|||
" \"limit\": 5\n" + |
|||
" },\n" + |
|||
" \"values\": [\n" + |
|||
" { \"ts\": 1739775639851, \"value\": 23 },\n" + |
|||
" { \"ts\": 1739775664561, \"value\": 43 },\n" + |
|||
" { \"ts\": 1739775713079, \"value\": 15 },\n" + |
|||
" { \"ts\": 1739775999522, \"value\": 34 },\n" + |
|||
" { \"ts\": 1739776228452, \"value\": 22 }\n" + |
|||
" ]\n" + |
|||
" },\n" + |
|||
" \"humidity\": { \"type\": \"SINGLE_VALUE\", \"ts\": 1739776478057, \"value\": 23 }\n" + |
|||
" }\n" + |
|||
"}" |
|||
+ MARKDOWN_CODE_BLOCK_END |
|||
+ "\n\n Expected result JSON contains \"output\" and \"error\"."; |
|||
|
|||
@ApiOperation(value = "Create Or Update Calculated Field (saveCalculatedField)", |
|||
notes = "Creates or Updates the Calculated Field. When creating calculated field, platform generates Calculated Field Id as " + UUID_WIKI_LINK + |
|||
"The newly created Calculated Field Id will be present in the response. " + |
|||
"Specify existing Calculated Field Id to update the calculated field. " + |
|||
"Referencing non-existing Calculated Field Id will cause 'Not Found' error. " + |
|||
"Remove 'id', 'tenantId' from the request body example (below) to create new Calculated Field entity. " |
|||
+ TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH) |
|||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") |
|||
@RequestMapping(value = "/calculatedField", method = RequestMethod.POST) |
|||
@ResponseBody |
|||
public CalculatedField saveCalculatedField(@io.swagger.v3.oas.annotations.parameters.RequestBody(description = "A JSON value representing the calculated field.") |
|||
@RequestBody CalculatedField calculatedField) throws Exception { |
|||
calculatedField.setTenantId(getTenantId()); |
|||
checkEntity(calculatedField.getId(), calculatedField, Resource.CALCULATED_FIELD); |
|||
checkEntityId(calculatedField.getEntityId(), Operation.WRITE_CALCULATED_FIELD); |
|||
checkReferencedEntities(calculatedField.getConfiguration(), getCurrentUser()); |
|||
return tbCalculatedFieldService.save(calculatedField, getCurrentUser()); |
|||
} |
|||
|
|||
@ApiOperation(value = "Get Calculated Field (getCalculatedFieldById)", |
|||
notes = "Fetch the Calculated Field object based on the provided Calculated Field Id." |
|||
) |
|||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") |
|||
@RequestMapping(value = "/calculatedField/{calculatedFieldId}", method = RequestMethod.GET) |
|||
@ResponseBody |
|||
public CalculatedField getCalculatedFieldById(@Parameter @PathVariable(CALCULATED_FIELD_ID) String strCalculatedFieldId) throws ThingsboardException { |
|||
checkParameter(CALCULATED_FIELD_ID, strCalculatedFieldId); |
|||
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(toUUID(strCalculatedFieldId)); |
|||
CalculatedField calculatedField = tbCalculatedFieldService.findById(calculatedFieldId, getCurrentUser()); |
|||
checkNotNull(calculatedField); |
|||
checkEntityId(calculatedField.getEntityId(), Operation.READ_CALCULATED_FIELD); |
|||
return calculatedField; |
|||
} |
|||
|
|||
@ApiOperation(value = "Get Calculated Fields by Entity Id (getCalculatedFieldsByEntityId)", |
|||
notes = "Fetch the Calculated Fields based on the provided Entity Id." |
|||
) |
|||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") |
|||
@RequestMapping(value = "/{entityType}/{entityId}/calculatedFields", params = {"pageSize", "page"}, method = RequestMethod.GET) |
|||
@ResponseBody |
|||
public PageData<CalculatedField> getCalculatedFieldsByEntityId( |
|||
@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, schema = @Schema(defaultValue = "DEVICE")) @PathVariable("entityType") String entityType, |
|||
@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable("entityId") String entityIdStr, |
|||
@Parameter(description = PAGE_SIZE_DESCRIPTION, required = true) @RequestParam int pageSize, |
|||
@Parameter(description = PAGE_NUMBER_DESCRIPTION, required = true) @RequestParam int page, |
|||
@Parameter(description = CF_TEXT_SEARCH_DESCRIPTION) @RequestParam(required = false) String textSearch, |
|||
@Parameter(description = SORT_PROPERTY_DESCRIPTION, schema = @Schema(allowableValues = {"createdTime", "name"})) @RequestParam(required = false) String sortProperty, |
|||
@Parameter(description = SORT_ORDER_DESCRIPTION, schema = @Schema(allowableValues = {"ASC", "DESC"})) @RequestParam(required = false) String sortOrder) throws ThingsboardException { |
|||
PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); |
|||
checkParameter("entityId", entityIdStr); |
|||
EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, entityIdStr); |
|||
checkEntityId(entityId, Operation.READ_CALCULATED_FIELD); |
|||
return checkNotNull(tbCalculatedFieldService.findAllByTenantIdAndEntityId(entityId, getCurrentUser(), pageLink)); |
|||
} |
|||
|
|||
@ApiOperation(value = "Delete Calculated Field (deleteCalculatedField)", |
|||
notes = "Deletes the calculated field. Referencing non-existing Calculated Field Id will cause an error." + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH) |
|||
@PreAuthorize("hasAuthority('TENANT_ADMIN')") |
|||
@RequestMapping(value = "/calculatedField/{calculatedFieldId}", method = RequestMethod.DELETE) |
|||
@ResponseStatus(value = HttpStatus.OK) |
|||
public void deleteCalculatedField(@PathVariable(CALCULATED_FIELD_ID) String strCalculatedFieldId) throws Exception { |
|||
checkParameter(CALCULATED_FIELD_ID, strCalculatedFieldId); |
|||
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(toUUID(strCalculatedFieldId)); |
|||
CalculatedField calculatedField = checkCalculatedFieldId(calculatedFieldId, Operation.DELETE); |
|||
checkEntityId(calculatedField.getEntityId(), Operation.WRITE_CALCULATED_FIELD); |
|||
tbCalculatedFieldService.delete(calculatedField, getCurrentUser()); |
|||
} |
|||
|
|||
@ApiOperation(value = "Get latest calculated field debug event (getLatestCalculatedFieldDebugEvent)", |
|||
notes = "Gets latest calculated field debug event for specified calculated field id. " + |
|||
"Referencing non-existing calculated field id will cause an error. " + TENANT_AUTHORITY_PARAGRAPH) |
|||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") |
|||
@RequestMapping(value = "/calculatedField/{calculatedFieldId}/debug", method = RequestMethod.GET) |
|||
@ResponseBody |
|||
public JsonNode getLatestCalculatedFieldDebugEvent(@Parameter @PathVariable(CALCULATED_FIELD_ID) String strCalculatedFieldId) throws ThingsboardException { |
|||
checkParameter(CALCULATED_FIELD_ID, strCalculatedFieldId); |
|||
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(toUUID(strCalculatedFieldId)); |
|||
CalculatedField calculatedField = checkCalculatedFieldId(calculatedFieldId, Operation.READ); |
|||
checkEntityId(calculatedField.getEntityId(), Operation.READ_CALCULATED_FIELD); |
|||
TenantId tenantId = getCurrentUser().getTenantId(); |
|||
return Optional.ofNullable(eventService.findLatestEvents(tenantId, calculatedFieldId, EventType.DEBUG_CALCULATED_FIELD, 1)) |
|||
.flatMap(events -> events.stream().map(EventInfo::getBody).findFirst()) |
|||
.orElse(null); |
|||
} |
|||
|
|||
@ApiOperation(value = "Test Script expression", |
|||
notes = TEST_SCRIPT_EXPRESSION + TENANT_AUTHORITY_PARAGRAPH) |
|||
@PreAuthorize("hasAuthority('TENANT_ADMIN')") |
|||
@RequestMapping(value = "/calculatedField/testScript", method = RequestMethod.POST) |
|||
@ResponseBody |
|||
public JsonNode testScript( |
|||
@io.swagger.v3.oas.annotations.parameters.RequestBody(description = "Test calculated field TBEL expression.") |
|||
@RequestBody JsonNode inputParams) { |
|||
String expression = inputParams.get("expression").asText(); |
|||
Map<String, TbelCfArg> arguments = Objects.requireNonNullElse( |
|||
JacksonUtil.convertValue(inputParams.get("arguments"), new TypeReference<>() { |
|||
}), |
|||
Collections.emptyMap() |
|||
); |
|||
|
|||
ArrayList<String> ctxAndArgNames = new ArrayList<>(arguments.size() + 1); |
|||
ctxAndArgNames.add("ctx"); |
|||
ctxAndArgNames.addAll(arguments.keySet()); |
|||
|
|||
String output = ""; |
|||
String errorText = ""; |
|||
|
|||
try { |
|||
if (tbelInvokeService == null) { |
|||
throw new IllegalArgumentException("TBEL script engine is disabled!"); |
|||
} |
|||
|
|||
CalculatedFieldScriptEngine calculatedFieldScriptEngine = new CalculatedFieldTbelScriptEngine( |
|||
getTenantId(), |
|||
tbelInvokeService, |
|||
expression, |
|||
ctxAndArgNames.toArray(String[]::new) |
|||
); |
|||
|
|||
|
|||
Object[] args = new Object[ctxAndArgNames.size()]; |
|||
args[0] = new TbelCfCtx(arguments); |
|||
for (int i = 1; i < ctxAndArgNames.size(); i++) { |
|||
var arg = arguments.get(ctxAndArgNames.get(i)); |
|||
if (arg instanceof TbelCfSingleValueArg svArg) { |
|||
args[i] = svArg.getValue(); |
|||
} else { |
|||
args[i] = arg; |
|||
} |
|||
} |
|||
|
|||
JsonNode json = calculatedFieldScriptEngine.executeJsonAsync(args).get(TIMEOUT, TimeUnit.SECONDS); |
|||
output = JacksonUtil.toString(json); |
|||
} catch (Exception e) { |
|||
log.error("Error evaluating expression", e); |
|||
errorText = e.getMessage(); |
|||
} |
|||
|
|||
ObjectNode result = JacksonUtil.newObjectNode(); |
|||
result.put("output", output); |
|||
result.put("error", errorText); |
|||
return result; |
|||
} |
|||
|
|||
private <E extends HasId<I> & HasTenantId, I extends EntityId> void checkReferencedEntities(CalculatedFieldConfiguration calculatedFieldConfig, SecurityUser user) throws ThingsboardException { |
|||
List<EntityId> referencedEntityIds = calculatedFieldConfig.getReferencedEntities(); |
|||
for (EntityId referencedEntityId : referencedEntityIds) { |
|||
EntityType entityType = referencedEntityId.getEntityType(); |
|||
switch (entityType) { |
|||
case TENANT, CUSTOMER, ASSET, DEVICE -> checkEntityId(referencedEntityId, Operation.READ); |
|||
default -> |
|||
throw new IllegalArgumentException("Calculated fields do not support '" + entityType + "' for referenced entities."); |
|||
} |
|||
} |
|||
|
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,24 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.exception; |
|||
|
|||
public class CalculatedFieldStateException extends RuntimeException { |
|||
|
|||
public CalculatedFieldStateException(String message) { |
|||
super(message); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,93 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.cf; |
|||
|
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.exception.CalculatedFieldStateException; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.common.state.QueueStateService; |
|||
import org.thingsboard.server.queue.discovery.QueueKey; |
|||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; |
|||
|
|||
import java.util.Collection; |
|||
import java.util.Set; |
|||
import java.util.stream.Collectors; |
|||
|
|||
import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto; |
|||
import static org.thingsboard.server.utils.CalculatedFieldUtils.toProto; |
|||
|
|||
public abstract class AbstractCalculatedFieldStateService implements CalculatedFieldStateService { |
|||
|
|||
@Autowired |
|||
private ActorSystemContext actorSystemContext; |
|||
|
|||
protected QueueStateService<TbProtoQueueMsg<ToCalculatedFieldMsg>, TbProtoQueueMsg<CalculatedFieldStateProto>> stateService; |
|||
|
|||
@Override |
|||
public final void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { |
|||
if (state.isSizeExceedsLimit()) { |
|||
throw new CalculatedFieldStateException("State size exceeds the maximum allowed limit. The state will not be persisted to RocksDB."); |
|||
} |
|||
doPersist(stateId, toProto(stateId, state), callback); |
|||
} |
|||
|
|||
protected abstract void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback); |
|||
|
|||
@Override |
|||
public final void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback) { |
|||
doRemove(stateId, callback); |
|||
} |
|||
|
|||
protected abstract void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback); |
|||
|
|||
protected void processRestoredState(CalculatedFieldStateProto stateMsg) { |
|||
var id = fromProto(stateMsg.getId()); |
|||
var state = fromProto(stateMsg); |
|||
processRestoredState(id, state); |
|||
} |
|||
|
|||
protected void processRestoredState(CalculatedFieldEntityCtxId id, CalculatedFieldState state) { |
|||
actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state)); |
|||
} |
|||
|
|||
@Override |
|||
public void restore(QueueKey queueKey, Set<TopicPartitionInfo> partitions) { |
|||
stateService.update(queueKey, partitions); |
|||
} |
|||
|
|||
@Override |
|||
public void delete(Set<TopicPartitionInfo> partitions) { |
|||
stateService.delete(partitions); |
|||
} |
|||
|
|||
@Override |
|||
public Set<TopicPartitionInfo> getPartitions() { |
|||
return stateService.getPartitions().values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); |
|||
} |
|||
|
|||
@Override |
|||
public void stop() { |
|||
stateService.stop(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,45 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.cf; |
|||
|
|||
import org.thingsboard.server.common.data.cf.CalculatedField; |
|||
import org.thingsboard.server.common.data.cf.CalculatedFieldLink; |
|||
import org.thingsboard.server.common.data.id.CalculatedFieldId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; |
|||
|
|||
import java.util.List; |
|||
|
|||
public interface CalculatedFieldCache { |
|||
|
|||
CalculatedField getCalculatedField(CalculatedFieldId calculatedFieldId); |
|||
|
|||
List<CalculatedField> getCalculatedFieldsByEntityId(EntityId entityId); |
|||
|
|||
List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(EntityId entityId); |
|||
|
|||
CalculatedFieldCtx getCalculatedFieldCtx(CalculatedFieldId calculatedFieldId); |
|||
|
|||
List<CalculatedFieldCtx> getCalculatedFieldCtxsByEntityId(EntityId entityId); |
|||
|
|||
void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId); |
|||
|
|||
void updateCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId); |
|||
|
|||
void evict(CalculatedFieldId calculatedFieldId); |
|||
|
|||
} |
|||
@ -0,0 +1,19 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.cf; |
|||
|
|||
public interface CalculatedFieldInitService { |
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.cf; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; |
|||
import org.thingsboard.server.common.data.cf.configuration.Argument; |
|||
import org.thingsboard.server.common.data.id.CalculatedFieldId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; |
|||
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; |
|||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; |
|||
|
|||
import java.util.List; |
|||
import java.util.Map; |
|||
|
|||
public interface CalculatedFieldProcessingService { |
|||
|
|||
ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId); |
|||
|
|||
Map<String, ArgumentEntry> fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map<String, Argument> arguments); |
|||
|
|||
void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculationResult, List<CalculatedFieldId> cfIds, TbCallback callback); |
|||
|
|||
void pushMsgToLinks(CalculatedFieldTelemetryMsg msg, List<CalculatedFieldEntityCtxId> linkedCalculatedFields, TbCallback callback); |
|||
|
|||
} |
|||
@ -0,0 +1,44 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.cf; |
|||
|
|||
import com.google.common.util.concurrent.FutureCallback; |
|||
import org.thingsboard.rule.engine.api.AttributesDeleteRequest; |
|||
import org.thingsboard.rule.engine.api.AttributesSaveRequest; |
|||
import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService; |
|||
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; |
|||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; |
|||
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; |
|||
|
|||
import java.util.List; |
|||
|
|||
public interface CalculatedFieldQueueService extends RuleEngineCalculatedFieldQueueService { |
|||
|
|||
/** |
|||
* Filter CFs based on the request entity. Push to the queue if any matching CF exist; |
|||
* |
|||
* @param request - telemetry save request; |
|||
* @param callback |
|||
*/ |
|||
void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback); |
|||
|
|||
void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback); |
|||
|
|||
void pushRequestToQueue(AttributesDeleteRequest request, List<String> result, FutureCallback<Void> callback); |
|||
|
|||
void pushRequestToQueue(TimeseriesDeleteRequest request, List<String> result, FutureCallback<Void> callback); |
|||
|
|||
} |
|||
@ -0,0 +1,37 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.cf; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.AttributeScope; |
|||
import org.thingsboard.server.common.data.cf.configuration.OutputType; |
|||
|
|||
@Data |
|||
public final class CalculatedFieldResult { |
|||
|
|||
private final OutputType type; |
|||
private final AttributeScope scope; |
|||
private final JsonNode result; |
|||
|
|||
public boolean isEmpty() { |
|||
return result == null || result.isMissingNode() || result.isNull() || |
|||
(result.isObject() && result.isEmpty()) || |
|||
(result.isArray() && result.isEmpty()) || |
|||
(result.isTextual() && result.asText().isEmpty()); |
|||
} |
|||
|
|||
} |
|||