diff --git a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java index 2b5dabb3b9..a306294237 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java +++ b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -60,12 +60,12 @@ public class TbTestWebSocketClient extends WebSocketClient { public void onMessage(String s) { log.info("RECEIVED: {}", s); lastMsg = s; - if (reply != null) { - reply.countDown(); - } if (update != null) { update.countDown(); } + if (reply != null) { + reply.countDown(); + } } @Override diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java index 556af34371..4502e79ff2 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/AbstractMqttAttributesIntegrationTest.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.query.DeviceTypeFilter; import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.EntityKeyType; +import org.thingsboard.server.common.data.query.SingleEntityFilter; import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; @@ -319,7 +320,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt protected void processJsonTestRequestAttributesValuesFromTheServer(String attrPubTopic, String attrSubTopic, String attrReqTopicPrefix) throws Exception { MqttTestClient client = new MqttTestClient(); client.connectAndWait(accessToken); - DeviceTypeFilter dtf = new DeviceTypeFilter(savedDevice.getType(), savedDevice.getName()); + SingleEntityFilter dtf = new SingleEntityFilter(); + dtf.setSingleEntity(savedDevice.getId()); String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson"; String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson"; List clientKeysList = List.of(clientKeysStr.split(",")); @@ -389,7 +391,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt 100); assertNotNull(device); - DeviceTypeFilter dtf = new DeviceTypeFilter(device.getType(), device.getName()); + SingleEntityFilter dtf = new SingleEntityFilter(); + dtf.setSingleEntity(device.getId()); String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson"; String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson"; List clientKeysList = List.of(clientKeysStr.split(",")); @@ -443,7 +446,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt 100); assertNotNull(device); - DeviceTypeFilter dtf = new DeviceTypeFilter(device.getType(), device.getName()); + SingleEntityFilter dtf = new SingleEntityFilter(); + dtf.setSingleEntity(device.getId()); String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson"; List sharedKeysList = List.of(sharedKeysStr.split(",")); List csKeys = getEntityKeys(clientKeysList, CLIENT_ATTRIBUTE); @@ -569,7 +573,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.fromBytes(callback.getPayloadBytes())); } - protected void validateProtoClientResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException { + protected void validateProtoClientResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException { callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, true); diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index 3ce56dfbf3..802e746122 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -141,6 +141,7 @@ public class CachedAttributesService implements AttributesService { .filter(Objects::nonNull) .collect(Collectors.toList()); if (wrappedCachedAttributes.size() == attributeKeys.size()) { + log.trace("[{}][{}] Found all attributes from cache: {}", entityId, scope, attributeKeys); return Futures.immediateFuture(cachedAttributes); } @@ -152,6 +153,7 @@ public class CachedAttributesService implements AttributesService { return cacheExecutor.submit(() -> { var cacheTransaction = cache.newTransactionForKeys(notFoundKeys); try { + log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys); List result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); for (AttributeKvEntry foundInDbAttribute : result) { AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, foundInDbAttribute.getKey()); @@ -164,6 +166,7 @@ public class CachedAttributesService implements AttributesService { List mergedAttributes = new ArrayList<>(cachedAttributes); mergedAttributes.addAll(result); cacheTransaction.commit(); + log.trace("[{}][{}] Commit cache transaction: {}", entityId, scope, notFoundAttributeKeys); return mergedAttributes; } catch (Throwable e) { cacheTransaction.rollback(); @@ -211,7 +214,9 @@ public class CachedAttributesService implements AttributesService { for (var attribute : attributes) { ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); futures.add(Futures.transform(future, key -> { + log.trace("[{}][{}][{}] Before cache evict: {}", entityId, scope, key, attribute); cache.evictOrPut(new AttributeCacheKey(scope, entityId, key), attribute); + log.trace("[{}][{}][{}] after cache evict.", entityId, scope, key); return key; }, cacheExecutor)); }