@ -15,24 +15,18 @@
* /
package org.thingsboard.server.service.query ;
import com.fasterxml.jackson.databind.node.ArrayNode ;
import com.fasterxml.jackson.databind.node.ObjectNode ;
import com.google.common.util.concurrent.FutureCallback ;
import com.google.common.util.concurrent.Futures ;
import com.google.common.util.concurrent.ListenableFuture ;
import lombok.extern.slf4j.Slf4j ;
import org.checkerframework.checker.nullness.qual.Nullable ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.http.HttpStatus ;
import org.springframework.http.ResponseEntity ;
import org.springframework.stereotype.Service ;
import org.springframework.util.CollectionUtils ;
import org.springframework.web.context.request.async.DeferredResult ;
import org.thingsboard.common.util.JacksonUtil ;
import org.thingsboard.common.util.KvUtil ;
import org.thingsboard.server.common.data.AttributeScope ;
import org.thingsboard.server.common.data.EntityType ;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode ;
import org.thingsboard.server.common.data.exception.ThingsboardException ;
import org.thingsboard.server.common.data.id.EntityId ;
import org.thingsboard.server.common.data.id.TenantId ;
import org.thingsboard.server.common.data.kv.AttributeKvEntry ;
@ -40,6 +34,7 @@ import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AlarmCountQuery ;
import org.thingsboard.server.common.data.query.AlarmData ;
import org.thingsboard.server.common.data.query.AlarmDataQuery ;
import org.thingsboard.server.common.data.query.AvailableEntityKeys ;
import org.thingsboard.server.common.data.query.ComplexFilterPredicate ;
import org.thingsboard.server.common.data.query.DynamicValue ;
import org.thingsboard.server.common.data.query.EntityCountQuery ;
@ -56,16 +51,13 @@ import org.thingsboard.server.common.data.query.SimpleKeyFilterPredicate;
import org.thingsboard.server.dao.alarm.AlarmService ;
import org.thingsboard.server.dao.attributes.AttributesService ;
import org.thingsboard.server.dao.entity.EntityService ;
import org.thingsboard.server.dao.model.ModelConstants ;
import org.thingsboard.server.dao.sql.query.EntityKeyMapping ;
import org.thingsboard.server.dao.timeseries.TimeseriesService ;
import org.thingsboard.server.queue.util.TbCoreComponent ;
import org.thingsboard.server.service.executors.DbCallbackExecutorService ;
import org.thingsboard.server.service.security.AccessValidator ;
import org.thingsboard.server.service.security.model.SecurityUser ;
import java.util.ArrayList ;
import java.util.Collection ;
import java.util.Collections ;
import java.util.LinkedHashMap ;
import java.util.List ;
@ -73,9 +65,10 @@ import java.util.Map;
import java.util.Optional ;
import java.util.Set ;
import java.util.concurrent.ExecutionException ;
import java.util.function.Consumer ;
import java.util.stream.Collectors ;
import static com.google.common.util.concurrent.Futures.immediateFuture ;
@Service
@Slf4j
@TbCoreComponent
@ -138,20 +131,12 @@ public class DefaultEntityQueryService implements EntityQueryService {
}
private < T > void resolveDynamicValue ( DynamicValue < T > dynamicValue , SecurityUser user , FilterPredicateType predicateType ) {
EntityId entityId ;
switch ( dynamicValue . getSourceType ( ) ) {
case CURRENT_TENANT :
entityId = user . getTenantId ( ) ;
break ;
case CURRENT_CUSTOMER :
entityId = user . getCustomerId ( ) ;
break ;
case CURRENT_USER :
entityId = user . getId ( ) ;
break ;
default :
throw new RuntimeException ( "Not supported operation for source type: {" + dynamicValue . getSourceType ( ) + "}" ) ;
}
EntityId entityId = switch ( dynamicValue . getSourceType ( ) ) {
case CURRENT_TENANT - > user . getTenantId ( ) ;
case CURRENT_CUSTOMER - > user . getCustomerId ( ) ;
case CURRENT_USER - > user . getId ( ) ;
default - > throw new RuntimeException ( "Not supported operation for source type: {" + dynamicValue . getSourceType ( ) + "}" ) ;
} ;
try {
Optional < AttributeKvEntry > valueOpt = attributesService . find ( user . getTenantId ( ) , entityId ,
@ -242,101 +227,51 @@ public class DefaultEntityQueryService implements EntityQueryService {
}
@Override
public DeferredResult < ResponseEntity > getKeysByQuery ( SecurityUser securityUser , TenantId tenantId , EntityDataQuery query ,
boolean isTimeseries , boolean isAttributes , String attributesScope ) {
final DeferredResult < ResponseEntity > response = new DeferredResult < > ( ) ;
public ListenableFuture < AvailableEntityKeys > getKeysByQuery ( SecurityUser securityUser , TenantId tenantId , EntityDataQuery query ,
boolean isTimeseries , boolean isAttributes , AttributeScope scope ) {
if ( ! isAttributes & & ! isTimeseries ) {
replyWithEmptyResponse ( response ) ;
return response ;
return immediateFuture ( AvailableEntityKeys . none ( ) ) ;
}
List < EntityId > ids = this . findEntityDataByQuery ( securityUser , query ) . getData ( ) . stream ( )
List < EntityId > ids = findEntityDataByQuery ( securityUser , query ) . getData ( ) . stream ( )
. map ( EntityData : : getEntityId )
. collect ( Collectors . toList ( ) ) ;
. toList ( ) ;
if ( ids . isEmpty ( ) ) {
replyWithEmptyResponse ( response ) ;
return response ;
return immediateFuture ( AvailableEntityKeys . none ( ) ) ;
}
Set < EntityType > types = ids . stream ( ) . map ( EntityId : : getEntityType ) . collect ( Collectors . toSet ( ) ) ;
final ListenableFuture < List < String > > timeseriesKeysFuture ;
final ListenableFuture < List < String > > attributesKeysFuture ;
ListenableFuture < List < String > > timeseriesKeysFuture ;
ListenableFuture < List < String > > attributesKeysFuture ;
if ( isTimeseries ) {
timeseriesKeysFuture = dbCallbackExecutor . submit ( ( ) - > timeseriesService . findAllKeysByEntityIds ( tenantId , ids ) ) ;
timeseriesKeysFuture = timeseriesService . findAllKeysByEntityIdsAsync ( tenantId , ids ) ;
} else {
timeseriesKeysFuture = null ;
timeseriesKeysFuture = immediateFuture ( Collections . emptyList ( ) ) ;
}
if ( isAttributes ) {
Map < EntityType , List < EntityId > > typesMap = ids . stream ( ) . collect ( Collectors . groupingBy ( EntityId : : getEntityType ) ) ;
List < ListenableFuture < List < String > > > futures = new ArrayList < > ( typesMap . size ( ) ) ;
typesMap . forEach ( ( type , entityIds ) - > futures . add ( dbCallbackExecutor . submit ( ( ) - > attributesService . findAllKeysByEntityIds ( tenantId , entityIds , attribute sS cope) ) ) ) ;
typesMap . forEach ( ( type , entityIds ) - > futures . add ( dbCallbackExecutor . submit ( ( ) - > attributesService . findAllKeysByEntityIds ( tenantId , entityIds , scope ) ) ) ) ;
attributesKeysFuture = Futures . transform ( Futures . allAsList ( futures ) , lists - > {
if ( CollectionUtils . isEmpty ( lists ) ) {
return Collections . emptyList ( ) ;
}
return lists . stream ( ) . flatMap ( List : : stream ) . distinct ( ) . sorted ( ) . collect ( Collectors . toList ( ) ) ;
} , dbCallbackExecutor ) ;
} else {
attributesKeysFuture = null ;
}
if ( isTimeseries & & isAttributes ) {
Futures . whenAllComplete ( timeseriesKeysFuture , attributesKeysFuture ) . run ( ( ) - > {
try {
replyWithResponse ( response , types , timeseriesKeysFuture . get ( ) , attributesKeysFuture . get ( ) ) ;
} catch ( Exception e ) {
log . error ( "Failed to fetch timeseries and attributes keys!" , e ) ;
AccessValidator . handleError ( e , response , HttpStatus . INTERNAL_SERVER_ERROR ) ;
}
return lists . stream ( ) . flatMap ( List : : stream ) . distinct ( ) . sorted ( ) . toList ( ) ;
} , dbCallbackExecutor ) ;
} else if ( isTimeseries ) {
addCallback ( timeseriesKeysFuture , keys - > replyWithResponse ( response , types , keys , null ) ,
error - > {
log . error ( "Failed to fetch timeseries keys!" , error ) ;
AccessValidator . handleError ( error , response , HttpStatus . INTERNAL_SERVER_ERROR ) ;
} ) ;
} else {
addCallback ( attributesKeysFuture , keys - > replyWithResponse ( response , types , null , keys ) ,
error - > {
log . error ( "Failed to fetch attributes keys!" , error ) ;
AccessValidator . handleError ( error , response , HttpStatus . INTERNAL_SERVER_ERROR ) ;
} ) ;
attributesKeysFuture = immediateFuture ( Collections . emptyList ( ) ) ;
}
return response ;
}
private void replyWithResponse ( DeferredResult < ResponseEntity > response , Set < EntityType > types , List < String > timeseriesKeys , List < String > attributesKeys ) {
ObjectNode json = JacksonUtil . newObjectNode ( ) ;
addItemsToArrayNode ( json . putArray ( "entityTypes" ) , types ) ;
addItemsToArrayNode ( json . putArray ( "timeseries" ) , timeseriesKeys ) ;
addItemsToArrayNode ( json . putArray ( "attribute" ) , attributesKeys ) ;
response . setResult ( new ResponseEntity < > ( json , HttpStatus . OK ) ) ;
}
private void replyWithEmptyResponse ( DeferredResult < ResponseEntity > response ) {
replyWithResponse ( response , Collections . emptySet ( ) , Collections . emptyList ( ) , Collections . emptyList ( ) ) ;
}
private void addItemsToArrayNode ( ArrayNode arrayNode , Collection < ? > collection ) {
if ( ! CollectionUtils . isEmpty ( collection ) ) {
collection . forEach ( item - > arrayNode . add ( item . toString ( ) ) ) ;
}
}
private void addCallback ( ListenableFuture < List < String > > future , Consumer < List < String > > success , Consumer < Throwable > error ) {
Futures . addCallback ( future , new FutureCallback < List < String > > ( ) {
@Override
public void onSuccess ( @Nullable List < String > keys ) {
success . accept ( keys ) ;
}
@Override
public void onFailure ( Throwable t ) {
error . accept ( t ) ;
}
} , dbCallbackExecutor ) ;
return Futures . whenAllComplete ( timeseriesKeysFuture , attributesKeysFuture )
. call ( ( ) - > {
try {
return new AvailableEntityKeys ( types , Futures . getDone ( timeseriesKeysFuture ) , Futures . getDone ( attributesKeysFuture ) ) ;
} catch ( ExecutionException e ) {
throw new ThingsboardException ( e . getCause ( ) , ThingsboardErrorCode . DATABASE ) ;
}
} , dbCallbackExecutor ) ;
}
}