From f621d59be323a9c9e1e5fc8a331ba52f0b8ce887 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Thu, 4 Apr 2024 10:45:41 +0300 Subject: [PATCH 1/5] Added logic when ping requests for gateway device will be also activity events for devices, connected through the gateway --- .../transport/mqtt/MqttTransportHandler.java | 16 +++- .../AbstractGatewaySessionHandler.java | 36 +++++++- .../mqtt/session/GatewaySessionHandler.java | 6 +- .../session/SparkplugNodeSessionHandler.java | 4 +- .../session/GatewaySessionHandlerTest.java | 83 +++++++++++++++++++ 5 files changed, 132 insertions(+), 13 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 8360111a54..2ecea63e42 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -45,6 +45,7 @@ import io.netty.util.concurrent.GenericFutureListener; import lombok.extern.slf4j.Slf4j; import org.eclipse.leshan.core.ResponseCode; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; @@ -64,7 +65,6 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.SessionInfoCreator; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; @@ -323,6 +323,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (checkConnected(ctx, msg)) { ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); transportService.recordActivity(deviceSessionCtx.getSessionInfo()); + if (gatewaySessionHandler != null) { + gatewaySessionHandler.onGatewayPing(); + } } break; case DISCONNECT: @@ -1082,10 +1085,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (infoNode != null) { JsonNode gatewayNode = infoNode.get("gateway"); if (gatewayNode != null && gatewayNode.asBoolean()) { - gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId); + boolean overwriteDevicesActivity = false; if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) { - sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean()); + overwriteDevicesActivity = infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean(); + sessionMetaData.setOverwriteActivityTime(overwriteDevicesActivity); } + gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId, overwriteDevicesActivity); } } } catch (IOException e) { @@ -1099,7 +1104,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement SparkplugTopic sparkplugTopicNode = validatedSparkplugTopicConnectedNode(connectMessage); if (sparkplugTopicNode != null) { SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); - sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, sparkplugTopicNode); + sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, true, sparkplugTopicNode); sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, sparkplugTopicNode); sessionMetaData.setOverwriteActivityTime(true); } else { @@ -1354,6 +1359,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional deviceProfileOpt) { deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); + if (gatewaySessionHandler != null) { + gatewaySessionHandler.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); + } } @Override diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 1cb391b1f4..dc6ba367ab 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -32,17 +32,21 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; import org.springframework.util.ConcurrentReferenceHashMap; +import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.adaptor.AdaptorException; -import org.thingsboard.server.common.adaptor.JsonConverter; -import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportApiProtos; @@ -64,6 +68,7 @@ import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -89,6 +94,8 @@ public abstract class AbstractGatewaySessionHandler createWeakMap() { @@ -164,6 +176,12 @@ public abstract class AbstractGatewaySessionHandler transportService.recordActivity(deviceSessionCtx.getSessionInfo())); + } + } + public void onDevicesDisconnect() { devices.forEach(this::deregisterSession); } @@ -221,6 +239,16 @@ public abstract class AbstractGatewaySessionHandler deviceProfileOpt) { + log.trace("[{}][{}] onDeviceUpdate: [{}]", gateway.getTenantId(), gateway.getDeviceId(), device); + if (device.getAdditionalInfo().has(GATEWAY_PROPERTY) + && device.getAdditionalInfo().get(GATEWAY_PROPERTY).asBoolean() + && device.getAdditionalInfo().has(OVERWRITE_ACTIVITY_TIME) + && device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).isBoolean()) { + overwriteDevicesActivity = device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).asBoolean(); + } + } + ListenableFuture onDeviceConnect(String deviceName, String deviceType) { T result = devices.get(deviceName); if (result == null) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index 9ccc88aa82..9aa228afff 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -25,10 +25,10 @@ import java.util.UUID; /** * Created by nickAS21 on 26.12.22 */ -public class GatewaySessionHandler extends AbstractGatewaySessionHandler { +public class GatewaySessionHandler extends AbstractGatewaySessionHandler { - public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) { - super(deviceSessionCtx, sessionId); + public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) { + super(deviceSessionCtx, sessionId, overwriteDevicesActivity); } public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index d4f2364d79..67dda4681a 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -73,8 +73,8 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler(); diff --git a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java index bd1ddd2d54..9b52d57dc2 100644 --- a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java +++ b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java @@ -15,15 +15,98 @@ */ package org.thingsboard.server.transport.mqtt.session; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.springframework.util.ConcurrentReferenceHashMap; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; +import org.thingsboard.server.transport.mqtt.MqttTransportContext; + +import java.lang.reflect.Field; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.BDDMockito.willCallRealMethod; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doNothing; public class GatewaySessionHandlerTest { + @Mock + private TransportService transportService; + + @Mock + private DeviceSessionCtx deviceSessionCtx; + + @Mock + private MqttTransportContext transportContext; + + private GatewaySessionHandler handler; + + @BeforeEach + public void setup() { + MockitoAnnotations.openMocks(this); + when(deviceSessionCtx.getSessionId()).thenReturn(UUID.randomUUID()); + doNothing().when(transportService).recordActivity(any()); + when(transportContext.getTransportService()).thenReturn(transportService); + when(deviceSessionCtx.getContext()).thenReturn(transportContext); + handler = new GatewaySessionHandler(deviceSessionCtx, UUID.randomUUID(), true); + when(handler.getNodeId()).thenReturn("nodeId"); + } + + @Test + public void shouldRecordActivityWhenOnGatewayPing() throws Exception { + // Given + ConcurrentHashMap devices = new ConcurrentHashMap<>(); + TransportDeviceInfo deviceInfo = new TransportDeviceInfo(); + deviceInfo.setDeviceId(new DeviceId(UUID.randomUUID())); + deviceInfo.setTenantId(new TenantId(UUID.randomUUID())); + deviceInfo.setCustomerId(new CustomerId(UUID.randomUUID())); + deviceInfo.setDeviceName("device1"); + deviceInfo.setDeviceType("default"); + deviceInfo.setDeviceProfileId(new DeviceProfileId(UUID.randomUUID())); + deviceInfo.setAdditionalInfo("{\"gateway\": true, \"overwriteDeviceActivity\": true}"); + when(deviceSessionCtx.getDeviceInfo()).thenReturn(deviceInfo); + GatewayDeviceSessionContext gatewayDeviceSessionContext = new GatewayDeviceSessionContext(handler, deviceInfo, null, null, transportService); + devices.put("device1", gatewayDeviceSessionContext); + when(handler.getNodeId()).thenReturn("nodeId"); + Field devicesField = AbstractGatewaySessionHandler.class.getDeclaredField("devices"); + devicesField.setAccessible(true); + devicesField.set(handler, devices); + + // When + handler.onGatewayPing(); + + // Then + verify(transportService).recordActivity(gatewayDeviceSessionContext.getSessionInfo()); + } + + @Test + public void shouldNotRecordActivityWhenNoDevicesOnGatewayPing() throws Exception { + // Given + ConcurrentHashMap devices = new ConcurrentHashMap<>(); + Field devicesField = AbstractGatewaySessionHandler.class.getDeclaredField("devices"); + devicesField.setAccessible(true); + devicesField.set(handler, devices); + + // When + handler.onGatewayPing(); + + // Then + verify(transportService, never()).recordActivity(any()); + } + @Test public void givenGatewaySessionHandler_WhenCreateWeakMap_thenConcurrentReferenceHashMapClass() { GatewaySessionHandler gsh = mock(GatewaySessionHandler.class); From bead94d8327f068bfd1cf0988d310631383194ab Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 8 Apr 2024 11:45:26 +0300 Subject: [PATCH 2/5] Changes due to comments --- .../mqtt/session/AbstractGatewaySessionHandler.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index dc6ba367ab..3be1d632be 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -16,6 +16,7 @@ package org.thingsboard.server.transport.mqtt.session; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -241,11 +242,12 @@ public abstract class AbstractGatewaySessionHandler deviceProfileOpt) { log.trace("[{}][{}] onDeviceUpdate: [{}]", gateway.getTenantId(), gateway.getDeviceId(), device); - if (device.getAdditionalInfo().has(GATEWAY_PROPERTY) - && device.getAdditionalInfo().get(GATEWAY_PROPERTY).asBoolean() - && device.getAdditionalInfo().has(OVERWRITE_ACTIVITY_TIME) - && device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).isBoolean()) { - overwriteDevicesActivity = device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).asBoolean(); + JsonNode deviceAdditionalInfo = device.getAdditionalInfo(); + if (deviceAdditionalInfo.has(GATEWAY_PROPERTY) + && deviceAdditionalInfo.get(GATEWAY_PROPERTY).asBoolean() + && deviceAdditionalInfo.has(OVERWRITE_ACTIVITY_TIME) + && deviceAdditionalInfo.get(OVERWRITE_ACTIVITY_TIME).isBoolean()) { + overwriteDevicesActivity = deviceAdditionalInfo.get(OVERWRITE_ACTIVITY_TIME).asBoolean(); } } From a6143f3c8abe1db72c80f373bca069511c1b353e Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 8 Apr 2024 13:05:58 +0300 Subject: [PATCH 3/5] Improvement for update of overwrite activity time --- .../mqtt/session/AbstractGatewaySessionHandler.java | 5 +---- .../transport/service/DefaultTransportService.java | 9 ++++----- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 3be1d632be..b881291756 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -243,10 +243,7 @@ public abstract class AbstractGatewaySessionHandler deviceProfileOpt) { log.trace("[{}][{}] onDeviceUpdate: [{}]", gateway.getTenantId(), gateway.getDeviceId(), device); JsonNode deviceAdditionalInfo = device.getAdditionalInfo(); - if (deviceAdditionalInfo.has(GATEWAY_PROPERTY) - && deviceAdditionalInfo.get(GATEWAY_PROPERTY).asBoolean() - && deviceAdditionalInfo.has(OVERWRITE_ACTIVITY_TIME) - && deviceAdditionalInfo.get(OVERWRITE_ACTIVITY_TIME).isBoolean()) { + if (deviceAdditionalInfo.has(GATEWAY_PROPERTY) && deviceAdditionalInfo.has(OVERWRITE_ACTIVITY_TIME)) { overwriteDevicesActivity = deviceAdditionalInfo.get(OVERWRITE_ACTIVITY_TIME).asBoolean(); } } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 4aa989ed25..2f822ccb1a 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.transport.service; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -1047,11 +1048,9 @@ public class DefaultTransportService extends TransportActivityManager implements .setDeviceProfileIdLSB(deviceProfileIdLSB) .setDeviceName(device.getName()) .setDeviceType(device.getType()).build(); - if (device.getAdditionalInfo().has("gateway") - && device.getAdditionalInfo().get("gateway").asBoolean() - && device.getAdditionalInfo().has(OVERWRITE_ACTIVITY_TIME) - && device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).isBoolean()) { - md.setOverwriteActivityTime(device.getAdditionalInfo().get(OVERWRITE_ACTIVITY_TIME).asBoolean()); + JsonNode deviceAdditionalInfo = device.getAdditionalInfo(); + if (deviceAdditionalInfo.has("gateway") && deviceAdditionalInfo.has(OVERWRITE_ACTIVITY_TIME)) { + md.setOverwriteActivityTime(deviceAdditionalInfo.get(OVERWRITE_ACTIVITY_TIME).asBoolean()); } md.setSessionInfo(newSessionInfo); transportCallbackExecutor.submit(() -> md.getListener().onDeviceUpdate(newSessionInfo, device, Optional.ofNullable(newDeviceProfile))); From be045bcf4251541379efc1135b6b117e92cbb929 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Tue, 16 Apr 2024 09:20:59 +0300 Subject: [PATCH 4/5] Updated due to comments --- .../server/common/data/DataConstants.java | 2 ++ .../AbstractGatewaySessionHandler.java | 6 ++---- .../session/GatewaySessionHandlerTest.java | 19 +++++++++++-------- .../service/DefaultTransportService.java | 8 +++++--- 4 files changed, 20 insertions(+), 15 deletions(-) diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 53b5d2ec63..b529edeb0c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -44,6 +44,8 @@ public class DataConstants { public static final String EDGE_ID = "edgeId"; public static final String DEVICE_ID = "deviceId"; public static final String GATEWAY_PARAMETER = "gateway"; + + public static final String OVERWRITE_ACTIVITY_TIME_PARAMETER = "overwriteActivityTime"; public static final String COAP_TRANSPORT_NAME = "COAP"; public static final String LWM2M_TRANSPORT_NAME = "LWM2M"; public static final String MQTT_TRANSPORT_NAME = "MQTT"; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index b881291756..24a9b665ae 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -95,8 +95,6 @@ public abstract class AbstractGatewaySessionHandler deviceProfileOpt) { log.trace("[{}][{}] onDeviceUpdate: [{}]", gateway.getTenantId(), gateway.getDeviceId(), device); JsonNode deviceAdditionalInfo = device.getAdditionalInfo(); - if (deviceAdditionalInfo.has(GATEWAY_PROPERTY) && deviceAdditionalInfo.has(OVERWRITE_ACTIVITY_TIME)) { - overwriteDevicesActivity = deviceAdditionalInfo.get(OVERWRITE_ACTIVITY_TIME).asBoolean(); + if (deviceAdditionalInfo.has(DataConstants.GATEWAY_PARAMETER) && deviceAdditionalInfo.has(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER)) { + overwriteDevicesActivity = deviceAdditionalInfo.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).asBoolean(); } } diff --git a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java index 9b52d57dc2..158acc2ce2 100644 --- a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java +++ b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java @@ -17,8 +17,10 @@ package org.thingsboard.server.transport.mqtt.session; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.util.ConcurrentReferenceHashMap; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; @@ -34,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.BDDMockito.willCallRealMethod; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -41,6 +44,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; +@ExtendWith(MockitoExtension.class) public class GatewaySessionHandlerTest { @Mock @@ -56,13 +60,12 @@ public class GatewaySessionHandlerTest { @BeforeEach public void setup() { - MockitoAnnotations.openMocks(this); - when(deviceSessionCtx.getSessionId()).thenReturn(UUID.randomUUID()); - doNothing().when(transportService).recordActivity(any()); - when(transportContext.getTransportService()).thenReturn(transportService); - when(deviceSessionCtx.getContext()).thenReturn(transportContext); + lenient().when(deviceSessionCtx.getSessionId()).thenReturn(UUID.randomUUID()); + lenient().doNothing().when(transportService).recordActivity(any()); + lenient().when(transportContext.getTransportService()).thenReturn(transportService); + lenient().when(deviceSessionCtx.getContext()).thenReturn(transportContext); handler = new GatewaySessionHandler(deviceSessionCtx, UUID.randomUUID(), true); - when(handler.getNodeId()).thenReturn("nodeId"); + lenient().when(handler.getNodeId()).thenReturn("nodeId"); } @Test @@ -77,10 +80,10 @@ public class GatewaySessionHandlerTest { deviceInfo.setDeviceType("default"); deviceInfo.setDeviceProfileId(new DeviceProfileId(UUID.randomUUID())); deviceInfo.setAdditionalInfo("{\"gateway\": true, \"overwriteDeviceActivity\": true}"); - when(deviceSessionCtx.getDeviceInfo()).thenReturn(deviceInfo); + lenient().when(deviceSessionCtx.getDeviceInfo()).thenReturn(deviceInfo); GatewayDeviceSessionContext gatewayDeviceSessionContext = new GatewayDeviceSessionContext(handler, deviceInfo, null, null, transportService); devices.put("device1", gatewayDeviceSessionContext); - when(handler.getNodeId()).thenReturn("nodeId"); + lenient().when(handler.getNodeId()).thenReturn("nodeId"); Field devicesField = AbstractGatewaySessionHandler.class.getDeclaredField("devices"); devicesField.setAccessible(true); devicesField.set(handler, devices); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 2f822ccb1a..a756e7a992 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -133,7 +133,6 @@ import java.util.stream.Collectors; @TbTransportComponent public class DefaultTransportService extends TransportActivityManager implements TransportService { - public static final String OVERWRITE_ACTIVITY_TIME = "overwriteActivityTime"; public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_OPEN = TransportProtos.SessionEventMsg.newBuilder() .setSessionType(TransportProtos.SessionType.ASYNC) .setEvent(TransportProtos.SessionEvent.OPEN).build(); @@ -1049,8 +1048,11 @@ public class DefaultTransportService extends TransportActivityManager implements .setDeviceName(device.getName()) .setDeviceType(device.getType()).build(); JsonNode deviceAdditionalInfo = device.getAdditionalInfo(); - if (deviceAdditionalInfo.has("gateway") && deviceAdditionalInfo.has(OVERWRITE_ACTIVITY_TIME)) { - md.setOverwriteActivityTime(deviceAdditionalInfo.get(OVERWRITE_ACTIVITY_TIME).asBoolean()); + if (deviceAdditionalInfo.has(DataConstants.GATEWAY_PARAMETER) + && deviceAdditionalInfo.get(DataConstants.GATEWAY_PARAMETER).asBoolean() + && deviceAdditionalInfo.has(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER) + && deviceAdditionalInfo.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).isBoolean()) { + md.setOverwriteActivityTime(deviceAdditionalInfo.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).asBoolean()); } md.setSessionInfo(newSessionInfo); transportCallbackExecutor.submit(() -> md.getListener().onDeviceUpdate(newSessionInfo, device, Optional.ofNullable(newDeviceProfile))); From d8515383c044dc7265982a33ed875376d08cdfa1 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Tue, 16 Apr 2024 10:07:41 +0300 Subject: [PATCH 5/5] Updated MqttTransportHandler --- .../server/transport/mqtt/MqttTransportHandler.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 2ecea63e42..b48cb39ca0 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -1083,11 +1083,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement try { JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo()); if (infoNode != null) { - JsonNode gatewayNode = infoNode.get("gateway"); + JsonNode gatewayNode = infoNode.get(DataConstants.GATEWAY_PARAMETER); if (gatewayNode != null && gatewayNode.asBoolean()) { boolean overwriteDevicesActivity = false; - if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) { - overwriteDevicesActivity = infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean(); + if (infoNode.has(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER) + && infoNode.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).isBoolean()) { + overwriteDevicesActivity = infoNode.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).asBoolean(); sessionMetaData.setOverwriteActivityTime(overwriteDevicesActivity); } gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId, overwriteDevicesActivity);