Browse Source

replaced cf debug events creation

pull/12678/head
IrynaMatveieva 1 year ago
parent
commit
baf0e948db
  1. 16
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 124
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  3. 3
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java

16
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -60,7 +60,6 @@ import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.tools.TbRateLimits;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
import org.thingsboard.server.dao.alarm.AlarmCommentService;
import org.thingsboard.server.dao.asset.AssetProfileService;
@ -817,11 +816,7 @@ public class ActorSystemContext {
}
public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map<String, ArgumentEntry> arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, Throwable error) {
if (calculatedFieldsDebugPerTenantEnabled) {
if (!rateLimitService.checkRateLimit(LimitedApi.CALCULATED_FIELD_DEBUG_EVENTS, (Object) tenantId, calculatedFieldsDebugPerTenantLimitsConfiguration)) {
log.trace("[{}] Calculated field debug event limits exceeded!", tenantId);
throw new TbRateLimitsException("Failed to persist calculated field debug event due to rate limits!");
}
if (checkLimits(tenantId)) {
try {
CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder()
.tenantId(tenantId)
@ -856,6 +851,15 @@ public class ActorSystemContext {
}
}
private boolean checkLimits(TenantId tenantId) {
if (calculatedFieldsDebugPerTenantEnabled &&
!rateLimitService.checkRateLimit(LimitedApi.CALCULATED_FIELD_DEBUG_EVENTS, (Object) tenantId, calculatedFieldsDebugPerTenantLimitsConfiguration)) {
log.trace("[{}] Calculated field debug event limits exceeded!", tenantId);
return false;
}
return true;
}
public static Exception toException(Throwable error) {
return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
}

124
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -107,8 +107,15 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
log.info("Force reinitialization of CF: [{}].", cfCtx.getCfId());
states.remove(cfCtx.getCfId());
}
var cfState = getOrInitState(cfCtx);
processStateIfReady(cfCtx, Collections.singletonList(cfCtx.getCfId()), cfState, null, null, msg.getCallback());
try {
var cfState = getOrInitState(cfCtx);
processStateIfReady(cfCtx, Collections.singletonList(cfCtx.getCfId()), cfState, null, null, msg.getCallback());
} catch (Exception e) {
if (DebugModeUtil.isDebugFailuresAvailable(cfCtx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, cfCtx.getCfId(), entityId, null, null, null, null, e);
}
msg.getCallback().onFailure(e);
}
}
public void process(CalculatedFieldEntityDeleteMsg msg) {
@ -146,31 +153,45 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
var proto = msg.getProto();
var ctx = msg.getCtx();
var callback = new MultipleTbCallback(CALLBACKS_PER_CF, msg.getCallback());
List<CalculatedFieldId> cfIds = getCalculatedFieldIds(proto);
if (cfIds.contains(ctx.getCfId())) {
callback.onSuccess(CALLBACKS_PER_CF);
} else {
if (proto.getTsDataCount() > 0) {
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto));
} else if (proto.getAttrDataCount() > 0) {
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto));
} else {
try {
List<CalculatedFieldId> cfIds = getCalculatedFieldIds(proto);
if (cfIds.contains(ctx.getCfId())) {
callback.onSuccess(CALLBACKS_PER_CF);
} else {
if (proto.getTsDataCount() > 0) {
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto));
} else if (proto.getAttrDataCount() > 0) {
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto));
} else {
callback.onSuccess(CALLBACKS_PER_CF);
}
}
} catch (Exception e) {
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, null, null, null, null, e);
}
callback.onFailure(e);
}
}
private void process(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, Collection<CalculatedFieldId> cfIds, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) {
if (cfIds.contains(ctx.getCfId())) {
callback.onSuccess(CALLBACKS_PER_CF);
} else {
if (proto.getTsDataCount() > 0) {
processTelemetry(ctx, proto, cfIdList, callback);
} else if (proto.getAttrDataCount() > 0) {
processAttributes(ctx, proto, cfIdList, callback);
} else {
try {
if (cfIds.contains(ctx.getCfId())) {
callback.onSuccess(CALLBACKS_PER_CF);
} else {
if (proto.getTsDataCount() > 0) {
processTelemetry(ctx, proto, cfIdList, callback);
} else if (proto.getAttrDataCount() > 0) {
processAttributes(ctx, proto, cfIdList, callback);
} else {
callback.onSuccess(CALLBACKS_PER_CF);
}
}
} catch (Exception e) {
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, null, null, null, null, e);
}
callback.onFailure(e);
}
}
@ -197,18 +218,12 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
state = getOrInitState(ctx);
justRestored = true;
}
try {
if (state.updateState(newArgValues) || justRestored) {
cfIdList = new ArrayList<>(cfIdList);
cfIdList.add(ctx.getCfId());
processStateIfReady(ctx, cfIdList, state, tbMsgId, tbMsgType, callback);
} else {
callback.onSuccess(CALLBACKS_PER_CF);
}
} catch (Exception e) {
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, null, e);
}
if (state.updateState(newArgValues) || justRestored) {
cfIdList = new ArrayList<>(cfIdList);
cfIdList.add(ctx.getCfId());
processStateIfReady(ctx, cfIdList, state, tbMsgId, tbMsgType, callback);
} else {
callback.onSuccess(CALLBACKS_PER_CF);
}
}
@ -218,44 +233,31 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
if (state != null) {
return state;
} else {
try {
ListenableFuture<CalculatedFieldState> stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId);
// Ugly but necessary. We do not expect to often fetch data from DB. Only once per <Entity, CalculatedField> pair lifetime.
// This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low.
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
// but this will significantly complicate the code.
state = stateFuture.get(1, TimeUnit.MINUTES);
states.put(ctx.getCfId(), state);
} catch (Exception e) {
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, null, null, null, null, e);
}
throw new RuntimeException(e);
}
ListenableFuture<CalculatedFieldState> stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId);
// Ugly but necessary. We do not expect to often fetch data from DB. Only once per <Entity, CalculatedField> pair lifetime.
// This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low.
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
// but this will significantly complicate the code.
state = stateFuture.get(1, TimeUnit.MINUTES);
states.put(ctx.getCfId(), state);
}
return state;
}
@SneakyThrows
private void processStateIfReady(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) {
try {
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId);
if (state.isReady() && ctx.isInitialized()) {
CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS);
state.checkStateSize(ctxId, ctx.getMaxStateSizeInKBytes());
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null);
}
} else {
callback.onSuccess(); // State was updated but no calculation performed;
}
cfStateService.persistState(ctxId, state, callback);
} catch (Exception e) {
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, null, e);
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId);
if (state.isReady() && ctx.isInitialized()) {
CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS);
state.checkStateSize(ctxId, ctx.getMaxStateSizeInKBytes());
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null);
}
} else {
callback.onSuccess(); // State was updated but no calculation performed;
}
cfStateService.persistState(ctxId, state, callback);
}
private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, List<TsKvProto> data) {

3
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java

@ -119,7 +119,8 @@ public class TsRollingArgumentEntry implements ArgumentEntry {
}
cleanupExpiredRecords();
} catch (Exception e) {
throw new IllegalArgumentException("Time series rolling arguments supports only numeric values.");
log.warn("Time series rolling arguments supports only numeric values.");
// throw new IllegalArgumentException("Time series rolling arguments supports only numeric values.");
}
}

Loading…
Cancel
Save