53 changed files with 3111 additions and 172 deletions
@ -0,0 +1,196 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.state; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; |
|||
import org.thingsboard.server.cluster.TbClusterService; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.queue.ServiceType; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
import org.thingsboard.server.queue.common.SimpleTbQueueCallback; |
|||
import org.thingsboard.server.queue.discovery.PartitionService; |
|||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|||
import org.thingsboard.server.queue.util.TbRuleEngineComponent; |
|||
|
|||
import java.util.Optional; |
|||
import java.util.UUID; |
|||
|
|||
@Slf4j |
|||
@Service |
|||
@TbRuleEngineComponent |
|||
public class DefaultRuleEngineDeviceStateManager implements RuleEngineDeviceStateManager { |
|||
|
|||
private final TbServiceInfoProvider serviceInfoProvider; |
|||
private final PartitionService partitionService; |
|||
|
|||
private final Optional<DeviceStateService> deviceStateService; |
|||
private final TbClusterService clusterService; |
|||
|
|||
public DefaultRuleEngineDeviceStateManager( |
|||
TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService, |
|||
Optional<DeviceStateService> deviceStateServiceOptional, TbClusterService clusterService |
|||
) { |
|||
this.serviceInfoProvider = serviceInfoProvider; |
|||
this.partitionService = partitionService; |
|||
this.deviceStateService = deviceStateServiceOptional; |
|||
this.clusterService = clusterService; |
|||
} |
|||
|
|||
@Getter |
|||
private abstract static class ConnectivityEventInfo { |
|||
|
|||
private final TenantId tenantId; |
|||
private final DeviceId deviceId; |
|||
private final long eventTime; |
|||
|
|||
private ConnectivityEventInfo(TenantId tenantId, DeviceId deviceId, long eventTime) { |
|||
this.tenantId = tenantId; |
|||
this.deviceId = deviceId; |
|||
this.eventTime = eventTime; |
|||
} |
|||
|
|||
abstract void forwardToLocalService(); |
|||
|
|||
abstract TransportProtos.ToCoreMsg toQueueMsg(); |
|||
|
|||
} |
|||
|
|||
@Override |
|||
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback) { |
|||
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, connectTime) { |
|||
@Override |
|||
void forwardToLocalService() { |
|||
deviceStateService.ifPresent(service -> service.onDeviceConnect(tenantId, deviceId, connectTime)); |
|||
} |
|||
|
|||
@Override |
|||
TransportProtos.ToCoreMsg toQueueMsg() { |
|||
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastConnectTime(connectTime) |
|||
.build(); |
|||
return TransportProtos.ToCoreMsg.newBuilder() |
|||
.setDeviceConnectMsg(deviceConnectMsg) |
|||
.build(); |
|||
} |
|||
}, callback); |
|||
} |
|||
|
|||
@Override |
|||
public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long activityTime, TbCallback callback) { |
|||
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, activityTime) { |
|||
@Override |
|||
void forwardToLocalService() { |
|||
deviceStateService.ifPresent(service -> service.onDeviceActivity(tenantId, deviceId, activityTime)); |
|||
} |
|||
|
|||
@Override |
|||
TransportProtos.ToCoreMsg toQueueMsg() { |
|||
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastActivityTime(activityTime) |
|||
.build(); |
|||
return TransportProtos.ToCoreMsg.newBuilder() |
|||
.setDeviceActivityMsg(deviceActivityMsg) |
|||
.build(); |
|||
} |
|||
}, callback); |
|||
} |
|||
|
|||
@Override |
|||
public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long disconnectTime, TbCallback callback) { |
|||
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, disconnectTime) { |
|||
@Override |
|||
void forwardToLocalService() { |
|||
deviceStateService.ifPresent(service -> service.onDeviceDisconnect(tenantId, deviceId, disconnectTime)); |
|||
} |
|||
|
|||
@Override |
|||
TransportProtos.ToCoreMsg toQueueMsg() { |
|||
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastDisconnectTime(disconnectTime) |
|||
.build(); |
|||
return TransportProtos.ToCoreMsg.newBuilder() |
|||
.setDeviceDisconnectMsg(deviceDisconnectMsg) |
|||
.build(); |
|||
} |
|||
}, callback); |
|||
} |
|||
|
|||
@Override |
|||
public void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback) { |
|||
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, inactivityTime) { |
|||
@Override |
|||
void forwardToLocalService() { |
|||
deviceStateService.ifPresent(service -> service.onDeviceInactivity(tenantId, deviceId, inactivityTime)); |
|||
} |
|||
|
|||
@Override |
|||
TransportProtos.ToCoreMsg toQueueMsg() { |
|||
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastInactivityTime(inactivityTime) |
|||
.build(); |
|||
return TransportProtos.ToCoreMsg.newBuilder() |
|||
.setDeviceInactivityMsg(deviceInactivityMsg) |
|||
.build(); |
|||
} |
|||
}, callback); |
|||
} |
|||
|
|||
private void routeEvent(ConnectivityEventInfo eventInfo, TbCallback callback) { |
|||
var tenantId = eventInfo.getTenantId(); |
|||
var deviceId = eventInfo.getDeviceId(); |
|||
long eventTime = eventInfo.getEventTime(); |
|||
|
|||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); |
|||
if (serviceInfoProvider.isService(ServiceType.TB_CORE) && tpi.isMyPartition() && deviceStateService.isPresent()) { |
|||
log.debug("[{}][{}] Forwarding device connectivity event to local service. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime); |
|||
try { |
|||
eventInfo.forwardToLocalService(); |
|||
} catch (Exception e) { |
|||
log.error("[{}][{}] Failed to process device connectivity event. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime, e); |
|||
callback.onFailure(e); |
|||
return; |
|||
} |
|||
callback.onSuccess(); |
|||
} else { |
|||
TransportProtos.ToCoreMsg msg = eventInfo.toQueueMsg(); |
|||
log.debug("[{}][{}] Sending device connectivity message to core. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime); |
|||
clusterService.pushMsgToCore(tpi, UUID.randomUUID(), msg, new SimpleTbQueueCallback(__ -> callback.onSuccess(), callback::onFailure)); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,38 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.ws.notification.sub; |
|||
|
|||
|
|||
import lombok.Getter; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.service.subscription.TbSubscription; |
|||
import org.thingsboard.server.service.subscription.TbSubscriptionType; |
|||
|
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
import java.util.function.BiConsumer; |
|||
|
|||
@Getter |
|||
public abstract class AbstractNotificationSubscription<T> extends TbSubscription<T> { |
|||
|
|||
protected final AtomicInteger sequence = new AtomicInteger(); |
|||
protected final AtomicInteger totalUnreadCounter = new AtomicInteger(); |
|||
|
|||
public AbstractNotificationSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId, TbSubscriptionType type, BiConsumer<TbSubscription<T>, T> updateProcessor) { |
|||
super(serviceId, sessionId, subscriptionId, tenantId, entityId, type, updateProcessor); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,532 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.queue; |
|||
|
|||
import com.google.common.util.concurrent.ListeningExecutorService; |
|||
import com.google.common.util.concurrent.MoreExecutors; |
|||
import org.junit.jupiter.api.AfterEach; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.mockito.ArgumentCaptor; |
|||
import org.mockito.Mock; |
|||
import org.mockito.junit.jupiter.MockitoExtension; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
import org.thingsboard.server.service.state.DeviceStateService; |
|||
|
|||
import java.util.UUID; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.BDDMockito.then; |
|||
import static org.mockito.Mockito.doCallRealMethod; |
|||
import static org.mockito.Mockito.doThrow; |
|||
import static org.mockito.Mockito.never; |
|||
|
|||
@ExtendWith(MockitoExtension.class) |
|||
public class DefaultTbCoreConsumerServiceTest { |
|||
|
|||
@Mock |
|||
private DeviceStateService stateServiceMock; |
|||
@Mock |
|||
private TbCoreConsumerStats statsMock; |
|||
|
|||
@Mock |
|||
private TbCallback tbCallbackMock; |
|||
|
|||
private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); |
|||
private final DeviceId deviceId = new DeviceId(UUID.randomUUID()); |
|||
private final long time = System.currentTimeMillis(); |
|||
|
|||
private ListeningExecutorService executor; |
|||
|
|||
@Mock |
|||
private DefaultTbCoreConsumerService defaultTbCoreConsumerServiceMock; |
|||
|
|||
@BeforeEach |
|||
public void setup() { |
|||
executor = MoreExecutors.newDirectExecutorService(); |
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stateService", stateServiceMock); |
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "deviceActivityEventsExecutor", executor); |
|||
} |
|||
|
|||
@AfterEach |
|||
public void cleanup() { |
|||
if (executor != null) { |
|||
executor.shutdown(); |
|||
try { |
|||
if (!executor.awaitTermination(10L, TimeUnit.SECONDS)) { |
|||
executor.shutdownNow(); |
|||
} |
|||
} catch (InterruptedException e) { |
|||
executor.shutdownNow(); |
|||
Thread.currentThread().interrupt(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
@Test |
|||
public void givenProcessingSuccess_whenForwardingDeviceStateMsgToStateService_thenOnSuccessCallbackIsCalled() { |
|||
// GIVEN
|
|||
var stateMsg = TransportProtos.DeviceStateServiceMsgProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setAdded(true) |
|||
.setUpdated(false) |
|||
.setDeleted(false) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(stateMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(stateMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(stateServiceMock).should().onQueueMsg(stateMsg, tbCallbackMock); |
|||
} |
|||
|
|||
@Test |
|||
public void givenStatsEnabled_whenForwardingDeviceStateMsgToStateService_thenStatsAreRecorded() { |
|||
// GIVEN
|
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); |
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true); |
|||
|
|||
var stateMsg = TransportProtos.DeviceStateServiceMsgProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setAdded(true) |
|||
.setUpdated(false) |
|||
.setDeleted(false) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(stateMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(stateMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(statsMock).should().log(stateMsg); |
|||
} |
|||
|
|||
@Test |
|||
public void givenStatsDisabled_whenForwardingDeviceStateMsgToStateService_thenStatsAreNotRecorded() { |
|||
// GIVEN
|
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); |
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false); |
|||
|
|||
var stateMsg = TransportProtos.DeviceStateServiceMsgProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setAdded(true) |
|||
.setUpdated(false) |
|||
.setDeleted(false) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(stateMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(stateMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(statsMock).should(never()).log(stateMsg); |
|||
} |
|||
|
|||
@Test |
|||
public void givenProcessingSuccess_whenForwardingConnectMsgToStateService_thenOnSuccessCallbackIsCalled() { |
|||
// GIVEN
|
|||
var connectMsg = TransportProtos.DeviceConnectProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastConnectTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(connectMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(connectMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(stateServiceMock).should().onDeviceConnect(tenantId, deviceId, time); |
|||
then(tbCallbackMock).should().onSuccess(); |
|||
then(tbCallbackMock).should(never()).onFailure(any()); |
|||
} |
|||
|
|||
@Test |
|||
public void givenProcessingFailure_whenForwardingConnectMsgToStateService_thenOnFailureCallbackIsCalled() { |
|||
// GIVEN
|
|||
var connectMsg = TransportProtos.DeviceConnectProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastConnectTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(connectMsg, tbCallbackMock); |
|||
|
|||
var runtimeException = new RuntimeException("Something bad happened!"); |
|||
doThrow(runtimeException).when(stateServiceMock).onDeviceConnect(tenantId, deviceId, time); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(connectMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(tbCallbackMock).should(never()).onSuccess(); |
|||
then(tbCallbackMock).should().onFailure(runtimeException); |
|||
} |
|||
|
|||
@Test |
|||
public void givenStatsEnabled_whenForwardingConnectMsgToStateService_thenStatsAreRecorded() { |
|||
// GIVEN
|
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); |
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true); |
|||
|
|||
var connectMsg = TransportProtos.DeviceConnectProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastConnectTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(connectMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(connectMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(statsMock).should().log(connectMsg); |
|||
} |
|||
|
|||
@Test |
|||
public void givenStatsDisabled_whenForwardingConnectMsgToStateService_thenStatsAreNotRecorded() { |
|||
// GIVEN
|
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); |
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false); |
|||
|
|||
var connectMsg = TransportProtos.DeviceConnectProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastConnectTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(connectMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(connectMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(statsMock).should(never()).log(connectMsg); |
|||
} |
|||
|
|||
@Test |
|||
public void givenProcessingSuccess_whenForwardingActivityMsgToStateService_thenOnSuccessCallbackIsCalled() { |
|||
// GIVEN
|
|||
var activityMsg = TransportProtos.DeviceActivityProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastActivityTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(activityMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(activityMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(stateServiceMock).should().onDeviceActivity(tenantId, deviceId, time); |
|||
then(tbCallbackMock).should().onSuccess(); |
|||
then(tbCallbackMock).should(never()).onFailure(any()); |
|||
} |
|||
|
|||
@Test |
|||
public void givenProcessingFailure_whenForwardingActivityMsgToStateService_thenOnFailureCallbackIsCalled() { |
|||
// GIVEN
|
|||
var activityMsg = TransportProtos.DeviceActivityProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastActivityTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(activityMsg, tbCallbackMock); |
|||
|
|||
var runtimeException = new RuntimeException("Something bad happened!"); |
|||
doThrow(runtimeException).when(stateServiceMock).onDeviceActivity(tenantId, deviceId, time); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(activityMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(tbCallbackMock).should(never()).onSuccess(); |
|||
|
|||
var exceptionCaptor = ArgumentCaptor.forClass(Throwable.class); |
|||
then(tbCallbackMock).should().onFailure(exceptionCaptor.capture()); |
|||
assertThat(exceptionCaptor.getValue()) |
|||
.isInstanceOf(RuntimeException.class) |
|||
.hasMessage("Failed to update device activity for device [" + deviceId.getId() + "]!") |
|||
.hasCause(runtimeException); |
|||
} |
|||
|
|||
@Test |
|||
public void givenStatsEnabled_whenForwardingActivityMsgToStateService_thenStatsAreRecorded() { |
|||
// GIVEN
|
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); |
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true); |
|||
|
|||
var activityMsg = TransportProtos.DeviceActivityProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastActivityTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(activityMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(activityMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(statsMock).should().log(activityMsg); |
|||
} |
|||
|
|||
@Test |
|||
public void givenStatsDisabled_whenForwardingActivityMsgToStateService_thenStatsAreNotRecorded() { |
|||
// GIVEN
|
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); |
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false); |
|||
|
|||
var activityMsg = TransportProtos.DeviceActivityProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastActivityTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(activityMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(activityMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(statsMock).should(never()).log(activityMsg); |
|||
} |
|||
|
|||
@Test |
|||
public void givenProcessingSuccess_whenForwardingDisconnectMsgToStateService_thenOnSuccessCallbackIsCalled() { |
|||
// GIVEN
|
|||
var disconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastDisconnectTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(disconnectMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(disconnectMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(stateServiceMock).should().onDeviceDisconnect(tenantId, deviceId, time); |
|||
then(tbCallbackMock).should().onSuccess(); |
|||
then(tbCallbackMock).should(never()).onFailure(any()); |
|||
} |
|||
|
|||
@Test |
|||
public void givenProcessingFailure_whenForwardingDisconnectMsgToStateService_thenOnFailureCallbackIsCalled() { |
|||
// GIVEN
|
|||
var disconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastDisconnectTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(disconnectMsg, tbCallbackMock); |
|||
|
|||
var runtimeException = new RuntimeException("Something bad happened!"); |
|||
doThrow(runtimeException).when(stateServiceMock).onDeviceDisconnect(tenantId, deviceId, time); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(disconnectMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(tbCallbackMock).should(never()).onSuccess(); |
|||
then(tbCallbackMock).should().onFailure(runtimeException); |
|||
} |
|||
|
|||
@Test |
|||
public void givenStatsEnabled_whenForwardingDisconnectMsgToStateService_thenStatsAreRecorded() { |
|||
// GIVEN
|
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); |
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true); |
|||
|
|||
var disconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastDisconnectTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(disconnectMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(disconnectMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(statsMock).should().log(disconnectMsg); |
|||
} |
|||
|
|||
@Test |
|||
public void givenStatsDisabled_whenForwardingDisconnectMsgToStateService_thenStatsAreNotRecorded() { |
|||
// GIVEN
|
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); |
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false); |
|||
|
|||
var disconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastDisconnectTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(disconnectMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(disconnectMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(statsMock).should(never()).log(disconnectMsg); |
|||
} |
|||
|
|||
@Test |
|||
public void givenProcessingSuccess_whenForwardingInactivityMsgToStateService_thenOnSuccessCallbackIsCalled() { |
|||
// GIVEN
|
|||
var inactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastInactivityTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(stateServiceMock).should().onDeviceInactivity(tenantId, deviceId, time); |
|||
then(tbCallbackMock).should().onSuccess(); |
|||
then(tbCallbackMock).should(never()).onFailure(any()); |
|||
} |
|||
|
|||
@Test |
|||
public void givenProcessingFailure_whenForwardingInactivityMsgToStateService_thenOnFailureCallbackIsCalled() { |
|||
// GIVEN
|
|||
var inactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastInactivityTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityMsg, tbCallbackMock); |
|||
|
|||
var runtimeException = new RuntimeException("Something bad happened!"); |
|||
doThrow(runtimeException).when(stateServiceMock).onDeviceInactivity(tenantId, deviceId, time); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(tbCallbackMock).should(never()).onSuccess(); |
|||
then(tbCallbackMock).should().onFailure(runtimeException); |
|||
} |
|||
|
|||
@Test |
|||
public void givenStatsEnabled_whenForwardingInactivityMsgToStateService_thenStatsAreRecorded() { |
|||
// GIVEN
|
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); |
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true); |
|||
|
|||
var inactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastInactivityTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(statsMock).should().log(inactivityMsg); |
|||
} |
|||
|
|||
@Test |
|||
public void givenStatsDisabled_whenForwardingInactivityMsgToStateService_thenStatsAreNotRecorded() { |
|||
// GIVEN
|
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); |
|||
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false); |
|||
|
|||
var inactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) |
|||
.setLastInactivityTime(time) |
|||
.build(); |
|||
|
|||
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityMsg, tbCallbackMock); |
|||
|
|||
// WHEN
|
|||
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityMsg, tbCallbackMock); |
|||
|
|||
// THEN
|
|||
then(statsMock).should(never()).log(inactivityMsg); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,266 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.state; |
|||
|
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.DisplayName; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.junit.jupiter.params.ParameterizedTest; |
|||
import org.junit.jupiter.params.provider.Arguments; |
|||
import org.junit.jupiter.params.provider.MethodSource; |
|||
import org.mockito.ArgumentCaptor; |
|||
import org.mockito.Captor; |
|||
import org.mockito.Mock; |
|||
import org.mockito.junit.jupiter.MockitoExtension; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
import org.thingsboard.server.cluster.TbClusterService; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.queue.ServiceType; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
import org.thingsboard.server.queue.TbQueueCallback; |
|||
import org.thingsboard.server.queue.TbQueueMsgMetadata; |
|||
import org.thingsboard.server.queue.discovery.PartitionService; |
|||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|||
|
|||
import java.util.Optional; |
|||
import java.util.UUID; |
|||
import java.util.stream.Stream; |
|||
|
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.BDDMockito.given; |
|||
import static org.mockito.BDDMockito.then; |
|||
import static org.mockito.Mockito.doThrow; |
|||
import static org.mockito.Mockito.never; |
|||
|
|||
@ExtendWith(MockitoExtension.class) |
|||
public class DefaultRuleEngineDeviceStateManagerTest { |
|||
|
|||
@Mock |
|||
private static DeviceStateService deviceStateServiceMock; |
|||
@Mock |
|||
private static TbCallback tbCallbackMock; |
|||
@Mock |
|||
private static TbClusterService clusterServiceMock; |
|||
@Mock |
|||
private static TbQueueMsgMetadata metadataMock; |
|||
|
|||
@Mock |
|||
private TbServiceInfoProvider serviceInfoProviderMock; |
|||
@Mock |
|||
private PartitionService partitionServiceMock; |
|||
|
|||
@Captor |
|||
private static ArgumentCaptor<TbQueueCallback> queueCallbackCaptor; |
|||
|
|||
private static DefaultRuleEngineDeviceStateManager deviceStateManager; |
|||
|
|||
private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("57ab2e6c-bc4c-11ee-a506-0242ac120002")); |
|||
private static final DeviceId DEVICE_ID = DeviceId.fromString("74a9053e-bc4c-11ee-a506-0242ac120002"); |
|||
private static final long EVENT_TS = System.currentTimeMillis(); |
|||
private static final RuntimeException RUNTIME_EXCEPTION = new RuntimeException("Something bad happened!"); |
|||
private static final TopicPartitionInfo MY_TPI = TopicPartitionInfo.builder().myPartition(true).build(); |
|||
private static final TopicPartitionInfo EXTERNAL_TPI = TopicPartitionInfo.builder().myPartition(false).build(); |
|||
|
|||
@BeforeEach |
|||
public void setup() { |
|||
deviceStateManager = new DefaultRuleEngineDeviceStateManager(serviceInfoProviderMock, partitionServiceMock, Optional.of(deviceStateServiceMock), clusterServiceMock); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@DisplayName("Given event should be routed to local service and event processed has succeeded, " + |
|||
"when onDeviceX() is called, then should route event to local service and call onSuccess() callback.") |
|||
@MethodSource |
|||
public void givenRoutedToLocalAndProcessingSuccess_whenOnDeviceAction_thenShouldCallLocalServiceAndSuccessCallback(Runnable onDeviceAction, Runnable actionVerification) { |
|||
// GIVEN
|
|||
given(serviceInfoProviderMock.isService(ServiceType.TB_CORE)).willReturn(true); |
|||
given(partitionServiceMock.resolve(ServiceType.TB_CORE, TENANT_ID, DEVICE_ID)).willReturn(MY_TPI); |
|||
|
|||
onDeviceAction.run(); |
|||
|
|||
// THEN
|
|||
actionVerification.run(); |
|||
|
|||
then(clusterServiceMock).shouldHaveNoInteractions(); |
|||
then(tbCallbackMock).should().onSuccess(); |
|||
then(tbCallbackMock).should(never()).onFailure(any()); |
|||
} |
|||
|
|||
private static Stream<Arguments> givenRoutedToLocalAndProcessingSuccess_whenOnDeviceAction_thenShouldCallLocalServiceAndSuccessCallback() { |
|||
return Stream.of( |
|||
Arguments.of( |
|||
(Runnable) () -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), |
|||
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS) |
|||
), |
|||
Arguments.of( |
|||
(Runnable) () -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), |
|||
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS) |
|||
), |
|||
Arguments.of( |
|||
(Runnable) () -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), |
|||
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS) |
|||
), |
|||
Arguments.of( |
|||
(Runnable) () -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), |
|||
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS) |
|||
) |
|||
); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@DisplayName("Given event should be routed to local service and event processed has failed, " + |
|||
"when onDeviceX() is called, then should route event to local service and call onFailure() callback.") |
|||
@MethodSource |
|||
public void givenRoutedToLocalAndProcessingFailure_whenOnDeviceAction_thenShouldCallLocalServiceAndFailureCallback( |
|||
Runnable exceptionThrowSetup, Runnable onDeviceAction, Runnable actionVerification |
|||
) { |
|||
// GIVEN
|
|||
given(serviceInfoProviderMock.isService(ServiceType.TB_CORE)).willReturn(true); |
|||
given(partitionServiceMock.resolve(ServiceType.TB_CORE, TENANT_ID, DEVICE_ID)).willReturn(MY_TPI); |
|||
|
|||
exceptionThrowSetup.run(); |
|||
|
|||
// WHEN
|
|||
onDeviceAction.run(); |
|||
|
|||
// THEN
|
|||
actionVerification.run(); |
|||
|
|||
then(clusterServiceMock).shouldHaveNoInteractions(); |
|||
then(tbCallbackMock).should(never()).onSuccess(); |
|||
then(tbCallbackMock).should().onFailure(RUNTIME_EXCEPTION); |
|||
} |
|||
|
|||
private static Stream<Arguments> givenRoutedToLocalAndProcessingFailure_whenOnDeviceAction_thenShouldCallLocalServiceAndFailureCallback() { |
|||
return Stream.of( |
|||
Arguments.of( |
|||
(Runnable) () -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS), |
|||
(Runnable) () -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), |
|||
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS) |
|||
), |
|||
Arguments.of( |
|||
(Runnable) () -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS), |
|||
(Runnable) () -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), |
|||
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS) |
|||
), |
|||
Arguments.of( |
|||
(Runnable) () -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS), |
|||
(Runnable) () -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), |
|||
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS) |
|||
), |
|||
Arguments.of( |
|||
(Runnable) () -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS), |
|||
(Runnable) () -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), |
|||
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS) |
|||
) |
|||
); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@DisplayName("Given event should be routed to external service, " + |
|||
"when onDeviceX() is called, then should send correct queue message to external service with correct callback object.") |
|||
@MethodSource |
|||
public void givenRoutedToExternal_whenOnDeviceAction_thenShouldSendQueueMsgToExternalServiceWithCorrectCallback(Runnable onDeviceAction, Runnable actionVerification) { |
|||
// WHEN
|
|||
ReflectionTestUtils.setField(deviceStateManager, "deviceStateService", Optional.empty()); |
|||
given(serviceInfoProviderMock.isService(ServiceType.TB_CORE)).willReturn(false); |
|||
given(partitionServiceMock.resolve(ServiceType.TB_CORE, TENANT_ID, DEVICE_ID)).willReturn(EXTERNAL_TPI); |
|||
|
|||
onDeviceAction.run(); |
|||
|
|||
// THEN
|
|||
actionVerification.run(); |
|||
|
|||
TbQueueCallback callback = queueCallbackCaptor.getValue(); |
|||
callback.onSuccess(metadataMock); |
|||
then(tbCallbackMock).should().onSuccess(); |
|||
callback.onFailure(RUNTIME_EXCEPTION); |
|||
then(tbCallbackMock).should().onFailure(RUNTIME_EXCEPTION); |
|||
} |
|||
|
|||
private static Stream<Arguments> givenRoutedToExternal_whenOnDeviceAction_thenShouldSendQueueMsgToExternalServiceWithCorrectCallback() { |
|||
return Stream.of( |
|||
Arguments.of( |
|||
(Runnable) () -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), |
|||
(Runnable) () -> { |
|||
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder() |
|||
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits()) |
|||
.setLastConnectTime(EVENT_TS) |
|||
.build(); |
|||
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() |
|||
.setDeviceConnectMsg(deviceConnectMsg) |
|||
.build(); |
|||
then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture()); |
|||
} |
|||
), |
|||
Arguments.of( |
|||
(Runnable) () -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), |
|||
(Runnable) () -> { |
|||
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() |
|||
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits()) |
|||
.setLastActivityTime(EVENT_TS) |
|||
.build(); |
|||
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() |
|||
.setDeviceActivityMsg(deviceActivityMsg) |
|||
.build(); |
|||
then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture()); |
|||
} |
|||
), |
|||
Arguments.of( |
|||
(Runnable) () -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), |
|||
(Runnable) () -> { |
|||
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder() |
|||
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits()) |
|||
.setLastDisconnectTime(EVENT_TS) |
|||
.build(); |
|||
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() |
|||
.setDeviceDisconnectMsg(deviceDisconnectMsg) |
|||
.build(); |
|||
then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture()); |
|||
} |
|||
), |
|||
Arguments.of( |
|||
(Runnable) () -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), |
|||
(Runnable) () -> { |
|||
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder() |
|||
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) |
|||
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits()) |
|||
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits()) |
|||
.setLastInactivityTime(EVENT_TS) |
|||
.build(); |
|||
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() |
|||
.setDeviceInactivityMsg(deviceInactivityMsg) |
|||
.build(); |
|||
then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture()); |
|||
} |
|||
) |
|||
); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,47 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.common; |
|||
|
|||
import org.thingsboard.server.queue.TbQueueCallback; |
|||
import org.thingsboard.server.queue.TbQueueMsgMetadata; |
|||
|
|||
import java.util.function.Consumer; |
|||
|
|||
public class SimpleTbQueueCallback implements TbQueueCallback { |
|||
|
|||
private final Consumer<TbQueueMsgMetadata> onSuccess; |
|||
private final Consumer<Throwable> onFailure; |
|||
|
|||
public SimpleTbQueueCallback(Consumer<TbQueueMsgMetadata> onSuccess, Consumer<Throwable> onFailure) { |
|||
this.onSuccess = onSuccess; |
|||
this.onFailure = onFailure; |
|||
} |
|||
|
|||
@Override |
|||
public void onSuccess(TbQueueMsgMetadata metadata) { |
|||
if (onSuccess != null) { |
|||
onSuccess.accept(metadata); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
if (onFailure != null) { |
|||
onFailure.accept(t); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,63 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.lwm2m.server.store.util; |
|||
|
|||
import com.eclipsesource.json.Json; |
|||
import com.eclipsesource.json.JsonObject; |
|||
import org.apache.commons.lang3.NotImplementedException; |
|||
import org.eclipse.leshan.core.request.Identity; |
|||
import org.eclipse.leshan.core.util.Hex; |
|||
|
|||
import java.security.PublicKey; |
|||
|
|||
public class LwM2MIdentitySerDes { |
|||
|
|||
private static final String KEY_ADDRESS = "address"; |
|||
private static final String KEY_PORT = "port"; |
|||
private static final String KEY_ID = "id"; |
|||
private static final String KEY_CN = "cn"; |
|||
private static final String KEY_RPK = "rpk"; |
|||
protected static final String KEY_LWM2MIDENTITY_TYPE = "type"; |
|||
protected static final String LWM2MIDENTITY_TYPE_UNSECURE = "unsecure"; |
|||
protected static final String LWM2MIDENTITY_TYPE_PSK = "psk"; |
|||
protected static final String LWM2MIDENTITY_TYPE_X509 = "x509"; |
|||
protected static final String LWM2MIDENTITY_TYPE_RPK = "rpk"; |
|||
|
|||
public static JsonObject serialize(Identity identity) { |
|||
JsonObject o = Json.object(); |
|||
|
|||
if (identity.isPSK()) { |
|||
o.set(KEY_LWM2MIDENTITY_TYPE, LWM2MIDENTITY_TYPE_PSK); |
|||
o.set(KEY_ID, identity.getPskIdentity()); |
|||
} else if (identity.isRPK()) { |
|||
o.set(KEY_LWM2MIDENTITY_TYPE, LWM2MIDENTITY_TYPE_RPK); |
|||
PublicKey publicKey = identity.getRawPublicKey(); |
|||
o.set(KEY_RPK, Hex.encodeHexString(publicKey.getEncoded())); |
|||
} else if (identity.isX509()) { |
|||
o.set(KEY_LWM2MIDENTITY_TYPE, LWM2MIDENTITY_TYPE_X509); |
|||
o.set(KEY_CN, identity.getX509CommonName()); |
|||
} else { |
|||
o.set(KEY_LWM2MIDENTITY_TYPE, LWM2MIDENTITY_TYPE_UNSECURE); |
|||
o.set(KEY_ADDRESS, identity.getPeerAddress().getHostString()); |
|||
o.set(KEY_PORT, identity.getPeerAddress().getPort()); |
|||
} |
|||
return o; |
|||
} |
|||
|
|||
public static Identity deserialize(JsonObject peer) { |
|||
throw new NotImplementedException(); |
|||
} |
|||
} |
|||
@ -0,0 +1,105 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.lwm2m.bootstrap; |
|||
|
|||
import org.eclipse.californium.core.network.CoapEndpoint; |
|||
import org.eclipse.californium.scandium.config.DtlsConnectorConfig; |
|||
import org.eclipse.leshan.server.californium.LeshanServer; |
|||
import org.eclipse.leshan.server.californium.bootstrap.LeshanBootstrapServer; |
|||
import org.eclipse.leshan.server.californium.registration.CaliforniumRegistrationStore; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.mockito.Mock; |
|||
import org.mockito.junit.jupiter.MockitoExtension; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
import org.thingsboard.server.cache.ota.OtaPackageDataCache; |
|||
import org.thingsboard.server.common.transport.TransportService; |
|||
import org.thingsboard.server.transport.lwm2m.bootstrap.secure.TbLwM2MDtlsBootstrapCertificateVerifier; |
|||
import org.thingsboard.server.transport.lwm2m.bootstrap.store.LwM2MBootstrapSecurityStore; |
|||
import org.thingsboard.server.transport.lwm2m.bootstrap.store.LwM2MInMemoryBootstrapConfigStore; |
|||
import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportBootstrapConfig; |
|||
import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; |
|||
import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MAuthorizer; |
|||
import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MDtlsCertificateVerifier; |
|||
import org.thingsboard.server.transport.lwm2m.server.store.TbSecurityStore; |
|||
import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.mockito.BDDMockito.when; |
|||
|
|||
@ExtendWith(MockitoExtension.class) |
|||
public class LwM2MTransportBootstrapServiceTest { |
|||
|
|||
@Mock |
|||
private LwM2MTransportServerConfig serverConfig; |
|||
@Mock |
|||
private LwM2MTransportBootstrapConfig bootstrapConfig; |
|||
@Mock |
|||
private LwM2MBootstrapSecurityStore lwM2MBootstrapSecurityStore; |
|||
@Mock |
|||
private LwM2MInMemoryBootstrapConfigStore lwM2MInMemoryBootstrapConfigStore; |
|||
@Mock |
|||
private TransportService transportService; |
|||
@Mock |
|||
private TbLwM2MDtlsBootstrapCertificateVerifier certificateVerifier; |
|||
|
|||
|
|||
@Test |
|||
public void getLHServer_creates_ConnectionIdGenerator_when_connection_id_length_not_null(){ |
|||
final Integer CONNECTION_ID_LENGTH = 6; |
|||
when(serverConfig.getDtlsConnectionIdLength()).thenReturn(CONNECTION_ID_LENGTH); |
|||
var lwM2MBootstrapService = createLwM2MBootstrapService(); |
|||
|
|||
var server = lwM2MBootstrapService.getLhBootstrapServer(); |
|||
var securedEndpoint = (CoapEndpoint) ReflectionTestUtils.getField(server, "securedEndpoint"); |
|||
assertThat(securedEndpoint).isNotNull(); |
|||
|
|||
var config = (DtlsConnectorConfig) ReflectionTestUtils.getField(securedEndpoint.getConnector(), "config"); |
|||
assertThat(config).isNotNull(); |
|||
assertThat(config.getConnectionIdGenerator()).isNotNull(); |
|||
assertThat((Integer) ReflectionTestUtils.getField(config.getConnectionIdGenerator(), "connectionIdLength")) |
|||
.isEqualTo(CONNECTION_ID_LENGTH); |
|||
} |
|||
|
|||
@Test |
|||
public void getLHServer_creates_no_ConnectionIdGenerator_when_connection_id_length_is_null(){ |
|||
when(serverConfig.getDtlsConnectionIdLength()).thenReturn(null); |
|||
var lwM2MBootstrapService = createLwM2MBootstrapService(); |
|||
|
|||
var server = lwM2MBootstrapService.getLhBootstrapServer(); |
|||
var securedEndpoint = (CoapEndpoint) ReflectionTestUtils.getField(server, "securedEndpoint"); |
|||
assertThat(securedEndpoint).isNotNull(); |
|||
|
|||
var config = (DtlsConnectorConfig) ReflectionTestUtils.getField(securedEndpoint.getConnector(), "config"); |
|||
assertThat(config).isNotNull(); |
|||
assertThat(config.getConnectionIdGenerator()).isNull(); |
|||
} |
|||
|
|||
private LwM2MTransportBootstrapService createLwM2MBootstrapService() { |
|||
setDefaultConfigVariables(); |
|||
return new LwM2MTransportBootstrapService(serverConfig, bootstrapConfig, lwM2MBootstrapSecurityStore, |
|||
lwM2MInMemoryBootstrapConfigStore, transportService, certificateVerifier); |
|||
} |
|||
|
|||
private void setDefaultConfigVariables(){ |
|||
when(bootstrapConfig.getPort()).thenReturn(5683); |
|||
when(bootstrapConfig.getSecurePort()).thenReturn(5684); |
|||
when(serverConfig.isRecommendedCiphers()).thenReturn(false); |
|||
when(serverConfig.getDtlsRetransmissionTimeout()).thenReturn(9000); |
|||
} |
|||
|
|||
|
|||
} |
|||
@ -0,0 +1,61 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.lwm2m.config; |
|||
|
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.boot.context.properties.EnableConfigurationProperties; |
|||
import org.springframework.boot.test.context.SpringBootContextLoader; |
|||
import org.springframework.boot.test.mock.mockito.MockBean; |
|||
import org.springframework.test.context.ContextConfiguration; |
|||
import org.springframework.test.context.TestPropertySource; |
|||
import org.springframework.test.context.junit.jupiter.SpringExtension; |
|||
import org.thingsboard.server.common.transport.config.ssl.SslCredentialsConfig; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
|
|||
@ExtendWith(SpringExtension.class) |
|||
@EnableConfigurationProperties(value = LwM2MTransportServerConfig.class) |
|||
@ContextConfiguration(classes = {LwM2MTransportServerConfig.class}, loader = SpringBootContextLoader.class) |
|||
@TestPropertySource(properties = { |
|||
"transport.sessions.report_timeout=10", |
|||
"transport.lwm2m.security.recommended_ciphers=true", |
|||
"transport.lwm2m.security.recommended_supported_groups=true", |
|||
"transport.lwm2m.downlink_pool_size=10", |
|||
"transport.lwm2m.uplink_pool_size=10", |
|||
"transport.lwm2m.ota_pool_size=10", |
|||
"transport.lwm2m.clean_period_in_sec=2", |
|||
"transport.lwm2m.dtls.connection_id_length=" |
|||
|
|||
}) |
|||
class LwM2MTransportServerConfigTest { |
|||
|
|||
@MockBean(name = "lwm2mServerCredentials") |
|||
private SslCredentialsConfig credentialsConfig; |
|||
|
|||
@MockBean(name = "lwm2mTrustCredentials") |
|||
private SslCredentialsConfig trustCredentialsConfig; |
|||
|
|||
@Autowired |
|||
private LwM2MTransportServerConfig serverConfig; |
|||
|
|||
@Test |
|||
void getDtlsConnectionIdLength_return_null_is_property_is_empty() { |
|||
// note: transport.lwm2m.dtls.connect_id_length is set in TestPropertySource
|
|||
assertThat(serverConfig.getDtlsConnectionIdLength()).isNull(); |
|||
} |
|||
} |
|||
@ -0,0 +1,109 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.lwm2m.server; |
|||
|
|||
import org.eclipse.californium.core.network.CoapEndpoint; |
|||
import org.eclipse.californium.scandium.config.DtlsConnectorConfig; |
|||
import org.eclipse.leshan.server.californium.LeshanServer; |
|||
import org.eclipse.leshan.server.californium.registration.CaliforniumRegistrationStore; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.mockito.Mock; |
|||
import org.mockito.junit.jupiter.MockitoExtension; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
import org.thingsboard.server.cache.ota.OtaPackageDataCache; |
|||
import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; |
|||
import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MAuthorizer; |
|||
import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MDtlsCertificateVerifier; |
|||
import org.thingsboard.server.transport.lwm2m.server.store.TbSecurityStore; |
|||
import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.mockito.BDDMockito.when; |
|||
|
|||
@ExtendWith(MockitoExtension.class) |
|||
public class DefaultLwM2mTransportServiceTest { |
|||
|
|||
@Mock |
|||
private LwM2mTransportContext context; |
|||
|
|||
@Mock |
|||
private LwM2MTransportServerConfig config; |
|||
@Mock |
|||
private OtaPackageDataCache otaPackageDataCache; |
|||
@Mock |
|||
private LwM2mUplinkMsgHandler handler; |
|||
@Mock |
|||
private CaliforniumRegistrationStore registrationStore; |
|||
@Mock |
|||
private TbSecurityStore securityStore; |
|||
@Mock |
|||
private TbLwM2MDtlsCertificateVerifier certificateVerifier; |
|||
@Mock |
|||
private TbLwM2MAuthorizer authorizer; |
|||
@Mock |
|||
private LwM2mVersionedModelProvider modelProvider; |
|||
|
|||
|
|||
@Test |
|||
public void getLHServer_creates_ConnectionIdGenerator_when_connection_id_length_not_null(){ |
|||
final Integer CONNECTION_ID_LENGTH = 6; |
|||
when(config.getDtlsConnectionIdLength()).thenReturn(CONNECTION_ID_LENGTH); |
|||
var lwm2mService = createLwM2MService(); |
|||
|
|||
LeshanServer server = ReflectionTestUtils.invokeMethod(lwm2mService, "getLhServer"); |
|||
|
|||
assertThat(server).isNotNull(); |
|||
var securedEndpoint = (CoapEndpoint) ReflectionTestUtils.getField(server, "securedEndpoint"); |
|||
assertThat(securedEndpoint).isNotNull(); |
|||
|
|||
var config = (DtlsConnectorConfig) ReflectionTestUtils.getField(securedEndpoint.getConnector(), "config"); |
|||
assertThat(config).isNotNull(); |
|||
assertThat(config.getConnectionIdGenerator()).isNotNull(); |
|||
assertThat((Integer) ReflectionTestUtils.getField(config.getConnectionIdGenerator(), "connectionIdLength")) |
|||
.isEqualTo(CONNECTION_ID_LENGTH); |
|||
} |
|||
|
|||
@Test |
|||
public void getLHServer_creates_no_ConnectionIdGenerator_when_connection_id_length_is_null(){ |
|||
when(config.getDtlsConnectionIdLength()).thenReturn(null); |
|||
var lwm2mService = createLwM2MService(); |
|||
|
|||
LeshanServer server = ReflectionTestUtils.invokeMethod(lwm2mService, "getLhServer"); |
|||
|
|||
assertThat(server).isNotNull(); |
|||
var securedEndpoint = (CoapEndpoint) ReflectionTestUtils.getField(server, "securedEndpoint"); |
|||
assertThat(securedEndpoint).isNotNull(); |
|||
var config = (DtlsConnectorConfig) ReflectionTestUtils.getField(securedEndpoint.getConnector(), "config"); |
|||
assertThat(config).isNotNull(); |
|||
assertThat(config.getConnectionIdGenerator()).isNull(); |
|||
} |
|||
|
|||
private DefaultLwM2mTransportService createLwM2MService() { |
|||
setDefaultConfigVariables(); |
|||
return new DefaultLwM2mTransportService(context, config, otaPackageDataCache, handler, registrationStore, |
|||
securityStore, certificateVerifier, authorizer, modelProvider); |
|||
} |
|||
|
|||
private void setDefaultConfigVariables(){ |
|||
when(config.getPort()).thenReturn(5683); |
|||
when(config.getSecurePort()).thenReturn(5684); |
|||
when(config.isRecommendedCiphers()).thenReturn(false); |
|||
when(config.getDtlsRetransmissionTimeout()).thenReturn(9000); |
|||
} |
|||
|
|||
|
|||
} |
|||
@ -0,0 +1,265 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.lwm2m.server.store; |
|||
|
|||
import org.eclipse.leshan.core.link.Link; |
|||
import org.eclipse.leshan.core.request.Identity; |
|||
import org.eclipse.leshan.core.util.NamedThreadFactory; |
|||
import org.eclipse.leshan.server.redis.serialization.RegistrationSerDes; |
|||
import org.eclipse.leshan.server.registration.Registration; |
|||
import org.eclipse.leshan.server.registration.RegistrationUpdate; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.mockito.junit.jupiter.MockitoExtension; |
|||
import org.springframework.data.redis.connection.RedisConnection; |
|||
import org.springframework.data.redis.connection.RedisConnectionFactory; |
|||
import org.springframework.integration.redis.util.RedisLockRegistry; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
|
|||
import java.net.InetAddress; |
|||
import java.net.InetSocketAddress; |
|||
import java.net.UnknownHostException; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
import java.util.concurrent.locks.Lock; |
|||
|
|||
import static java.nio.charset.StandardCharsets.UTF_8; |
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.Mockito.lenient; |
|||
import static org.mockito.Mockito.mock; |
|||
import static org.mockito.Mockito.times; |
|||
import static org.mockito.Mockito.verify; |
|||
import static org.thingsboard.server.transport.lwm2m.server.store.TbLwM2mRedisRegistrationStore.DEFAULT_CLEAN_LIMIT; |
|||
import static org.thingsboard.server.transport.lwm2m.server.store.TbLwM2mRedisRegistrationStore.DEFAULT_CLEAN_PERIOD; |
|||
import static org.thingsboard.server.transport.lwm2m.server.store.TbLwM2mRedisRegistrationStore.DEFAULT_GRACE_PERIOD; |
|||
|
|||
|
|||
@ExtendWith(MockitoExtension.class) |
|||
class TbLwM2mRedisRegistrationStoreTest { |
|||
|
|||
RedisConnectionFactory connectionFactory; |
|||
RedisConnection connection; |
|||
RedisLockRegistry lockRegistry; |
|||
|
|||
TbLwM2mRedisRegistrationStore registrationStore; |
|||
|
|||
@BeforeEach |
|||
void setUp() { |
|||
lockRegistry = mock(RedisLockRegistry.class); |
|||
lenient().when(lockRegistry.obtain(any())).thenReturn(mock(Lock.class)); |
|||
connection = mock(RedisConnection.class); |
|||
//when(connection.set(any(byte[].class), any(byte[].class))).
|
|||
connectionFactory = mock(RedisConnectionFactory.class); |
|||
lenient().when(connectionFactory.getConnection()).thenReturn(connection); |
|||
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, |
|||
new NamedThreadFactory(String.format("RedisRegistrationStore Cleaner (%ds)", DEFAULT_CLEAN_PERIOD))); |
|||
registrationStore = new TbLwM2mRedisRegistrationStore(connectionFactory, executorService, |
|||
DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD, DEFAULT_CLEAN_LIMIT, lockRegistry); |
|||
} |
|||
|
|||
@Test |
|||
void testAddRegistrationWithNoOldRegistration() { |
|||
setOldRegistration(null); |
|||
Registration registration = buildRegistration(); |
|||
|
|||
assertThat(registrationStore.addRegistration(registration)).isNull(); |
|||
|
|||
byte[] endpoint = registration.getEndpoint().getBytes(UTF_8); |
|||
verify(connection, times(1)).set(getRegIdKey(registration), endpoint); |
|||
verify(connection, times(1)).set(getRegAddrKey(registration), endpoint); |
|||
verify(connection, times(1)).set(getRegIdentityKey(registration), endpoint); |
|||
verify(connection, times(3)).set(any(byte[].class), any(byte[].class)); |
|||
verify(connection, times(0)).del(any(byte[].class)); |
|||
} |
|||
|
|||
@Test |
|||
void testAddRegistrationWithOldRegistrationEqualToCurrent(){ |
|||
var oldRegistration = buildRegistration(); |
|||
setOldRegistration(oldRegistration); |
|||
Registration registration = buildRegistration(); |
|||
|
|||
var deregistration = registrationStore.addRegistration(registration); |
|||
|
|||
assertThat(deregistration.getRegistration()).isEqualTo(oldRegistration); |
|||
|
|||
byte[] endpoint = registration.getEndpoint().getBytes(UTF_8); |
|||
verify(connection, times(1)).set(getRegIdKey(registration), endpoint); |
|||
verify(connection, times(1)).set(getRegAddrKey(registration), endpoint); |
|||
verify(connection, times(1)).set(getRegIdentityKey(registration), endpoint); |
|||
verify(connection, times(3)).set(any(byte[].class), any(byte[].class)); |
|||
verify(connection, times(1)).del(getTknsRegIdKey(oldRegistration)); |
|||
verify(connection, times(1)).del(any(byte[].class)); |
|||
} |
|||
|
|||
@Test |
|||
void testAddRegistrationRemovesIndexes(){ |
|||
var oldRegistration = buildRegistration(Identity.unsecure(getTestAddress(1234))); |
|||
setOldRegistration(oldRegistration); |
|||
var registration = buildRegistration(Identity.unsecure(getTestAddress(2345))); |
|||
|
|||
var deregistration = registrationStore.addRegistration(registration); |
|||
|
|||
assertThat(deregistration.getRegistration()).isEqualTo(oldRegistration); |
|||
byte[] endpoint = registration.getEndpoint().getBytes(UTF_8); |
|||
verify(connection, times(1)).set(getRegIdKey(registration), endpoint); |
|||
verify(connection, times(1)).set(getRegAddrKey(registration), endpoint); |
|||
verify(connection, times(1)).set(getRegIdentityKey(registration), endpoint); |
|||
verify(connection, times(3)).set(any(byte[].class), any(byte[].class)); |
|||
verify(connection, times(1)).del(getRegAddrKey(oldRegistration)); |
|||
verify(connection, times(1)).del(getRegIdentityKey(oldRegistration)); |
|||
verify(connection, times(1)).del(getTknsRegIdKey(oldRegistration)); |
|||
verify(connection, times(3)).del(any(byte[].class)); |
|||
} |
|||
|
|||
@Test |
|||
void testUpdateRegistrationWhenNoRegistrationFound() { |
|||
setOldRegistration(null); |
|||
Registration registration = buildRegistration(); |
|||
RegistrationUpdate update = createUpdateFromRegistration(registration); |
|||
|
|||
assertThat(registrationStore.updateRegistration(update)).isNull(); |
|||
|
|||
verify(connection, times(1)).get(getRegIdKey(registration)); |
|||
verify(connection, times(1)).get(any(byte[].class)); |
|||
verify(connection, times(0)).del(any(byte[].class)); |
|||
} |
|||
|
|||
@Test |
|||
void testUpdateRegistrationWithSameRegistration() { |
|||
Registration registration = buildRegistration(); |
|||
setOldRegistration(registration); |
|||
RegistrationUpdate update = createUpdateFromRegistration(registration); |
|||
|
|||
assertThat(registrationStore.updateRegistration(update)).isNotNull(); |
|||
|
|||
var endpoint = registration.getEndpoint().getBytes(UTF_8); |
|||
// check registration and addressIndex here updated
|
|||
verify(connection, times(1)).set(eq(getEndpointKey(endpoint)), any(byte[].class)); |
|||
verify(connection, times(1)).set(getRegAddrKey(registration), endpoint); |
|||
verify(connection, times(2)).set(any(byte[].class), any(byte[].class)); |
|||
verify(connection, times(0)).del(any(byte[].class)); |
|||
} |
|||
|
|||
@Test |
|||
void testUpdateRegistrationWithRegistrationFromSecureIdentitiesWithDifferentAddress() { |
|||
Registration oldRegistration = buildRegistration(Identity.psk(getTestAddress(1234), "my:psk")); |
|||
setOldRegistration(oldRegistration); |
|||
Registration newRegistration = buildRegistration(Identity.psk(getTestAddress(2345), "my:psk")); |
|||
RegistrationUpdate update = createUpdateFromRegistration(newRegistration); |
|||
assertThat(oldRegistration.getEndpoint()).isEqualTo(newRegistration.getEndpoint()); |
|||
|
|||
assertThat(registrationStore.updateRegistration(update)).isNotNull(); |
|||
|
|||
var endpoint = newRegistration.getEndpoint().getBytes(UTF_8); |
|||
// check registration and addressIndex here updated
|
|||
verify(connection, times(1)).set(eq(getEndpointKey(endpoint)), any(byte[].class)); |
|||
verify(connection, times(1)).set(getRegAddrKey(newRegistration), endpoint); |
|||
// check old AddrIndex has been removed
|
|||
verify(connection, times(1)).del(getRegAddrKey(oldRegistration)); |
|||
// check identityIndex has not been removed
|
|||
verify(connection, times(0)).del(getRegIdentityKey(oldRegistration)); |
|||
// check only one key (AddrIndex) in total was removed
|
|||
verify(connection, times(1)).del(any(byte[].class)); |
|||
} |
|||
|
|||
@Test |
|||
void testGetRegistrationByIdentityReturnsRegistrationForSecureIdentityWithDifferentAddress() { |
|||
Registration registration = buildRegistration(Identity.psk(getTestAddress(1234), "my:psk")); |
|||
setOldRegistration(registration); |
|||
Identity sameIdentityWithDifferentAddress = Identity.psk(getTestAddress(2345), "my:psk"); |
|||
|
|||
Registration retrievedRegistration = registrationStore.getRegistrationByIdentity(sameIdentityWithDifferentAddress); |
|||
|
|||
assertThat(retrievedRegistration).isEqualTo(registration); |
|||
} |
|||
|
|||
private void setOldRegistration(Registration oldRegistration){ |
|||
byte[] serializedRegistration = null; |
|||
if (oldRegistration != null){ |
|||
byte[] endpoint = oldRegistration.getEndpoint().getBytes(UTF_8); |
|||
// set the AddrIndex
|
|||
byte[] regAddrKey = getRegAddrKey(oldRegistration); |
|||
lenient().when(connection.get(eq(regAddrKey))).thenReturn(endpoint); |
|||
// set the IdentityIndex
|
|||
byte[] regIdentityKey = getRegIdentityKey(oldRegistration); |
|||
lenient().when(connection.get(eq(regIdentityKey))).thenReturn(endpoint); |
|||
// set the IdIndex
|
|||
byte[] regIdKey = getRegIdKey(oldRegistration); |
|||
lenient().when(connection.get(eq(regIdKey))).thenReturn(endpoint); |
|||
// set the registration
|
|||
serializedRegistration = RegistrationSerDes.bSerialize(oldRegistration); |
|||
lenient().when(connection.get(eq(getEndpointKey(endpoint)))).thenReturn(serializedRegistration); |
|||
} |
|||
lenient().when(connection.getSet(any(byte[].class), any(byte[].class))).thenReturn(serializedRegistration); |
|||
} |
|||
|
|||
private byte[] getRegAddrKey(Registration registration){ |
|||
return ReflectionTestUtils.invokeMethod(registrationStore, "toRegAddrKey", registration.getSocketAddress()); |
|||
} |
|||
|
|||
private byte[] getRegIdentityKey(Registration registration){ |
|||
return ReflectionTestUtils.invokeMethod(registrationStore, "toRegIdentityKey", registration.getIdentity()); |
|||
} |
|||
|
|||
private byte[] getRegIdKey(Registration registration){ |
|||
return ReflectionTestUtils.invokeMethod(registrationStore, "toRegIdKey", registration.getId()); |
|||
} |
|||
|
|||
private byte[] getEndpointKey(byte[] endpoint){ |
|||
return ReflectionTestUtils.invokeMethod(registrationStore, "toEndpointKey", endpoint); |
|||
} |
|||
|
|||
private byte[] getTknsRegIdKey(Registration registration){ |
|||
return ReflectionTestUtils.invokeMethod(registrationStore, "toKey", "TKNS:REGID:", registration.getId()); |
|||
} |
|||
|
|||
private static Registration buildRegistration() { |
|||
return buildRegistration(Identity.psk(getTestAddress(), "my:psk")); |
|||
} |
|||
|
|||
private static Registration buildRegistration(Identity identity){ |
|||
return new Registration.Builder("my_reg_id", "abcde", identity) |
|||
.objectLinks(new Link[]{}) |
|||
.build(); |
|||
} |
|||
|
|||
private static RegistrationUpdate createUpdateFromRegistration(Registration registration){ |
|||
return new RegistrationUpdate( |
|||
registration.getId(), |
|||
registration.getIdentity(), |
|||
registration.getLifeTimeInSec(), |
|||
registration.getSmsNumber(), |
|||
registration.getBindingMode(), |
|||
registration.getObjectLinks(), |
|||
registration.getAdditionalRegistrationAttributes() |
|||
); |
|||
} |
|||
|
|||
private static InetSocketAddress getTestAddress() { |
|||
return getTestAddress(5684); |
|||
} |
|||
|
|||
private static InetSocketAddress getTestAddress(int port) { |
|||
try { |
|||
return new InetSocketAddress(InetAddress.getByName("1.2.3.4"), port); |
|||
} catch (UnknownHostException e) { |
|||
throw new AssertionError("Cannot create test address"); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,77 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.lwm2m.server.store.util; |
|||
|
|||
import com.eclipsesource.json.JsonObject; |
|||
import org.apache.commons.lang3.NotImplementedException; |
|||
import org.eclipse.leshan.core.request.Identity; |
|||
import org.junit.jupiter.api.Test; |
|||
|
|||
import java.net.InetAddress; |
|||
import java.net.InetSocketAddress; |
|||
import java.net.UnknownHostException; |
|||
import java.security.PublicKey; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.assertj.core.api.Assertions.assertThatThrownBy; |
|||
import static org.mockito.Mockito.mock; |
|||
import static org.mockito.Mockito.when; |
|||
|
|||
class LwM2MIdentitySerDesTest { |
|||
|
|||
@Test |
|||
void serializePskIdentity() { |
|||
assertThat(LwM2MIdentitySerDes.serialize(Identity.psk(getTestAddress(), "my:psk")).toString()) |
|||
.isEqualTo("{\"type\":\"psk\",\"id\":\"my:psk\"}"); |
|||
} |
|||
|
|||
|
|||
@Test |
|||
void serializeRpkIdentity() { |
|||
var public_key = mock(PublicKey.class); |
|||
when(public_key.getEncoded()).thenReturn(new byte[]{1,2,3,4,5,6,7,8,9}); |
|||
|
|||
assertThat(LwM2MIdentitySerDes.serialize(Identity.rpk(getTestAddress(), public_key)).toString()) |
|||
.isEqualTo("{\"type\":\"rpk\",\"rpk\":\"010203040506070809\"}"); |
|||
} |
|||
|
|||
@Test |
|||
void serializeX509Identity() { |
|||
assertThat(LwM2MIdentitySerDes.serialize(Identity.x509(getTestAddress(), "MyCommonName")).toString()) |
|||
.isEqualTo("{\"type\":\"x509\",\"cn\":\"MyCommonName\"}"); |
|||
} |
|||
|
|||
@Test |
|||
void serializeUnsecureIdentity() { |
|||
assertThat(LwM2MIdentitySerDes.serialize(Identity.unsecure(getTestAddress())).toString()) |
|||
.isEqualTo("{\"type\":\"unsecure\",\"address\":\"1.2.3.4\",\"port\":5684}"); |
|||
} |
|||
|
|||
|
|||
@Test |
|||
void deserialize() { |
|||
assertThatThrownBy(() -> LwM2MIdentitySerDes.deserialize(mock(JsonObject.class))) |
|||
.isInstanceOf(NotImplementedException.class); |
|||
} |
|||
|
|||
private static InetSocketAddress getTestAddress() { |
|||
try { |
|||
return new InetSocketAddress(InetAddress.getByName("1.2.3.4"), 5684); |
|||
} catch (UnknownHostException e) { |
|||
throw new AssertionError("Cannot create test address"); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,32 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.api; |
|||
|
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
|
|||
public interface RuleEngineDeviceStateManager { |
|||
|
|||
void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback); |
|||
|
|||
void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long activityTime, TbCallback callback); |
|||
|
|||
void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long disconnectTime, TbCallback callback); |
|||
|
|||
void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback); |
|||
|
|||
} |
|||
@ -0,0 +1,173 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.action; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.util.ConcurrentReferenceHashMap; |
|||
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; |
|||
import org.thingsboard.rule.engine.api.RuleNode; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.TbNode; |
|||
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|||
import org.thingsboard.rule.engine.api.TbNodeException; |
|||
import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.msg.TbMsgType; |
|||
import org.thingsboard.server.common.data.msg.TbNodeConnectionType; |
|||
import org.thingsboard.server.common.data.plugin.ComponentType; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.common.msg.tools.TbRateLimits; |
|||
|
|||
import java.util.EnumSet; |
|||
import java.util.Set; |
|||
|
|||
@Slf4j |
|||
@RuleNode( |
|||
type = ComponentType.ACTION, |
|||
name = "device state", |
|||
nodeDescription = "Triggers device connectivity events", |
|||
nodeDetails = "If incoming message originator is a device, registers configured event for that device in the Device State Service, which sends appropriate message to the Rule Engine." + |
|||
" If metadata <code>ts</code> property is present, it will be used as event timestamp. Otherwise, the message timestamp will be used." + |
|||
" If originator entity type is not <code>DEVICE</code> or unexpected error happened during processing, then incoming message is forwarded using <code>Failure</code> chain." + |
|||
" If rate of connectivity events for a given originator is too high, then incoming message is forwarded using <code>Rate limited</code> chain. " + |
|||
"<br>" + |
|||
"Supported device connectivity events are:" + |
|||
"<ul>" + |
|||
"<li>Connect event</li>" + |
|||
"<li>Disconnect event</li>" + |
|||
"<li>Activity event</li>" + |
|||
"<li>Inactivity event</li>" + |
|||
"</ul>" + |
|||
"This node is particularly useful when device isn't using transports to receive data, such as when fetching data from external API or computing new data within the rule chain.", |
|||
configClazz = TbDeviceStateNodeConfiguration.class, |
|||
relationTypes = {TbNodeConnectionType.SUCCESS, TbNodeConnectionType.FAILURE, "Rate limited"}, |
|||
uiResources = {"static/rulenode/rulenode-core-config.js"}, |
|||
configDirective = "tbActionNodeDeviceStateConfig" |
|||
) |
|||
public class TbDeviceStateNode implements TbNode { |
|||
|
|||
private static final Set<TbMsgType> SUPPORTED_EVENTS = EnumSet.of( |
|||
TbMsgType.CONNECT_EVENT, TbMsgType.ACTIVITY_EVENT, TbMsgType.DISCONNECT_EVENT, TbMsgType.INACTIVITY_EVENT |
|||
); |
|||
private static final String DEFAULT_RATE_LIMIT_CONFIG = "1:1,30:60,60:3600"; |
|||
private ConcurrentReferenceHashMap<DeviceId, TbRateLimits> rateLimits; |
|||
private String rateLimitConfig; |
|||
private TbMsgType event; |
|||
|
|||
@Override |
|||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { |
|||
TbMsgType event = TbNodeUtils.convert(configuration, TbDeviceStateNodeConfiguration.class).getEvent(); |
|||
if (event == null) { |
|||
throw new TbNodeException("Event cannot be null!", true); |
|||
} |
|||
if (!SUPPORTED_EVENTS.contains(event)) { |
|||
throw new TbNodeException("Unsupported event: " + event, true); |
|||
} |
|||
this.event = event; |
|||
rateLimits = new ConcurrentReferenceHashMap<>(); |
|||
String deviceStateNodeRateLimitConfig = ctx.getDeviceStateNodeRateLimitConfig(); |
|||
try { |
|||
rateLimitConfig = new TbRateLimits(deviceStateNodeRateLimitConfig).getConfiguration(); |
|||
} catch (Exception e) { |
|||
log.error("[{}][{}] Invalid rate limit configuration provided: [{}]. Will use default value [{}].", |
|||
ctx.getTenantId().getId(), ctx.getSelfId().getId(), deviceStateNodeRateLimitConfig, DEFAULT_RATE_LIMIT_CONFIG, e); |
|||
rateLimitConfig = DEFAULT_RATE_LIMIT_CONFIG; |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void onMsg(TbContext ctx, TbMsg msg) { |
|||
EntityType originatorEntityType = msg.getOriginator().getEntityType(); |
|||
if (!EntityType.DEVICE.equals(originatorEntityType)) { |
|||
ctx.tellFailure(msg, new IllegalArgumentException( |
|||
"Unsupported originator entity type: [" + originatorEntityType + "]. Only DEVICE entity type is supported." |
|||
)); |
|||
return; |
|||
} |
|||
DeviceId originator = new DeviceId(msg.getOriginator().getId()); |
|||
rateLimits.compute(originator, (__, rateLimit) -> { |
|||
if (rateLimit == null) { |
|||
rateLimit = new TbRateLimits(rateLimitConfig); |
|||
} |
|||
boolean isNotRateLimited = rateLimit.tryConsume(); |
|||
if (isNotRateLimited) { |
|||
sendEventAndTell(ctx, originator, msg); |
|||
} else { |
|||
ctx.tellNext(msg, "Rate limited"); |
|||
} |
|||
return rateLimit; |
|||
}); |
|||
} |
|||
|
|||
private void sendEventAndTell(TbContext ctx, DeviceId originator, TbMsg msg) { |
|||
TenantId tenantId = ctx.getTenantId(); |
|||
long eventTs = msg.getMetaDataTs(); |
|||
|
|||
RuleEngineDeviceStateManager deviceStateManager = ctx.getDeviceStateManager(); |
|||
TbCallback callback = getMsgEnqueuedCallback(ctx, msg); |
|||
|
|||
switch (event) { |
|||
case CONNECT_EVENT: |
|||
deviceStateManager.onDeviceConnect(tenantId, originator, eventTs, callback); |
|||
break; |
|||
case ACTIVITY_EVENT: |
|||
deviceStateManager.onDeviceActivity(tenantId, originator, eventTs, callback); |
|||
break; |
|||
case DISCONNECT_EVENT: |
|||
deviceStateManager.onDeviceDisconnect(tenantId, originator, eventTs, callback); |
|||
break; |
|||
case INACTIVITY_EVENT: |
|||
deviceStateManager.onDeviceInactivity(tenantId, originator, eventTs, callback); |
|||
break; |
|||
default: |
|||
ctx.tellFailure(msg, new IllegalStateException("Configured event [" + event + "] is not supported!")); |
|||
} |
|||
} |
|||
|
|||
private TbCallback getMsgEnqueuedCallback(TbContext ctx, TbMsg msg) { |
|||
return new TbCallback() { |
|||
@Override |
|||
public void onSuccess() { |
|||
ctx.tellSuccess(msg); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
ctx.tellFailure(msg, t); |
|||
} |
|||
}; |
|||
} |
|||
|
|||
@Override |
|||
public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) { |
|||
rateLimits.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey())); |
|||
} |
|||
|
|||
@Override |
|||
public void destroy() { |
|||
if (rateLimits != null) { |
|||
rateLimits.clear(); |
|||
rateLimits = null; |
|||
} |
|||
rateLimitConfig = null; |
|||
event = null; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,34 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.action; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.rule.engine.api.NodeConfiguration; |
|||
import org.thingsboard.server.common.data.msg.TbMsgType; |
|||
|
|||
@Data |
|||
public class TbDeviceStateNodeConfiguration implements NodeConfiguration<TbDeviceStateNodeConfiguration> { |
|||
|
|||
private TbMsgType event; |
|||
|
|||
@Override |
|||
public TbDeviceStateNodeConfiguration defaultConfiguration() { |
|||
var config = new TbDeviceStateNodeConfiguration(); |
|||
config.setEvent(TbMsgType.ACTIVITY_EVENT); |
|||
return config; |
|||
} |
|||
|
|||
} |
|||
File diff suppressed because one or more lines are too long
@ -0,0 +1,296 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.action; |
|||
|
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.junit.jupiter.params.ParameterizedTest; |
|||
import org.junit.jupiter.params.provider.Arguments; |
|||
import org.junit.jupiter.params.provider.EnumSource; |
|||
import org.junit.jupiter.params.provider.MethodSource; |
|||
import org.mockito.ArgumentCaptor; |
|||
import org.mockito.Captor; |
|||
import org.mockito.Mock; |
|||
import org.mockito.junit.jupiter.MockitoExtension; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
import org.springframework.util.ConcurrentReferenceHashMap; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|||
import org.thingsboard.rule.engine.api.TbNodeException; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.RuleNodeId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.msg.TbMsgType; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; |
|||
import org.thingsboard.server.common.msg.queue.ServiceType; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.common.msg.tools.TbRateLimits; |
|||
|
|||
import java.util.UUID; |
|||
import java.util.stream.Stream; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.assertj.core.api.Assertions.assertThatThrownBy; |
|||
import static org.assertj.core.api.Assertions.fail; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.BDDMockito.given; |
|||
import static org.mockito.BDDMockito.then; |
|||
import static org.mockito.Mockito.mock; |
|||
import static org.mockito.Mockito.never; |
|||
|
|||
@ExtendWith(MockitoExtension.class) |
|||
public class TbDeviceStateNodeTest { |
|||
|
|||
@Mock |
|||
private TbContext ctxMock; |
|||
@Mock |
|||
private static RuleEngineDeviceStateManager deviceStateManagerMock; |
|||
@Captor |
|||
private static ArgumentCaptor<TbCallback> callbackCaptor; |
|||
private TbDeviceStateNode node; |
|||
private TbDeviceStateNodeConfiguration config; |
|||
|
|||
private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.randomUUID()); |
|||
private static final DeviceId DEVICE_ID = new DeviceId(UUID.randomUUID()); |
|||
private static final long METADATA_TS = 123L; |
|||
private TbMsg msg; |
|||
|
|||
@BeforeEach |
|||
public void setup() { |
|||
var metaData = new TbMsgMetaData(); |
|||
metaData.putValue("deviceName", "My humidity sensor"); |
|||
metaData.putValue("deviceType", "Humidity sensor"); |
|||
metaData.putValue("ts", String.valueOf(METADATA_TS)); |
|||
var data = JacksonUtil.newObjectNode(); |
|||
data.put("humidity", 58.3); |
|||
msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, JacksonUtil.toString(data)); |
|||
} |
|||
|
|||
@BeforeEach |
|||
public void setUp() { |
|||
node = new TbDeviceStateNode(); |
|||
config = new TbDeviceStateNodeConfiguration().defaultConfiguration(); |
|||
} |
|||
|
|||
@Test |
|||
public void givenDefaultConfiguration_whenInvoked_thenCorrectValuesAreSet() { |
|||
assertThat(config.getEvent()).isEqualTo(TbMsgType.ACTIVITY_EVENT); |
|||
} |
|||
|
|||
@Test |
|||
public void givenNullEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException() { |
|||
// GIVEN-WHEN-THEN
|
|||
assertThatThrownBy(() -> initNode(null)) |
|||
.isInstanceOf(TbNodeException.class) |
|||
.hasMessage("Event cannot be null!") |
|||
.matches(e -> ((TbNodeException) e).isUnrecoverable()); |
|||
} |
|||
|
|||
@Test |
|||
public void givenInvalidRateLimitConfig_whenInit_thenUsesDefaultConfig() { |
|||
// GIVEN
|
|||
given(ctxMock.getDeviceStateNodeRateLimitConfig()).willReturn("invalid rate limit config"); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
given(ctxMock.getSelfId()).willReturn(new RuleNodeId(UUID.randomUUID())); |
|||
|
|||
// WHEN
|
|||
try { |
|||
initNode(TbMsgType.ACTIVITY_EVENT); |
|||
} catch (Exception e) { |
|||
fail("Node failed to initialize!", e); |
|||
} |
|||
|
|||
// THEN
|
|||
String actualRateLimitConfig = (String) ReflectionTestUtils.getField(node, "rateLimitConfig"); |
|||
assertThat(actualRateLimitConfig).isEqualTo("1:1,30:60,60:3600"); |
|||
} |
|||
|
|||
@Test |
|||
public void givenMsgArrivedTooFast_whenOnMsg_thenRateLimitsThisMsg() { |
|||
// GIVEN
|
|||
ConcurrentReferenceHashMap<DeviceId, TbRateLimits> rateLimits = new ConcurrentReferenceHashMap<>(); |
|||
ReflectionTestUtils.setField(node, "rateLimits", rateLimits); |
|||
|
|||
var rateLimitMock = mock(TbRateLimits.class); |
|||
rateLimits.put(DEVICE_ID, rateLimitMock); |
|||
|
|||
given(rateLimitMock.tryConsume()).willReturn(false); |
|||
|
|||
// WHEN
|
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
// THEN
|
|||
then(ctxMock).should().tellNext(msg, "Rate limited"); |
|||
then(ctxMock).should(never()).tellSuccess(any()); |
|||
then(ctxMock).should(never()).tellFailure(any(), any()); |
|||
then(ctxMock).shouldHaveNoMoreInteractions(); |
|||
then(deviceStateManagerMock).shouldHaveNoInteractions(); |
|||
} |
|||
|
|||
@Test |
|||
public void givenHasNonLocalDevices_whenOnPartitionChange_thenRemovesEntriesForNonLocalDevices() { |
|||
// GIVEN
|
|||
ConcurrentReferenceHashMap<DeviceId, TbRateLimits> rateLimits = new ConcurrentReferenceHashMap<>(); |
|||
ReflectionTestUtils.setField(node, "rateLimits", rateLimits); |
|||
|
|||
rateLimits.put(DEVICE_ID, new TbRateLimits("1:1")); |
|||
given(ctxMock.isLocalEntity(eq(DEVICE_ID))).willReturn(true); |
|||
|
|||
DeviceId nonLocalDeviceId1 = new DeviceId(UUID.randomUUID()); |
|||
rateLimits.put(nonLocalDeviceId1, new TbRateLimits("2:2")); |
|||
given(ctxMock.isLocalEntity(eq(nonLocalDeviceId1))).willReturn(false); |
|||
|
|||
DeviceId nonLocalDeviceId2 = new DeviceId(UUID.randomUUID()); |
|||
rateLimits.put(nonLocalDeviceId2, new TbRateLimits("3:3")); |
|||
given(ctxMock.isLocalEntity(eq(nonLocalDeviceId2))).willReturn(false); |
|||
|
|||
// WHEN
|
|||
node.onPartitionChangeMsg(ctxMock, new PartitionChangeMsg(ServiceType.TB_RULE_ENGINE)); |
|||
|
|||
// THEN
|
|||
assertThat(rateLimits) |
|||
.containsKey(DEVICE_ID) |
|||
.doesNotContainKey(nonLocalDeviceId1) |
|||
.doesNotContainKey(nonLocalDeviceId2) |
|||
.size().isOne(); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@EnumSource( |
|||
value = TbMsgType.class, |
|||
names = {"CONNECT_EVENT", "ACTIVITY_EVENT", "DISCONNECT_EVENT", "INACTIVITY_EVENT"}, |
|||
mode = EnumSource.Mode.EXCLUDE |
|||
) |
|||
public void givenUnsupportedEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException(TbMsgType unsupportedEvent) { |
|||
// GIVEN-WHEN-THEN
|
|||
assertThatThrownBy(() -> initNode(unsupportedEvent)) |
|||
.isInstanceOf(TbNodeException.class) |
|||
.hasMessage("Unsupported event: " + unsupportedEvent) |
|||
.matches(e -> ((TbNodeException) e).isUnrecoverable()); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@EnumSource(value = EntityType.class, names = "DEVICE", mode = EnumSource.Mode.EXCLUDE) |
|||
public void givenNonDeviceOriginator_whenOnMsg_thenTellsSuccessAndNoActivityActionsTriggered(EntityType unsupportedType) { |
|||
// GIVEN
|
|||
var nonDeviceOriginator = new EntityId() { |
|||
|
|||
@Override |
|||
public UUID getId() { |
|||
return UUID.randomUUID(); |
|||
} |
|||
|
|||
@Override |
|||
public EntityType getEntityType() { |
|||
return unsupportedType; |
|||
} |
|||
}; |
|||
var msg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, nonDeviceOriginator, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); |
|||
|
|||
// WHEN
|
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
// THEN
|
|||
var exceptionCaptor = ArgumentCaptor.forClass(Exception.class); |
|||
then(ctxMock).should().tellFailure(eq(msg), exceptionCaptor.capture()); |
|||
assertThat(exceptionCaptor.getValue()) |
|||
.isInstanceOf(IllegalArgumentException.class) |
|||
.hasMessage("Unsupported originator entity type: [" + unsupportedType + "]. Only DEVICE entity type is supported."); |
|||
|
|||
then(ctxMock).shouldHaveNoMoreInteractions(); |
|||
} |
|||
|
|||
@Test |
|||
public void givenMetadataDoesNotContainTs_whenOnMsg_thenMsgTsIsUsedAsEventTs() { |
|||
// GIVEN
|
|||
given(ctxMock.getDeviceStateNodeRateLimitConfig()).willReturn("1:1"); |
|||
try { |
|||
initNode(TbMsgType.ACTIVITY_EVENT); |
|||
} catch (TbNodeException e) { |
|||
fail("Node failed to initialize!", e); |
|||
} |
|||
|
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
given(ctxMock.getDeviceStateManager()).willReturn(deviceStateManagerMock); |
|||
|
|||
long msgTs = METADATA_TS + 1; |
|||
msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT, msgTs); |
|||
|
|||
// WHEN
|
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
// THEN
|
|||
then(deviceStateManagerMock).should().onDeviceActivity(eq(TENANT_ID), eq(DEVICE_ID), eq(msgTs), any()); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@MethodSource |
|||
public void givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback(TbMsgType supportedEventType, Runnable actionVerification) { |
|||
// GIVEN
|
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
given(ctxMock.getDeviceStateNodeRateLimitConfig()).willReturn("1:1"); |
|||
given(ctxMock.getDeviceStateManager()).willReturn(deviceStateManagerMock); |
|||
|
|||
try { |
|||
initNode(supportedEventType); |
|||
} catch (TbNodeException e) { |
|||
fail("Node failed to initialize!", e); |
|||
} |
|||
|
|||
// WHEN
|
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
// THEN
|
|||
actionVerification.run(); |
|||
|
|||
TbCallback actualCallback = callbackCaptor.getValue(); |
|||
|
|||
actualCallback.onSuccess(); |
|||
then(ctxMock).should().tellSuccess(msg); |
|||
|
|||
var throwable = new Throwable(); |
|||
actualCallback.onFailure(throwable); |
|||
then(ctxMock).should().tellFailure(msg, throwable); |
|||
|
|||
|
|||
then(deviceStateManagerMock).shouldHaveNoMoreInteractions(); |
|||
then(ctxMock).shouldHaveNoMoreInteractions(); |
|||
} |
|||
|
|||
private static Stream<Arguments> givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback() { |
|||
return Stream.of( |
|||
Arguments.of(TbMsgType.CONNECT_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceConnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())), |
|||
Arguments.of(TbMsgType.ACTIVITY_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceActivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())), |
|||
Arguments.of(TbMsgType.DISCONNECT_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceDisconnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())), |
|||
Arguments.of(TbMsgType.INACTIVITY_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceInactivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())) |
|||
); |
|||
} |
|||
|
|||
private void initNode(TbMsgType event) throws TbNodeException { |
|||
config.setEvent(event); |
|||
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); |
|||
node.init(ctxMock, nodeConfig); |
|||
} |
|||
|
|||
} |
|||
Loading…
Reference in new issue