|
|
|
@ -16,7 +16,6 @@ |
|
|
|
package org.thingsboard.server.actors.device; |
|
|
|
|
|
|
|
import akka.actor.ActorContext; |
|
|
|
import akka.event.LoggingAdapter; |
|
|
|
import com.datastax.driver.core.utils.UUIDs; |
|
|
|
import com.google.common.util.concurrent.FutureCallback; |
|
|
|
import com.google.common.util.concurrent.Futures; |
|
|
|
@ -26,12 +25,12 @@ import com.google.gson.JsonObject; |
|
|
|
import com.google.gson.JsonParser; |
|
|
|
import com.google.protobuf.InvalidProtocolBufferException; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.apache.commons.collections.CollectionUtils; |
|
|
|
import org.thingsboard.rule.engine.api.RpcError; |
|
|
|
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; |
|
|
|
import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg; |
|
|
|
import org.thingsboard.server.actors.ActorSystemContext; |
|
|
|
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; |
|
|
|
import org.thingsboard.server.common.data.DataConstants; |
|
|
|
import org.thingsboard.server.common.data.Device; |
|
|
|
import org.thingsboard.server.common.data.id.DeviceId; |
|
|
|
import org.thingsboard.server.common.data.id.TenantId; |
|
|
|
@ -42,7 +41,6 @@ import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; |
|
|
|
import org.thingsboard.server.common.msg.TbMsg; |
|
|
|
import org.thingsboard.server.common.msg.TbMsgDataType; |
|
|
|
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|
|
|
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; |
|
|
|
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; |
|
|
|
import org.thingsboard.server.common.msg.session.SessionMsgType; |
|
|
|
import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg; |
|
|
|
@ -81,12 +79,14 @@ import java.util.HashSet; |
|
|
|
import java.util.LinkedHashMap; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.UUID; |
|
|
|
import java.util.function.Consumer; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; |
|
|
|
import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE; |
|
|
|
|
|
|
|
/** |
|
|
|
* @author Andrew Shvayka |
|
|
|
*/ |
|
|
|
@ -263,10 +263,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
|
|
|
} |
|
|
|
|
|
|
|
private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) { |
|
|
|
ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, toOptionalSet(request.getClientAttributeNamesList())); |
|
|
|
ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, toOptionalSet(request.getSharedAttributeNamesList())); |
|
|
|
int requestId = request.getRequestId(); |
|
|
|
Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() { |
|
|
|
Futures.addCallback(getAttributesKvEntries(request), new FutureCallback<List<List<AttributeKvEntry>>>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) { |
|
|
|
GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder() |
|
|
|
@ -287,16 +285,35 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<List<AttributeKvEntry>> getAttributeKvEntries(DeviceId deviceId, String scope, Optional<Set<String>> names) { |
|
|
|
if (names.isPresent()) { |
|
|
|
if (!names.get().isEmpty()) { |
|
|
|
return systemContext.getAttributesService().find(tenantId, deviceId, scope, names.get()); |
|
|
|
} else { |
|
|
|
return systemContext.getAttributesService().findAll(tenantId, deviceId, scope); |
|
|
|
} |
|
|
|
private ListenableFuture<List<List<AttributeKvEntry>>> getAttributesKvEntries(GetAttributeRequestMsg request) { |
|
|
|
ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture; |
|
|
|
ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture; |
|
|
|
if (CollectionUtils.isEmpty(request.getClientAttributeNamesList()) && CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) { |
|
|
|
clientAttributesFuture = findAllAttributesByScope(CLIENT_SCOPE); |
|
|
|
sharedAttributesFuture = findAllAttributesByScope(SHARED_SCOPE); |
|
|
|
} else if (!CollectionUtils.isEmpty(request.getClientAttributeNamesList()) && !CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) { |
|
|
|
clientAttributesFuture = findAttributesByScope(toSet(request.getClientAttributeNamesList()), CLIENT_SCOPE); |
|
|
|
sharedAttributesFuture = findAttributesByScope(toSet(request.getSharedAttributeNamesList()), SHARED_SCOPE); |
|
|
|
} else if (CollectionUtils.isEmpty(request.getClientAttributeNamesList()) && !CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) { |
|
|
|
clientAttributesFuture = Futures.immediateFuture(Collections.emptyList()); |
|
|
|
sharedAttributesFuture = findAttributesByScope(toSet(request.getSharedAttributeNamesList()), SHARED_SCOPE); |
|
|
|
} else { |
|
|
|
return Futures.immediateFuture(Collections.emptyList()); |
|
|
|
sharedAttributesFuture = Futures.immediateFuture(Collections.emptyList()); |
|
|
|
clientAttributesFuture = findAttributesByScope(toSet(request.getClientAttributeNamesList()), CLIENT_SCOPE); |
|
|
|
} |
|
|
|
return Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<List<AttributeKvEntry>> findAllAttributesByScope(String scope) { |
|
|
|
return systemContext.getAttributesService().findAll(tenantId, deviceId, scope); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<List<AttributeKvEntry>> findAttributesByScope(Set<String> attributesSet, String scope) { |
|
|
|
return systemContext.getAttributesService().find(tenantId, deviceId, scope, attributesSet); |
|
|
|
} |
|
|
|
|
|
|
|
private Set<String> toSet(List<String> strings) { |
|
|
|
return new HashSet<>(strings); |
|
|
|
} |
|
|
|
|
|
|
|
private void handlePostAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, PostAttributeMsg postAttributes) { |
|
|
|
@ -368,7 +385,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
|
|
|
AttributeUpdateNotificationMsg.Builder notification = AttributeUpdateNotificationMsg.newBuilder(); |
|
|
|
if (msg.isDeleted()) { |
|
|
|
List<String> sharedKeys = msg.getDeletedKeys().stream() |
|
|
|
.filter(key -> DataConstants.SHARED_SCOPE.equals(key.getScope())) |
|
|
|
.filter(key -> SHARED_SCOPE.equals(key.getScope())) |
|
|
|
.map(AttributeKey::getAttributeKey) |
|
|
|
.collect(Collectors.toList()); |
|
|
|
if (!sharedKeys.isEmpty()) { |
|
|
|
@ -376,7 +393,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
|
|
|
hasNotificationData = true; |
|
|
|
} |
|
|
|
} else { |
|
|
|
if (DataConstants.SHARED_SCOPE.equals(msg.getScope())) { |
|
|
|
if (SHARED_SCOPE.equals(msg.getScope())) { |
|
|
|
List<AttributeKvEntry> attributes = new ArrayList<>(msg.getValues()); |
|
|
|
if (attributes.size() > 0) { |
|
|
|
List<TsKvProto> sharedUpdated = msg.getValues().stream().map(this::toTsKvProto) |
|
|
|
@ -545,14 +562,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
|
|
|
return json; |
|
|
|
} |
|
|
|
|
|
|
|
private Optional<Set<String>> toOptionalSet(List<String> strings) { |
|
|
|
if (strings == null || strings.isEmpty()) { |
|
|
|
return Optional.empty(); |
|
|
|
} else { |
|
|
|
return Optional.of(new HashSet<>(strings)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void sendToTransport(GetAttributeResponseMsg responseMsg, SessionInfoProto sessionInfo) { |
|
|
|
DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder() |
|
|
|
.setSessionIdMSB(sessionInfo.getSessionIdMSB()) |
|
|
|
|