@ -34,16 +34,22 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.server.cluster.TbClusterService ;
import org.thingsboard.server.common.data.AttributeScope ;
import org.thingsboard.server.common.data.Device ;
import org.thingsboard.server.common.data.DeviceProfile ;
import org.thingsboard.server.common.data.device.profile.DeviceProfileData ;
import org.thingsboard.server.common.data.EntityType ;
import org.thingsboard.server.common.data.id.DeviceId ;
import org.thingsboard.server.common.data.id.DeviceProfileId ;
import org.thingsboard.server.common.data.id.EntityId ;
import org.thingsboard.server.common.data.id.TenantId ;
import org.thingsboard.server.common.data.kv.AttributeKvEntry ;
import org.thingsboard.server.common.data.kv.AttributesSaveResult ;
import org.thingsboard.server.common.data.msg.TbMsgType ;
import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger ;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent ;
import org.thingsboard.server.common.msg.TbMsg ;
import org.thingsboard.server.common.msg.TbMsgMetaData ;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor ;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg ;
import org.thingsboard.server.common.msg.queue.ServiceType ;
import org.thingsboard.server.common.msg.queue.TbCallback ;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo ;
@ -54,6 +60,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.transport.TransportProtos ;
import org.thingsboard.server.queue.discovery.PartitionService ;
import org.thingsboard.server.queue.usagestats.DefaultTbApiUsageReportClient ;
import org.thingsboard.server.service.profile.TbDeviceProfileCache ;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService ;
import java.time.Duration ;
@ -83,6 +90,7 @@ import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.doAnswer ;
import static org.mockito.Mockito.doReturn ;
import static org.mockito.Mockito.lenient ;
import static org.mockito.Mockito.mock ;
import static org.mockito.Mockito.never ;
import static org.mockito.Mockito.spy ;
import static org.mockito.Mockito.times ;
@ -120,6 +128,8 @@ class DefaultDeviceStateServiceTest {
NotificationRuleProcessor notificationRuleProcessor ;
@Mock
DefaultTbApiUsageReportClient defaultTbApiUsageReportClient ;
@Mock
TbDeviceProfileCache deviceProfileCache ;
long defaultInactivityTimeoutMs = Duration . ofMinutes ( 10L ) . toMillis ( ) ;
@ -137,6 +147,7 @@ class DefaultDeviceStateServiceTest {
void setUp ( ) {
service = spy ( new DefaultDeviceStateService ( deviceService , attributesService , tsService , clusterService , partitionService , entityQueryRepository , null , defaultTbApiUsageReportClient , notificationRuleProcessor ) ) ;
ReflectionTestUtils . setField ( service , "tsSubService" , telemetrySubscriptionService ) ;
ReflectionTestUtils . setField ( service , "deviceProfileCache" , deviceProfileCache ) ;
ReflectionTestUtils . setField ( service , "defaultInactivityTimeoutMs" , defaultInactivityTimeoutMs ) ;
ReflectionTestUtils . setField ( service , "defaultStateCheckIntervalInSec" , 60 ) ;
ReflectionTestUtils . setField ( service , "defaultActivityStatsIntervalInSec" , 60 ) ;
@ -600,6 +611,331 @@ class DefaultDeviceStateServiceTest {
) ;
}
@Test
void givenAttributeValueIsPositive_whenOnDeviceInactivityTimeoutUpdate_thenMarksOverridden ( ) {
// GIVEN
DeviceProfileId profileId = new DeviceProfileId ( UUID . randomUUID ( ) ) ;
DeviceStateData stateData = DeviceStateData . builder ( )
. tenantId ( tenantId )
. deviceId ( deviceId )
. deviceProfileId ( profileId )
. state ( DeviceState . builder ( ) . lastActivityTime ( 100L ) . build ( ) )
. metaData ( new TbMsgMetaData ( ) )
. build ( ) ;
service . deviceStates . put ( deviceId , stateData ) ;
service . getPartitionedEntities ( tpi ) . add ( deviceId ) ;
mockSuccessfulSaveAttributes ( ) ;
doReturn ( 200L ) . when ( service ) . getCurrentTimeMillis ( ) ;
// WHEN
service . onDeviceInactivityTimeoutUpdate ( tenantId , deviceId , 5_000L ) ;
// THEN
assertThat ( stateData . isInactivityTimeoutOverridden ( ) ) . isTrue ( ) ;
assertThat ( stateData . getState ( ) . getInactivityTimeout ( ) ) . isEqualTo ( 5_000L ) ;
}
@Test
void givenAttributeRemovedAndProfileTimeoutSet_whenOnDeviceInactivityTimeoutUpdate_thenFallsBackToProfile ( ) {
// GIVEN
long profileTimeoutMs = Duration . ofMinutes ( 2L ) . toMillis ( ) ;
DeviceProfileId profileId = new DeviceProfileId ( UUID . randomUUID ( ) ) ;
DeviceProfile profile = profileWithInactivityTimeout ( profileId , profileTimeoutMs ) ;
when ( deviceProfileCache . get ( tenantId , profileId ) ) . thenReturn ( profile ) ;
DeviceStateData stateData = DeviceStateData . builder ( )
. tenantId ( tenantId )
. deviceId ( deviceId )
. deviceProfileId ( profileId )
. state ( DeviceState . builder ( ) . lastActivityTime ( 100L ) . inactivityTimeout ( 5_000L ) . build ( ) )
. metaData ( new TbMsgMetaData ( ) )
. inactivityTimeoutOverridden ( true )
. build ( ) ;
service . deviceStates . put ( deviceId , stateData ) ;
service . getPartitionedEntities ( tpi ) . add ( deviceId ) ;
mockSuccessfulSaveAttributes ( ) ;
doReturn ( 200L ) . when ( service ) . getCurrentTimeMillis ( ) ;
// WHEN — sentinel value 0L means "attribute deleted"
service . onDeviceInactivityTimeoutUpdate ( tenantId , deviceId , 0L ) ;
// THEN
assertThat ( stateData . isInactivityTimeoutOverridden ( ) ) . isFalse ( ) ;
assertThat ( stateData . getState ( ) . getInactivityTimeout ( ) ) . isEqualTo ( profileTimeoutMs ) ;
}
@Test
void givenAttributeRemovedAndNoProfileTimeout_whenOnDeviceInactivityTimeoutUpdate_thenFallsBackToYamlDefault ( ) {
// GIVEN
DeviceProfileId profileId = new DeviceProfileId ( UUID . randomUUID ( ) ) ;
DeviceProfile profile = profileWithInactivityTimeout ( profileId , null ) ;
when ( deviceProfileCache . get ( tenantId , profileId ) ) . thenReturn ( profile ) ;
DeviceStateData stateData = DeviceStateData . builder ( )
. tenantId ( tenantId )
. deviceId ( deviceId )
. deviceProfileId ( profileId )
. state ( DeviceState . builder ( ) . lastActivityTime ( 100L ) . inactivityTimeout ( 5_000L ) . build ( ) )
. metaData ( new TbMsgMetaData ( ) )
. inactivityTimeoutOverridden ( true )
. build ( ) ;
service . deviceStates . put ( deviceId , stateData ) ;
service . getPartitionedEntities ( tpi ) . add ( deviceId ) ;
mockSuccessfulSaveAttributes ( ) ;
doReturn ( 200L ) . when ( service ) . getCurrentTimeMillis ( ) ;
// WHEN
service . onDeviceInactivityTimeoutUpdate ( tenantId , deviceId , 0L ) ;
// THEN
assertThat ( stateData . isInactivityTimeoutOverridden ( ) ) . isFalse ( ) ;
assertThat ( stateData . getState ( ) . getInactivityTimeout ( ) ) . isEqualTo ( defaultInactivityTimeoutMs ) ;
}
@Test
void givenDeviceProfileUpdatedEvent_whenOnComponentLifecycleEvent_thenUpdatesNonOverriddenDevicesAndSkipsOverridden ( ) {
// GIVEN
long newProfileTimeoutMs = Duration . ofMinutes ( 3L ) . toMillis ( ) ;
DeviceProfileId profileId = new DeviceProfileId ( UUID . randomUUID ( ) ) ;
DeviceProfile updatedProfile = profileWithInactivityTimeout ( profileId , newProfileTimeoutMs ) ;
when ( deviceProfileCache . get ( tenantId , profileId ) ) . thenReturn ( updatedProfile ) ;
DeviceId nonOverriddenDeviceId = new DeviceId ( UUID . randomUUID ( ) ) ;
DeviceStateData nonOverridden = DeviceStateData . builder ( )
. tenantId ( tenantId )
. deviceId ( nonOverriddenDeviceId )
. deviceProfileId ( profileId )
. state ( DeviceState . builder ( ) . lastActivityTime ( 100L ) . inactivityTimeout ( defaultInactivityTimeoutMs ) . build ( ) )
. metaData ( new TbMsgMetaData ( ) )
. build ( ) ;
DeviceId overriddenDeviceId = new DeviceId ( UUID . randomUUID ( ) ) ;
long overrideValue = 7_000L ;
DeviceStateData overridden = DeviceStateData . builder ( )
. tenantId ( tenantId )
. deviceId ( overriddenDeviceId )
. deviceProfileId ( profileId )
. state ( DeviceState . builder ( ) . lastActivityTime ( 100L ) . inactivityTimeout ( overrideValue ) . build ( ) )
. metaData ( new TbMsgMetaData ( ) )
. inactivityTimeoutOverridden ( true )
. build ( ) ;
DeviceId otherProfileDeviceId = new DeviceId ( UUID . randomUUID ( ) ) ;
DeviceProfileId otherProfileId = new DeviceProfileId ( UUID . randomUUID ( ) ) ;
DeviceStateData otherProfile = DeviceStateData . builder ( )
. tenantId ( tenantId )
. deviceId ( otherProfileDeviceId )
. deviceProfileId ( otherProfileId )
. state ( DeviceState . builder ( ) . lastActivityTime ( 100L ) . inactivityTimeout ( defaultInactivityTimeoutMs ) . build ( ) )
. metaData ( new TbMsgMetaData ( ) )
. build ( ) ;
service . deviceStates . put ( nonOverriddenDeviceId , nonOverridden ) ;
service . deviceStates . put ( overriddenDeviceId , overridden ) ;
service . deviceStates . put ( otherProfileDeviceId , otherProfile ) ;
doAnswer ( inv - > null ) . when ( service ) . checkAndUpdateState ( any ( DeviceId . class ) , any ( DeviceStateData . class ) ) ;
// WHEN
service . onComponentLifecycleEvent ( new ComponentLifecycleMsg ( tenantId , profileId , ComponentLifecycleEvent . UPDATED ) ) ;
// THEN
assertThat ( nonOverridden . getState ( ) . getInactivityTimeout ( ) ) . isEqualTo ( newProfileTimeoutMs ) ;
assertThat ( overridden . getState ( ) . getInactivityTimeout ( ) ) . isEqualTo ( overrideValue ) ;
assertThat ( otherProfile . getState ( ) . getInactivityTimeout ( ) ) . isEqualTo ( defaultInactivityTimeoutMs ) ;
verify ( service ) . checkAndUpdateState ( nonOverriddenDeviceId , nonOverridden ) ;
verify ( service , never ( ) ) . checkAndUpdateState ( eq ( overriddenDeviceId ) , any ( ) ) ;
verify ( service , never ( ) ) . checkAndUpdateState ( eq ( otherProfileDeviceId ) , any ( ) ) ;
}
@Test
void givenProfileClearedToNull_whenOnComponentLifecycleEvent_thenFallsBackToYamlDefault ( ) {
// GIVEN
DeviceProfileId profileId = new DeviceProfileId ( UUID . randomUUID ( ) ) ;
DeviceProfile updatedProfile = profileWithInactivityTimeout ( profileId , null ) ;
when ( deviceProfileCache . get ( tenantId , profileId ) ) . thenReturn ( updatedProfile ) ;
DeviceStateData stateData = DeviceStateData . builder ( )
. tenantId ( tenantId )
. deviceId ( deviceId )
. deviceProfileId ( profileId )
. state ( DeviceState . builder ( ) . lastActivityTime ( 100L ) . inactivityTimeout ( 60_000L ) . build ( ) )
. metaData ( new TbMsgMetaData ( ) )
. build ( ) ;
service . deviceStates . put ( deviceId , stateData ) ;
doAnswer ( inv - > null ) . when ( service ) . checkAndUpdateState ( any ( DeviceId . class ) , any ( DeviceStateData . class ) ) ;
// WHEN
service . onComponentLifecycleEvent ( new ComponentLifecycleMsg ( tenantId , profileId , ComponentLifecycleEvent . UPDATED ) ) ;
// THEN
assertThat ( stateData . getState ( ) . getInactivityTimeout ( ) ) . isEqualTo ( defaultInactivityTimeoutMs ) ;
}
@Test
void givenNonDeviceProfileEntity_whenOnComponentLifecycleEvent_thenIgnored ( ) {
// GIVEN
DeviceStateData stateData = DeviceStateData . builder ( )
. tenantId ( tenantId )
. deviceId ( deviceId )
. deviceProfileId ( new DeviceProfileId ( UUID . randomUUID ( ) ) )
. state ( DeviceState . builder ( ) . inactivityTimeout ( 60_000L ) . build ( ) )
. metaData ( new TbMsgMetaData ( ) )
. build ( ) ;
service . deviceStates . put ( deviceId , stateData ) ;
// WHEN
service . onComponentLifecycleEvent ( new ComponentLifecycleMsg ( tenantId , deviceId , ComponentLifecycleEvent . UPDATED ) ) ;
// THEN
verify ( service , never ( ) ) . checkAndUpdateState ( any ( DeviceId . class ) , any ( DeviceStateData . class ) ) ;
assertThat ( stateData . getState ( ) . getInactivityTimeout ( ) ) . isEqualTo ( 60_000L ) ;
}
@Test
void givenSecondProfileUpdateWithSameTimeout_whenOnComponentLifecycleEvent_thenSkipsIteration ( ) {
// GIVEN
long profileTimeoutMs = Duration . ofMinutes ( 3L ) . toMillis ( ) ;
DeviceProfileId profileId = new DeviceProfileId ( UUID . randomUUID ( ) ) ;
DeviceProfile profile = profileWithInactivityTimeout ( profileId , profileTimeoutMs ) ;
when ( deviceProfileCache . get ( tenantId , profileId ) ) . thenReturn ( profile ) ;
DeviceStateData stateData = DeviceStateData . builder ( )
. tenantId ( tenantId )
. deviceId ( deviceId )
. deviceProfileId ( profileId )
. state ( DeviceState . builder ( ) . lastActivityTime ( 100L ) . inactivityTimeout ( defaultInactivityTimeoutMs ) . build ( ) )
. metaData ( new TbMsgMetaData ( ) )
. build ( ) ;
service . deviceStates . put ( deviceId , stateData ) ;
doAnswer ( inv - > null ) . when ( service ) . checkAndUpdateState ( any ( DeviceId . class ) , any ( DeviceStateData . class ) ) ;
// WHEN — first event triggers iteration; second event with same resolved timeout hits cache
ComponentLifecycleMsg event = new ComponentLifecycleMsg ( tenantId , profileId , ComponentLifecycleEvent . UPDATED ) ;
service . onComponentLifecycleEvent ( event ) ;
service . onComponentLifecycleEvent ( event ) ;
// THEN — checkAndUpdateState called only once (during first event), second event short-circuited by cache
verify ( service , times ( 1 ) ) . checkAndUpdateState ( eq ( deviceId ) , any ( DeviceStateData . class ) ) ;
}
@Test
void givenDeviceProfileDeletedEvent_whenOnComponentLifecycleEvent_thenIgnored ( ) {
// GIVEN
DeviceProfileId profileId = new DeviceProfileId ( UUID . randomUUID ( ) ) ;
DeviceStateData stateData = DeviceStateData . builder ( )
. tenantId ( tenantId )
. deviceId ( deviceId )
. deviceProfileId ( profileId )
. state ( DeviceState . builder ( ) . inactivityTimeout ( 60_000L ) . build ( ) )
. metaData ( new TbMsgMetaData ( ) )
. build ( ) ;
service . deviceStates . put ( deviceId , stateData ) ;
// WHEN
service . onComponentLifecycleEvent ( new ComponentLifecycleMsg ( tenantId , profileId , ComponentLifecycleEvent . DELETED ) ) ;
// THEN
verify ( service , never ( ) ) . checkAndUpdateState ( any ( DeviceId . class ) , any ( DeviceStateData . class ) ) ;
assertThat ( stateData . getState ( ) . getInactivityTimeout ( ) ) . isEqualTo ( 60_000L ) ;
}
@Test
void givenDeviceReassignedToAnotherProfile_whenOnQueueMsg_thenUpdatesProfileIdAndReResolvesTimeout ( ) throws Exception {
// GIVEN
long oldProfileTimeoutMs = Duration . ofMinutes ( 2L ) . toMillis ( ) ;
long newProfileTimeoutMs = Duration . ofMinutes ( 7L ) . toMillis ( ) ;
DeviceProfileId oldProfileId = new DeviceProfileId ( UUID . randomUUID ( ) ) ;
DeviceProfileId newProfileId = new DeviceProfileId ( UUID . randomUUID ( ) ) ;
when ( deviceProfileCache . get ( tenantId , newProfileId ) ) . thenReturn ( profileWithInactivityTimeout ( newProfileId , newProfileTimeoutMs ) ) ;
DeviceStateData stateData = DeviceStateData . builder ( )
. tenantId ( tenantId )
. deviceId ( deviceId )
. deviceProfileId ( oldProfileId )
. state ( DeviceState . builder ( ) . lastActivityTime ( 100L ) . inactivityTimeout ( oldProfileTimeoutMs ) . build ( ) )
. metaData ( new TbMsgMetaData ( ) )
. build ( ) ;
service . deviceStates . put ( deviceId , stateData ) ;
Device device = new Device ( ) ;
device . setId ( deviceId ) ;
device . setTenantId ( tenantId ) ;
device . setDeviceProfileId ( newProfileId ) ;
device . setName ( "test-device" ) ;
device . setLabel ( "label" ) ;
device . setType ( "default" ) ;
doReturn ( device ) . when ( deviceService ) . findDeviceById ( TenantId . SYS_TENANT_ID , deviceId ) ;
doAnswer ( inv - > null ) . when ( service ) . checkAndUpdateState ( any ( DeviceId . class ) , any ( DeviceStateData . class ) ) ;
// WHEN — simulate the proto path that fires on device update
TransportProtos . DeviceStateServiceMsgProto proto = TransportProtos . DeviceStateServiceMsgProto . newBuilder ( )
. setTenantIdMSB ( tenantId . getId ( ) . getMostSignificantBits ( ) )
. setTenantIdLSB ( tenantId . getId ( ) . getLeastSignificantBits ( ) )
. setDeviceIdMSB ( deviceId . getId ( ) . getMostSignificantBits ( ) )
. setDeviceIdLSB ( deviceId . getId ( ) . getLeastSignificantBits ( ) )
. setUpdated ( true )
. build ( ) ;
TbCallback callback = mock ( TbCallback . class ) ;
service . onQueueMsg ( proto , callback ) ;
// THEN
assertThat ( stateData . getDeviceProfileId ( ) ) . isEqualTo ( newProfileId ) ;
assertThat ( stateData . getState ( ) . getInactivityTimeout ( ) ) . isEqualTo ( newProfileTimeoutMs ) ;
verify ( service ) . checkAndUpdateState ( deviceId , stateData ) ;
}
@Test
void givenDeviceReassignedAndOverridden_whenOnQueueMsg_thenUpdatesProfileIdButKeepsOverride ( ) throws Exception {
// GIVEN
long overrideValue = 5_000L ;
long newProfileTimeoutMs = Duration . ofMinutes ( 7L ) . toMillis ( ) ;
DeviceProfileId oldProfileId = new DeviceProfileId ( UUID . randomUUID ( ) ) ;
DeviceProfileId newProfileId = new DeviceProfileId ( UUID . randomUUID ( ) ) ;
DeviceStateData stateData = DeviceStateData . builder ( )
. tenantId ( tenantId )
. deviceId ( deviceId )
. deviceProfileId ( oldProfileId )
. state ( DeviceState . builder ( ) . lastActivityTime ( 100L ) . inactivityTimeout ( overrideValue ) . build ( ) )
. metaData ( new TbMsgMetaData ( ) )
. inactivityTimeoutOverridden ( true )
. build ( ) ;
service . deviceStates . put ( deviceId , stateData ) ;
Device device = new Device ( ) ;
device . setId ( deviceId ) ;
device . setTenantId ( tenantId ) ;
device . setDeviceProfileId ( newProfileId ) ;
device . setName ( "test-device" ) ;
device . setLabel ( "label" ) ;
device . setType ( "default" ) ;
doReturn ( device ) . when ( deviceService ) . findDeviceById ( TenantId . SYS_TENANT_ID , deviceId ) ;
// WHEN
TransportProtos . DeviceStateServiceMsgProto proto = TransportProtos . DeviceStateServiceMsgProto . newBuilder ( )
. setTenantIdMSB ( tenantId . getId ( ) . getMostSignificantBits ( ) )
. setTenantIdLSB ( tenantId . getId ( ) . getLeastSignificantBits ( ) )
. setDeviceIdMSB ( deviceId . getId ( ) . getMostSignificantBits ( ) )
. setDeviceIdLSB ( deviceId . getId ( ) . getLeastSignificantBits ( ) )
. setUpdated ( true )
. build ( ) ;
TbCallback callback = mock ( TbCallback . class ) ;
service . onQueueMsg ( proto , callback ) ;
// THEN — profileId is updated but override timeout is kept; new-profile cache is NOT consulted
assertThat ( stateData . getDeviceProfileId ( ) ) . isEqualTo ( newProfileId ) ;
assertThat ( stateData . getState ( ) . getInactivityTimeout ( ) ) . isEqualTo ( overrideValue ) ;
verify ( deviceProfileCache , never ( ) ) . get ( eq ( tenantId ) , eq ( newProfileId ) ) ;
}
private static DeviceProfile profileWithInactivityTimeout ( DeviceProfileId profileId , Long inactivityTimeoutMs ) {
DeviceProfile profile = new DeviceProfile ( profileId ) ;
DeviceProfileData data = new DeviceProfileData ( ) ;
data . setInactivityTimeoutMs ( inactivityTimeoutMs ) ;
profile . setProfileData ( data ) ;
return profile ;
}
@Test
void givenStateDataIsNull_whenUpdateActivityState_thenShouldCleanupDevice ( ) {
// GIVEN