From baf0e948dbc2f73ddb0ce3e287e53c8167690942 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 17 Feb 2025 17:21:12 +0200 Subject: [PATCH] replaced cf debug events creation --- .../server/actors/ActorSystemContext.java | 16 ++- ...CalculatedFieldEntityMessageProcessor.java | 124 +++++++++--------- .../cf/ctx/state/TsRollingArgumentEntry.java | 3 +- 3 files changed, 75 insertions(+), 68 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 20f61115e6..14c8a41fb5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -60,7 +60,6 @@ import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.tools.TbRateLimits; -import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.stats.TbApiUsageReportClient; import org.thingsboard.server.dao.alarm.AlarmCommentService; import org.thingsboard.server.dao.asset.AssetProfileService; @@ -817,11 +816,7 @@ public class ActorSystemContext { } public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, Throwable error) { - if (calculatedFieldsDebugPerTenantEnabled) { - if (!rateLimitService.checkRateLimit(LimitedApi.CALCULATED_FIELD_DEBUG_EVENTS, (Object) tenantId, calculatedFieldsDebugPerTenantLimitsConfiguration)) { - log.trace("[{}] Calculated field debug event limits exceeded!", tenantId); - throw new TbRateLimitsException("Failed to persist calculated field debug event due to rate limits!"); - } + if (checkLimits(tenantId)) { try { CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder() .tenantId(tenantId) @@ -856,6 +851,15 @@ public class ActorSystemContext { } } + private boolean checkLimits(TenantId tenantId) { + if (calculatedFieldsDebugPerTenantEnabled && + !rateLimitService.checkRateLimit(LimitedApi.CALCULATED_FIELD_DEBUG_EVENTS, (Object) tenantId, calculatedFieldsDebugPerTenantLimitsConfiguration)) { + log.trace("[{}] Calculated field debug event limits exceeded!", tenantId); + return false; + } + return true; + } + public static Exception toException(Throwable error) { return Exception.class.isInstance(error) ? (Exception) error : new Exception(error); } diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 60c76b720d..17ec258c80 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -107,8 +107,15 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM log.info("Force reinitialization of CF: [{}].", cfCtx.getCfId()); states.remove(cfCtx.getCfId()); } - var cfState = getOrInitState(cfCtx); - processStateIfReady(cfCtx, Collections.singletonList(cfCtx.getCfId()), cfState, null, null, msg.getCallback()); + try { + var cfState = getOrInitState(cfCtx); + processStateIfReady(cfCtx, Collections.singletonList(cfCtx.getCfId()), cfState, null, null, msg.getCallback()); + } catch (Exception e) { + if (DebugModeUtil.isDebugFailuresAvailable(cfCtx.getCalculatedField())) { + systemContext.persistCalculatedFieldDebugEvent(tenantId, cfCtx.getCfId(), entityId, null, null, null, null, e); + } + msg.getCallback().onFailure(e); + } } public void process(CalculatedFieldEntityDeleteMsg msg) { @@ -146,31 +153,45 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM var proto = msg.getProto(); var ctx = msg.getCtx(); var callback = new MultipleTbCallback(CALLBACKS_PER_CF, msg.getCallback()); - List cfIds = getCalculatedFieldIds(proto); - if (cfIds.contains(ctx.getCfId())) { - callback.onSuccess(CALLBACKS_PER_CF); - } else { - if (proto.getTsDataCount() > 0) { - processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); - } else if (proto.getAttrDataCount() > 0) { - processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); - } else { + try { + List cfIds = getCalculatedFieldIds(proto); + if (cfIds.contains(ctx.getCfId())) { callback.onSuccess(CALLBACKS_PER_CF); + } else { + if (proto.getTsDataCount() > 0) { + processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); + } else if (proto.getAttrDataCount() > 0) { + processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); + } else { + callback.onSuccess(CALLBACKS_PER_CF); + } } + } catch (Exception e) { + if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { + systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, null, null, null, null, e); + } + callback.onFailure(e); } } private void process(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, Collection cfIds, List cfIdList, MultipleTbCallback callback) { - if (cfIds.contains(ctx.getCfId())) { - callback.onSuccess(CALLBACKS_PER_CF); - } else { - if (proto.getTsDataCount() > 0) { - processTelemetry(ctx, proto, cfIdList, callback); - } else if (proto.getAttrDataCount() > 0) { - processAttributes(ctx, proto, cfIdList, callback); - } else { + try { + if (cfIds.contains(ctx.getCfId())) { callback.onSuccess(CALLBACKS_PER_CF); + } else { + if (proto.getTsDataCount() > 0) { + processTelemetry(ctx, proto, cfIdList, callback); + } else if (proto.getAttrDataCount() > 0) { + processAttributes(ctx, proto, cfIdList, callback); + } else { + callback.onSuccess(CALLBACKS_PER_CF); + } + } + } catch (Exception e) { + if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { + systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, null, null, null, null, e); } + callback.onFailure(e); } } @@ -197,18 +218,12 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM state = getOrInitState(ctx); justRestored = true; } - try { - if (state.updateState(newArgValues) || justRestored) { - cfIdList = new ArrayList<>(cfIdList); - cfIdList.add(ctx.getCfId()); - processStateIfReady(ctx, cfIdList, state, tbMsgId, tbMsgType, callback); - } else { - callback.onSuccess(CALLBACKS_PER_CF); - } - } catch (Exception e) { - if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { - systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, null, e); - } + if (state.updateState(newArgValues) || justRestored) { + cfIdList = new ArrayList<>(cfIdList); + cfIdList.add(ctx.getCfId()); + processStateIfReady(ctx, cfIdList, state, tbMsgId, tbMsgType, callback); + } else { + callback.onSuccess(CALLBACKS_PER_CF); } } @@ -218,44 +233,31 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (state != null) { return state; } else { - try { - ListenableFuture stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId); - // Ugly but necessary. We do not expect to often fetch data from DB. Only once per pair lifetime. - // This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low. - // Alternatively, we can fetch the state outside the actor system and push separate command to create this actor, - // but this will significantly complicate the code. - state = stateFuture.get(1, TimeUnit.MINUTES); - states.put(ctx.getCfId(), state); - } catch (Exception e) { - if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { - systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, null, null, null, null, e); - } - throw new RuntimeException(e); - } + ListenableFuture stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId); + // Ugly but necessary. We do not expect to often fetch data from DB. Only once per pair lifetime. + // This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low. + // Alternatively, we can fetch the state outside the actor system and push separate command to create this actor, + // but this will significantly complicate the code. + state = stateFuture.get(1, TimeUnit.MINUTES); + states.put(ctx.getCfId(), state); } return state; } @SneakyThrows private void processStateIfReady(CalculatedFieldCtx ctx, List cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) { - try { - CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId); - if (state.isReady() && ctx.isInitialized()) { - CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS); - state.checkStateSize(ctxId, ctx.getMaxStateSizeInKBytes()); - cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback); - if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { - systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null); - } - } else { - callback.onSuccess(); // State was updated but no calculation performed; - } - cfStateService.persistState(ctxId, state, callback); - } catch (Exception e) { - if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { - systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, null, e); + CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId); + if (state.isReady() && ctx.isInitialized()) { + CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS); + state.checkStateSize(ctxId, ctx.getMaxStateSizeInKBytes()); + cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback); + if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { + systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null); } + } else { + callback.onSuccess(); // State was updated but no calculation performed; } + cfStateService.persistState(ctxId, state, callback); } private Map mapToArguments(CalculatedFieldCtx ctx, List data) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java index 92722fb96b..e489c8ecef 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java @@ -119,7 +119,8 @@ public class TsRollingArgumentEntry implements ArgumentEntry { } cleanupExpiredRecords(); } catch (Exception e) { - throw new IllegalArgumentException("Time series rolling arguments supports only numeric values."); + log.warn("Time series rolling arguments supports only numeric values."); +// throw new IllegalArgumentException("Time series rolling arguments supports only numeric values."); } }