@ -18,9 +18,9 @@ package org.thingsboard.rule.engine.telemetry;
import com.google.common.util.concurrent.FutureCallback ;
import com.google.common.util.concurrent.Futures ;
import com.google.common.util.concurrent.ListenableFuture ;
import com.google.common.util.concurrent.MoreExecutors ;
import com.google.gson.JsonParser ;
import lombok.extern.slf4j.Slf4j ;
import org.springframework.data.util.Pair ;
import org.thingsboard.rule.engine.api.RuleNode ;
import org.thingsboard.rule.engine.api.TbContext ;
import org.thingsboard.rule.engine.api.TbNode ;
@ -36,7 +36,9 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import java.util.ArrayList ;
import java.util.List ;
import java.util.Set ;
import java.util.Map ;
import java.util.Objects ;
import java.util.function.Function ;
import java.util.stream.Collectors ;
import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE ;
@ -53,7 +55,8 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_R
nodeDetails = "Saves entity attributes based on configurable scope parameter. Expects messages with 'POST_ATTRIBUTES_REQUEST' message type. " +
"If upsert(update/insert) operation is completed successfully rule node will send the incoming message via <b>Success</b> chain, otherwise, <b>Failure</b> chain is used. " +
"Additionally if checkbox <b>Send attributes updated notification</b> is set to true, rule node will put the \"Attributes Updated\" " +
"event for <b>SHARED_SCOPE</b> and <b>SERVER_SCOPE</b> attributes updates to the corresponding rule engine queue." ,
"event for <b>SHARED_SCOPE</b> and <b>SERVER_SCOPE</b> attributes updates to the corresponding rule engine queue." +
"Performance checkbox 'Update Attributes On Value Change' will skip attributes overwrites for values with no changes (avoid concurrent writes because this check is not transactional; will not update 'Last updated time' for skipped attributes)." ,
uiResources = { "static/rulenode/rulenode-core-config.js" } ,
configDirective = "tbActionNodeAttributesConfig" ,
icon = "file_upload"
@ -84,49 +87,62 @@ public class TbMsgAttributesNode implements TbNode {
}
String scope = getScope ( msg . getMetaData ( ) . getValue ( SCOPE ) ) ;
boolean sendAttributesUpdateNotification = checkSendNotification ( scope ) ;
ListenableFuture < List < AttributeKvEntry > > findFuture ;
if ( config . isUpdateAttributesOnValueChange ( ) ) {
List < String > keys = newAttributes . stream ( ) . map ( KvEntry : : getKey ) . collect ( Collectors . toList ( ) ) ;
findFuture = ctx . getAttributesService ( ) . find ( ctx . getTenantId ( ) , msg . getOriginator ( ) , scope , keys ) ;
} else {
findFuture = Futures . immediateFuture ( null ) ;
if ( ! config . isUpdateAttributesOnValueChange ( ) ) {
saveAttr ( newAttributes , ctx , msg , scope , sendAttributesUpdateNotification ) ;
return ;
}
List < String > keys = newAttributes . stream ( ) . map ( KvEntry : : getKey ) . collect ( Collectors . toList ( ) ) ;
ListenableFuture < List < AttributeKvEntry > > findFuture = ctx . getAttributesService ( ) . find ( ctx . getTenantId ( ) , msg . getOriginator ( ) , scope , keys ) ;
Futures . addCallback ( findFuture , new FutureCallback < > ( ) {
@Override
public void onSuccess ( List < AttributeKvEntry > currentAttributes ) {
List < AttributeKvEntry > attributes = newAttributes ;
if ( config . isUpdateAttributesOnValueChange ( )
& & currentAttributes ! = null
& & ! currentAttributes . isEmpty ( ) ) {
Set < Pair < String , Object > > currentKeyValuePairs = currentAttributes . stream ( )
. map ( item - > Pair . of ( item . getKey ( ) , item . getValue ( ) ) )
. collect ( Collectors . toSet ( ) ) ;
attributes = attributes . stream ( )
. filter ( item - > ! currentKeyValuePairs . contains ( Pair . of ( item . getKey ( ) , item . getValue ( ) ) ) )
. collect ( Collectors . toList ( ) ) ;
}
if ( attributes . isEmpty ( ) ) {
ctx . tellSuccess ( msg ) ;
} else {
ctx . getTelemetryService ( ) . saveAndNotify (
ctx . getTenantId ( ) ,
msg . getOriginator ( ) ,
scope ,
attributes ,
checkNotifyDevice ( msg . getMetaData ( ) . getValue ( NOTIFY_DEVICE_METADATA_KEY ) ) ,
sendAttributesUpdateNotification ?
new AttributesUpdateNodeCallback ( ctx , msg , scope , attributes ) :
new TelemetryNodeCallback ( ctx , msg )
) ;
}
List < AttributeKvEntry > attributesChanged = filterChangedAttr ( currentAttributes , newAttributes ) ;
saveAttr ( attributesChanged , ctx , msg , scope , sendAttributesUpdateNotification ) ;
}
@Override
public void onFailure ( Throwable throwable ) {
ctx . tellFailure ( msg , throwable ) ;
}
} , ctx . getDbCallbackExecutor ( ) ) ;
} , MoreExecutors . directExecutor ( ) ) ;
}
void saveAttr ( List < AttributeKvEntry > attributes , TbContext ctx , TbMsg msg , String scope , boolean sendAttributesUpdateNotification ) {
if ( attributes . isEmpty ( ) ) {
ctx . tellSuccess ( msg ) ;
return ;
}
ctx . getTelemetryService ( ) . saveAndNotify (
ctx . getTenantId ( ) ,
msg . getOriginator ( ) ,
scope ,
attributes ,
checkNotifyDevice ( msg . getMetaData ( ) . getValue ( NOTIFY_DEVICE_METADATA_KEY ) ) ,
sendAttributesUpdateNotification ?
new AttributesUpdateNodeCallback ( ctx , msg , scope , attributes ) :
new TelemetryNodeCallback ( ctx , msg )
) ;
}
List < AttributeKvEntry > filterChangedAttr ( List < AttributeKvEntry > currentAttributes , List < AttributeKvEntry > newAttributes ) {
if ( currentAttributes = = null | | currentAttributes . isEmpty ( ) ) {
return newAttributes ;
}
Map < String , AttributeKvEntry > currentAttrMap = currentAttributes . stream ( )
. collect ( Collectors . toMap ( AttributeKvEntry : : getKey , Function . identity ( ) , ( existing , replacement ) - > existing ) ) ;
return newAttributes . stream ( )
. filter ( item - > {
AttributeKvEntry cacheAttr = currentAttrMap . get ( item . getKey ( ) ) ;
return cacheAttr = = null
| | ! Objects . equals ( item . getValue ( ) , cacheAttr . getValue ( ) ) //JSON and String can be equals by value, but different by type
| | ! Objects . equals ( item . getDataType ( ) , cacheAttr . getDataType ( ) ) ;
} )
. collect ( Collectors . toList ( ) ) ;
}
private boolean checkSendNotification ( String scope ) {