@ -16,7 +16,6 @@
package org.thingsboard.rule.engine.telemetry ;
import com.fasterxml.jackson.core.type.TypeReference ;
import com.fasterxml.jackson.databind.JsonNode ;
import com.google.common.util.concurrent.FutureCallback ;
import com.google.gson.JsonParser ;
import jakarta.annotation.Nullable ;
@ -43,24 +42,23 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.List ;
import java.util.Map ;
import java.util.Optional ;
import static org.thingsboard.server.common.data.DataConstants.SCOPE ;
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED ;
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST ;
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST ;
import static org.thingsboard.server.common.data.msg.TbMsgType.TIMESERIES_DELETED ;
@Slf4j
@RuleNode (
type = ComponentType . ACTION ,
name = "push to calculated fields" ,
name = "apply to calculated fields" ,
configClazz = EmptyNodeConfiguration . class ,
nodeDescription = "Pushes messages to the calculated fields for further processing" ,
nodeDetails = "Forwards incoming messages to the calculated fields, where they will be processed to compute values based on predefined calculation rules without persisting any data in the database." ,
nodeDescription = "Processes incoming messages for calculated fields" ,
nodeDetails = "This node processes incoming messages to update telemetry or attributes for predefined calculated fields without storing the original telemetry or attributes in the database. " +
"It ensures that calculated fields receive and process the necessary data without persisting the incoming values." ,
configDirective = "tbNodeEmptyConfig" ,
icon = "send "
icon = "call_made "
)
public class TbCalculatedFieldsNode implements TbNode {
@ -73,29 +71,20 @@ public class TbCalculatedFieldsNode implements TbNode {
@Override
public void onMsg ( TbContext ctx , TbMsg msg ) {
if ( ! msg . isTypeOneOf ( POST_ATTRIBUTES_REQUEST , POST_TELEMETRY_REQUEST , ATTRIBUTES_DELETED , TIMESERIES_DELETED ) ) {
ctx . tellFailure ( msg , new IllegalArgumentException ( "Unsupported msg type: " + msg . getType ( ) ) ) ;
return ;
}
String src = msg . getData ( ) ;
if ( msg . isTypeOf ( POST_TELEMETRY_REQUEST ) ) {
processPostTelemetryRequest ( ctx , msg , src ) ;
} else if ( msg . isTypeOf ( POST_ATTRIBUTES_REQUEST ) ) {
processPostAttributesRequest ( ctx , msg , src ) ;
} else if ( msg . isTypeOf ( TIMESERIES_DELETED ) ) {
processTimeSeriesDeleted ( ctx , msg , src ) ;
} else {
processAttributesDeleted ( ctx , msg , src ) ;
switch ( msg . getInternalType ( ) ) {
case POST_TELEMETRY_REQUEST - > processPostTelemetryRequest ( ctx , msg ) ;
case POST_ATTRIBUTES_REQUEST - > processPostAttributesRequest ( ctx , msg ) ;
case TIMESERIES_DELETED - > processTimeSeriesDeleted ( ctx , msg ) ;
case ATTRIBUTES_DELETED - > processAttributesDeleted ( ctx , msg ) ;
default - > ctx . tellFailure ( msg , new IllegalArgumentException ( "Unsupported msg type: " + msg . getType ( ) ) ) ;
}
}
private void processPostTelemetryRequest ( TbContext ctx , TbMsg msg , String src ) {
Map < Long , List < KvEntry > > tsKvMap = JsonConverter . convertToTelemetry ( JsonParser . parseString ( src ) , System . currentTimeMillis ( ) ) ;
private void processPostTelemetryRequest ( TbContext ctx , TbMsg msg ) {
Map < Long , List < KvEntry > > tsKvMap = JsonConverter . convertToTelemetry ( JsonParser . parseString ( msg . getData ( ) ) , System . currentTimeMillis ( ) ) ;
if ( tsKvMap . isEmpty ( ) ) {
ctx . tellFailure ( msg , new IllegalArgumentException ( "Msg body is empty: " + src ) ) ;
ctx . tellFailure ( msg , new IllegalArgumentException ( "Msg body is empty: " + msg . getData ( ) ) ) ;
return ;
}
@ -120,8 +109,8 @@ public class TbCalculatedFieldsNode implements TbNode {
ctx . getCalculatedFieldQueueService ( ) . pushRequestToQueue ( timeseriesSaveRequest , timeseriesSaveRequest . getCallback ( ) ) ;
}
private void processPostAttributesRequest ( TbContext ctx , TbMsg msg , String src ) {
List < AttributeKvEntry > newAttributes = new ArrayList < > ( JsonConverter . convertToAttributes ( JsonParser . parseString ( src ) ) ) ;
private void processPostAttributesRequest ( TbContext ctx , TbMsg msg ) {
List < AttributeKvEntry > newAttributes = new ArrayList < > ( JsonConverter . convertToAttributes ( JsonParser . parseString ( msg . getData ( ) ) ) ) ;
if ( newAttributes . isEmpty ( ) ) {
ctx . tellSuccess ( msg ) ;
@ -141,10 +130,11 @@ public class TbCalculatedFieldsNode implements TbNode {
ctx . getCalculatedFieldQueueService ( ) . pushRequestToQueue ( attributesSaveRequest , attributesSaveRequest . getCallback ( ) ) ;
}
private void processTimeSeriesDeleted ( TbContext ctx , TbMsg msg , String src ) {
JsonNode dataJson = JacksonUtil . toJsonNode ( msg . getData ( ) ) ;
List < String > keysToDelete = JacksonUtil . convertValue ( dataJson . get ( "timeSeries" ) , new TypeReference < > ( ) {
} ) ;
private void processTimeSeriesDeleted ( TbContext ctx , TbMsg msg ) {
List < String > keysToDelete = Optional . ofNullable (
JacksonUtil . convertValue ( JacksonUtil . toJsonNode ( msg . getData ( ) ) . get ( "timeseries" ) , new TypeReference < List < String > > ( ) {
} )
) . orElse ( Collections . emptyList ( ) ) ;
if ( keysToDelete . isEmpty ( ) ) {
ctx . tellSuccess ( msg ) ;
@ -161,10 +151,12 @@ public class TbCalculatedFieldsNode implements TbNode {
. callback ( new FutureCallback < List < String > > ( ) {
@Override
public void onSuccess ( @Nullable List < String > tmp ) {
ctx . tellSuccess ( msg ) ;
}
@Override
public void onFailure ( Throwable t ) {
ctx . tellFailure ( msg , t ) ;
}
} )
. build ( ) ;
@ -172,10 +164,11 @@ public class TbCalculatedFieldsNode implements TbNode {
ctx . getCalculatedFieldQueueService ( ) . pushRequestToQueue ( timeseriesDeleteRequest , keysToDelete , getCalculatedFieldCallback ( timeseriesDeleteRequest . getCallback ( ) , keysToDelete ) ) ;
}
private void processAttributesDeleted ( TbContext ctx , TbMsg msg , String src ) {
JsonNode dataJson = JacksonUtil . toJsonNode ( msg . getData ( ) ) ;
List < String > keysToDelete = JacksonUtil . convertValue ( dataJson . get ( "attributes" ) , new TypeReference < > ( ) {
} ) ;
private void processAttributesDeleted ( TbContext ctx , TbMsg msg ) {
List < String > keysToDelete = Optional . ofNullable (
JacksonUtil . convertValue ( JacksonUtil . toJsonNode ( msg . getData ( ) ) . get ( "attributes" ) , new TypeReference < List < String > > ( ) {
} )
) . orElse ( Collections . emptyList ( ) ) ;
if ( keysToDelete . isEmpty ( ) ) {
ctx . tellSuccess ( msg ) ;