|
|
|
@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.AttributeScope; |
|
|
|
import org.thingsboard.server.common.data.DataConstants; |
|
|
|
import org.thingsboard.server.common.data.StringUtils; |
|
|
|
import org.thingsboard.server.common.data.alarm.Alarm; |
|
|
|
import org.thingsboard.server.common.data.cf.CalculatedFieldEventType; |
|
|
|
import org.thingsboard.server.common.data.cf.CalculatedFieldType; |
|
|
|
import org.thingsboard.server.common.data.cf.configuration.Argument; |
|
|
|
import org.thingsboard.server.common.data.cf.configuration.ArgumentType; |
|
|
|
@ -75,7 +76,6 @@ import java.util.UUID; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
import static org.thingsboard.server.common.data.DataConstants.REEVALUATION_MSG; |
|
|
|
import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT; |
|
|
|
import static org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry.getValueForTsRecord; |
|
|
|
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createStateByType; |
|
|
|
@ -169,7 +169,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
} |
|
|
|
if (msg.getStateAction() != StateAction.REFRESH_CTX) { |
|
|
|
if (state.isSizeOk()) { |
|
|
|
processStateIfReady(state, Collections.emptyMap(), ctx, Collections.singletonList(ctx.getCfId()), null, null, msg.getCallback()); |
|
|
|
processStateIfReady(state, Collections.emptyMap(), ctx, Collections.singletonList(ctx.getCfId()), null, msg.getEventType().name(), msg.getCallback()); |
|
|
|
} else { |
|
|
|
throw new RuntimeException(ctx.getSizeExceedsLimitMessage()); |
|
|
|
} |
|
|
|
@ -196,7 +196,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
Map<String, ArgumentEntry> fetchedArgs = cfService.fetchArgsFromDb(tenantId, entityId, dynamicSourceArgs); |
|
|
|
fetchedArgs.values().forEach(arg -> arg.setForceResetPrevious(true)); |
|
|
|
|
|
|
|
processArgumentValuesUpdate(ctx, Collections.singletonList(ctx.getCfId()), msg.getCallback(), fetchedArgs, null, null); |
|
|
|
processArgumentValuesUpdate(ctx, Collections.singletonList(ctx.getCfId()), msg.getCallback(), fetchedArgs, null, msg.getEventType().name()); |
|
|
|
} catch (Exception e) { |
|
|
|
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); |
|
|
|
} |
|
|
|
@ -256,7 +256,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize()); |
|
|
|
} |
|
|
|
if (state.isSizeOk()) { |
|
|
|
processStateIfReady(state, updatedArgs, ctx, Collections.singletonList(ctx.getCfId()), null, null, msg.getCallback()); |
|
|
|
processStateIfReady(state, updatedArgs, ctx, Collections.singletonList(ctx.getCfId()), null, TbMsgType.RELATION_ADD_OR_UPDATE.name(), msg.getCallback()); |
|
|
|
} else { |
|
|
|
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build(); |
|
|
|
} |
|
|
|
@ -283,7 +283,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize()); |
|
|
|
|
|
|
|
if (state.isSizeOk()) { |
|
|
|
processStateIfReady(state, Collections.emptyMap(), ctx, Collections.singletonList(ctx.getCfId()), null, null, msg.getCallback()); |
|
|
|
processStateIfReady(state, Collections.emptyMap(), ctx, Collections.singletonList(ctx.getCfId()), null, TbMsgType.RELATION_DELETED.name(), msg.getCallback()); |
|
|
|
} else { |
|
|
|
throw new RuntimeException(ctx.getSizeExceedsLimitMessage()); |
|
|
|
} |
|
|
|
@ -293,6 +293,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
PropagationArgumentEntry entry = new PropagationArgumentEntry(); |
|
|
|
entry.setRemoved(msg.getRelatedEntityId()); |
|
|
|
propagationState.update(Map.of(PROPAGATION_CONFIG_ARGUMENT, entry), ctx); |
|
|
|
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { |
|
|
|
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArgumentsJson(), null, TbMsgType.RELATION_DELETED.name(), null, null); |
|
|
|
} |
|
|
|
} |
|
|
|
msg.getCallback().onSuccess(); |
|
|
|
} |
|
|
|
@ -323,13 +326,13 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
callback.onSuccess(); |
|
|
|
} else { |
|
|
|
if (proto.getTsDataCount() > 0) { |
|
|
|
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); |
|
|
|
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toMsgType(proto)); |
|
|
|
} else if (proto.getAttrDataCount() > 0) { |
|
|
|
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); |
|
|
|
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toMsgType(proto)); |
|
|
|
} else if (proto.getRemovedTsKeysCount() > 0) { |
|
|
|
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithFetchedValue(ctx, msg.getEntityId(), proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto)); |
|
|
|
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithFetchedValue(ctx, msg.getEntityId(), proto.getRemovedTsKeysList()), toTbMsgId(proto), toMsgType(proto)); |
|
|
|
} else if (proto.getRemovedAttrKeysCount() > 0) { |
|
|
|
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithDefaultValue(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto)); |
|
|
|
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithDefaultValue(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toMsgType(proto)); |
|
|
|
} else { |
|
|
|
callback.onSuccess(); |
|
|
|
} |
|
|
|
@ -379,7 +382,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
} |
|
|
|
if (state.isSizeOk()) { |
|
|
|
log.debug("[{}][{}] Reevaluating CF state", entityId, cfId); |
|
|
|
processStateIfReady(state, null, ctx, Collections.singletonList(cfId), null, REEVALUATION_MSG, msg.getCallback()); |
|
|
|
processStateIfReady(state, null, ctx, Collections.singletonList(cfId), null, CalculatedFieldEventType.REEVALUATION_MSG.name(), msg.getCallback()); |
|
|
|
} else { |
|
|
|
throw new RuntimeException(ctx.getSizeExceedsLimitMessage()); |
|
|
|
} |
|
|
|
@ -399,23 +402,23 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
} |
|
|
|
|
|
|
|
private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, TbCallback callback) throws CalculatedFieldException { |
|
|
|
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); |
|
|
|
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()), toTbMsgId(proto), toMsgType(proto)); |
|
|
|
} |
|
|
|
|
|
|
|
private void processAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, TbCallback callback) throws CalculatedFieldException { |
|
|
|
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); |
|
|
|
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toMsgType(proto)); |
|
|
|
} |
|
|
|
|
|
|
|
private void processRemovedTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, TbCallback callback) throws CalculatedFieldException { |
|
|
|
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithFetchedValue(ctx, entityId, proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto)); |
|
|
|
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithFetchedValue(ctx, entityId, proto.getRemovedTsKeysList()), toTbMsgId(proto), toMsgType(proto)); |
|
|
|
} |
|
|
|
|
|
|
|
private void processRemovedAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, TbCallback callback) throws CalculatedFieldException { |
|
|
|
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithDefaultValue(ctx, proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto)); |
|
|
|
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithDefaultValue(ctx, proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toMsgType(proto)); |
|
|
|
} |
|
|
|
|
|
|
|
private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, TbCallback callback, |
|
|
|
Map<String, ArgumentEntry> newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException { |
|
|
|
Map<String, ArgumentEntry> newArgValues, UUID tbMsgId, String msgType) throws CalculatedFieldException { |
|
|
|
if (newArgValues.isEmpty()) { |
|
|
|
log.debug("[{}] No new argument values to process for CF.", ctx.getCfId()); |
|
|
|
callback.onSuccess(); |
|
|
|
@ -453,7 +456,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
if (!updatedArgs.isEmpty() || justRestored) { |
|
|
|
cfIdList = new ArrayList<>(cfIdList); |
|
|
|
cfIdList.add(ctx.getCfId()); |
|
|
|
String msgType = tbMsgType == null ? null : tbMsgType.name(); |
|
|
|
processStateIfReady(state, updatedArgs, ctx, cfIdList, tbMsgId, msgType, callback); |
|
|
|
} else { |
|
|
|
callback.onSuccess(); |
|
|
|
@ -777,9 +779,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
|
private TbMsgType toTbMsgType(CalculatedFieldTelemetryMsgProto proto) { |
|
|
|
private String toMsgType(CalculatedFieldTelemetryMsgProto proto) { |
|
|
|
if (!proto.getTbMsgType().isEmpty()) { |
|
|
|
return TbMsgType.valueOf(proto.getTbMsgType()); |
|
|
|
return proto.getTbMsgType(); |
|
|
|
} |
|
|
|
return null; |
|
|
|
} |
|
|
|
|