diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java index 5865d9e39d..596f5a8754 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java @@ -32,4 +32,8 @@ public abstract class AbstractCleanUpService { return partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition(); } + protected boolean isTenantPartitionMine(TenantId tenantId) { + return partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition(); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/NotificationsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/NotificationsCleanUpService.java index ddc95b74e9..83855bc8f8 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/NotificationsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/NotificationsCleanUpService.java @@ -20,33 +20,41 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.notification.NotificationRequestConfig; +import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.dao.notification.NotificationRequestDao; import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; +import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.queue.discovery.PartitionService; +import java.time.Instant; import java.util.concurrent.TimeUnit; import static org.thingsboard.server.dao.model.ModelConstants.NOTIFICATION_TABLE_NAME; +@Slf4j @Service @ConditionalOnExpression("${sql.ttl.notifications.enabled:true} && ${sql.ttl.notifications.ttl:0} > 0") -@Slf4j public class NotificationsCleanUpService extends AbstractCleanUpService { private final SqlPartitioningRepository partitioningRepository; private final NotificationRequestDao notificationRequestDao; + private final TenantService tenantService; @Value("${sql.ttl.notifications.ttl:2592000}") private long ttlInSec; @Value("${sql.notifications.partition_size:168}") private int partitionSizeInHours; + @Value("${sql.ttl.notifications.removal_batch_size:10000}") + private int removalBatchSize; public NotificationsCleanUpService(PartitionService partitionService, SqlPartitioningRepository partitioningRepository, - NotificationRequestDao notificationRequestDao) { + NotificationRequestDao notificationRequestDao, TenantService tenantService) { super(partitionService); this.partitioningRepository = partitioningRepository; this.notificationRequestDao = notificationRequestDao; + this.tenantService = tenantService; } @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.notifications.checking_interval_ms:86400000})}", @@ -54,18 +62,65 @@ public class NotificationsCleanUpService extends AbstractCleanUpService { public void cleanUp() { long expTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec); long partitionDurationMs = TimeUnit.HOURS.toMillis(partitionSizeInHours); - if (!isSystemTenantPartitionMine()) { + if (isSystemTenantPartitionMine()) { + partitioningRepository.dropPartitionsBefore(NOTIFICATION_TABLE_NAME, expTime, partitionDurationMs); + } else { partitioningRepository.cleanupPartitionsCache(NOTIFICATION_TABLE_NAME, expTime, partitionDurationMs); - return; } - long lastRemovedNotificationTs = partitioningRepository.dropPartitionsBefore(NOTIFICATION_TABLE_NAME, expTime, partitionDurationMs); - if (lastRemovedNotificationTs > 0) { - long gap = TimeUnit.MINUTES.toMillis(10); - long requestExpTime = lastRemovedNotificationTs - TimeUnit.SECONDS.toMillis(NotificationRequestConfig.MAX_SENDING_DELAY) - gap; - int removed = notificationRequestDao.removeAllByCreatedTimeBefore(requestExpTime); - log.info("Removed {} outdated notification requests older than {}", removed, requestExpTime); + long gap = TimeUnit.MINUTES.toMillis(10); + long requestExpTime = expTime - TimeUnit.SECONDS.toMillis(NotificationRequestConfig.MAX_SENDING_DELAY) - gap; + cleanUpNotificationRequests(requestExpTime); + } + + private void cleanUpNotificationRequests(long expirationTime) { + log.info("Starting notification requests cleanup for records older than {}", Instant.ofEpochMilli(expirationTime)); + int totalRemoved = 0; + int tenantsProcessed = 0; + + // Clean up SYSADMIN's notification requests on the system node only + if (isSystemTenantPartitionMine()) { + try { + totalRemoved += cleanUpByTenant(TenantId.SYS_TENANT_ID, expirationTime); + } catch (Exception e) { + log.warn("Failed to clean up notification requests for sysadmin {}", TenantId.SYS_TENANT_ID, e); + } + } + // Each node cleans up notification requests for its own tenants + PageDataIterable tenants = new PageDataIterable<>(tenantService::findTenantsIds, 10_000); + for (TenantId tenantId : tenants) { + try { + if (!isTenantPartitionMine(tenantId)) { + continue; + } + int tenantRemoved = cleanUpByTenant(tenantId, expirationTime); + totalRemoved += tenantRemoved; + tenantsProcessed++; + if (tenantRemoved > 0) { + log.trace("Removed {} notification requests for tenant {}", tenantRemoved, tenantId); + } + } catch (Exception e) { + log.warn("Failed to clean up notification requests for tenant {}", tenantId, e); + } } + + log.info("Notification requests cleanup completed. Processed {} tenants, removed {} total records older than {}", tenantsProcessed, totalRemoved, Instant.ofEpochMilli(expirationTime)); + } + + private int cleanUpByTenant(TenantId tenantId, long expirationTime) { + int totalRemoved = 0; + int batchRemoved; + + do { + batchRemoved = notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenantId, expirationTime, removalBatchSize); + totalRemoved += batchRemoved; + + if (batchRemoved > 0) { + log.trace("Removed {} notification requests in batch for tenant {}", batchRemoved, tenantId); + } + } while (batchRemoved >= removalBatchSize); + + return totalRemoved; } } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpService.java index 9404a0ae72..a945bfd8a5 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpService.java @@ -15,69 +15,87 @@ */ package org.thingsboard.server.service.ttl.rpc; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; -import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.dao.rpc.RpcDao; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.ttl.AbstractCleanUpService; -import java.util.Date; +import java.time.Instant; import java.util.Optional; import java.util.concurrent.TimeUnit; -@TbCoreComponent -@Service @Slf4j -@RequiredArgsConstructor -public class RpcCleanUpService { - @Value("${sql.ttl.rpc.enabled}") - private boolean ttlTaskExecutionEnabled; +@Service +@TbCoreComponent +@ConditionalOnExpression("${sql.ttl.rpc.enabled:true}") +public class RpcCleanUpService extends AbstractCleanUpService { + + @Value("${sql.ttl.rpc.removal_batch_size:10000}") + private int removalBatchSize; + private final RpcDao rpcDao; private final TenantService tenantService; - private final PartitionService partitionService; private final TbTenantProfileCache tenantProfileCache; - private final RpcDao rpcDao; + + public RpcCleanUpService(TenantService tenantService, PartitionService partitionService, TbTenantProfileCache tenantProfileCache, RpcDao rpcDao) { + super(partitionService); + this.tenantService = tenantService; + this.tenantProfileCache = tenantProfileCache; + this.rpcDao = rpcDao; + } @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.rpc.checking_interval})}", fixedDelayString = "${sql.ttl.rpc.checking_interval}") public void cleanUp() { - if (ttlTaskExecutionEnabled) { - PageLink tenantsBatchRequest = new PageLink(10_000, 0); - PageData tenantsIds; - do { - tenantsIds = tenantService.findTenantsIds(tenantsBatchRequest); - for (TenantId tenantId : tenantsIds.getData()) { - if (!partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) { - continue; - } - - Optional tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration(); - if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getRpcTtlDays() == 0) { - continue; - } - - long ttl = TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getRpcTtlDays()); - long expirationTime = System.currentTimeMillis() - ttl; - - int totalRemoved = rpcDao.deleteOutdatedRpcByTenantId(tenantId, expirationTime); - - if (totalRemoved > 0) { - log.info("Removed {} outdated rpc(s) for tenant {} older than {}", totalRemoved, tenantId, new Date(expirationTime)); - } + PageDataIterable tenants = new PageDataIterable<>(tenantService::findTenantsIds, 10_000); + for (TenantId tenantId : tenants) { + try { + if (!isTenantPartitionMine(tenantId)) { + continue; } - tenantsBatchRequest = tenantsBatchRequest.nextPageLink(); - } while (tenantsIds.hasNext()); + Optional tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration(); + if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getRpcTtlDays() == 0) { + continue; + } + + long ttl = TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getRpcTtlDays()); + long expirationTime = System.currentTimeMillis() - ttl; + + int totalRemoved = cleanUpByTenant(tenantId, expirationTime); + + if (totalRemoved > 0) { + log.info("Removed {} outdated rpc(s) for tenant {} older than {}", totalRemoved, tenantId, Instant.ofEpochMilli(expirationTime)); + } + } catch (Exception e) { + log.warn("Failed to clean up rpc by ttl for tenant {}", tenantId, e); + } } } + private int cleanUpByTenant(TenantId tenantId, long expirationTime) { + int totalRemoved = 0; + int batchRemoved; + + do { + batchRemoved = rpcDao.deleteOutdatedRpcByTenantIdBatch(tenantId, expirationTime, removalBatchSize); + totalRemoved += batchRemoved; + + if (batchRemoved > 0) { + log.trace("Removed {} rpc in batch for tenant {}", batchRemoved, tenantId); + } + } while (batchRemoved >= removalBatchSize); + + return totalRemoved; + } + } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 21688abc50..60a158febe 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -421,10 +421,11 @@ sql: edge_events_ttl: "${SQL_TTL_EDGE_EVENTS_TTL:2628000}" # Number of seconds. The current value corresponds to one month alarms: checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours - removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches + removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # Batch size for records removal rpc: enabled: "${SQL_TTL_RPC_ENABLED:true}" # Enable/disable TTL (Time To Live) for rpc call records checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours + removal_batch_size: "${SQL_RPC_TTL_REMOVAL_BATCH_SIZE:10000}" # Batch size for records removal audit_logs: enabled: "${SQL_TTL_AUDIT_LOGS_ENABLED:true}" # Enable/disable TTL (Time To Live) for audit log records ttl: "${SQL_TTL_AUDIT_LOGS_SECS:0}" # Disabled by default. The accuracy of the cleanup depends on the sql.audit_logs.partition_size @@ -433,6 +434,7 @@ sql: enabled: "${SQL_TTL_NOTIFICATIONS_ENABLED:true}" # Enable/disable TTL (Time To Live) for notification center records ttl: "${SQL_TTL_NOTIFICATIONS_SECS:2592000}" # Default value - 30 days checking_interval_ms: "${SQL_TTL_NOTIFICATIONS_CHECKING_INTERVAL_MS:86400000}" # Default value - 1 day + removal_batch_size: "${SQL_TTL_NOTIFICATIONS_REMOVAL_BATCH_SIZE:10000}" # Batch size for records removal relations: max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # This value has to be reasonably small to prevent infinite recursion as early as possible pool_size: "${SQL_RELATIONS_POOL_SIZE:4}" # This value has to be reasonably small to prevent the relation query from blocking all other DB calls diff --git a/application/src/test/java/org/thingsboard/server/service/ttl/NotificationsCleanUpServiceTest.java b/application/src/test/java/org/thingsboard/server/service/ttl/NotificationsCleanUpServiceTest.java new file mode 100644 index 0000000000..d74652325a --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/ttl/NotificationsCleanUpServiceTest.java @@ -0,0 +1,144 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ttl; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.dao.notification.NotificationRequestDao; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; +import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.queue.discovery.PartitionService; + +import java.util.List; +import java.util.UUID; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class NotificationsCleanUpServiceTest { + + @Mock + private PartitionService partitionService; + @Mock + private SqlPartitioningRepository partitioningRepository; + @Mock + private NotificationRequestDao notificationRequestDao; + @Mock + private TenantService tenantService; + + private NotificationsCleanUpService cleanUpService; + + private static final int BATCH_SIZE = 3; + + @BeforeEach + public void setUp() { + cleanUpService = new NotificationsCleanUpService(partitionService, partitioningRepository, notificationRequestDao, tenantService); + ReflectionTestUtils.setField(cleanUpService, "ttlInSec", 2592000L); + ReflectionTestUtils.setField(cleanUpService, "partitionSizeInHours", 168); + ReflectionTestUtils.setField(cleanUpService, "removalBatchSize", BATCH_SIZE); + } + + @Test + public void testBatchLoopCallsDaoMultipleTimes() { + TopicPartitionInfo myPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(true).build(); + when(partitionService.resolve(any(), any(), any())).thenReturn(myPartition); + when(partitioningRepository.dropPartitionsBefore(anyString(), anyLong(), anyLong())) + .thenReturn(System.currentTimeMillis()); + + TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + when(tenantService.findTenantsIds(any())) + .thenReturn(new PageData<>(List.of(tenantId), 1, 1, false)); + + // Sysadmin: returns 3 (full batch), then 1 (partial) -> 2 calls + when(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(eq(TenantId.SYS_TENANT_ID), anyLong(), eq(BATCH_SIZE))) + .thenReturn(BATCH_SIZE) + .thenReturn(1); + // Tenant: returns 3, 3, 0 -> 3 calls + when(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(eq(tenantId), anyLong(), eq(BATCH_SIZE))) + .thenReturn(BATCH_SIZE) + .thenReturn(BATCH_SIZE) + .thenReturn(0); + + cleanUpService.cleanUp(); + + verify(notificationRequestDao, times(2)) + .removeByTenantIdAndCreatedTimeBeforeBatch(eq(TenantId.SYS_TENANT_ID), anyLong(), eq(BATCH_SIZE)); + verify(notificationRequestDao, times(3)) + .removeByTenantIdAndCreatedTimeBeforeBatch(eq(tenantId), anyLong(), eq(BATCH_SIZE)); + } + + @Test + public void testSkipsTenantNotOnMyPartition() { + TopicPartitionInfo myPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(true).build(); + TopicPartitionInfo notMyPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(false).build(); + when(partitionService.resolve(any(), eq(TenantId.SYS_TENANT_ID), eq(TenantId.SYS_TENANT_ID))) + .thenReturn(myPartition); + when(partitioningRepository.dropPartitionsBefore(anyString(), anyLong(), anyLong())) + .thenReturn(System.currentTimeMillis()); + + // Sysadmin: no records + when(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(eq(TenantId.SYS_TENANT_ID), anyLong(), eq(BATCH_SIZE))) + .thenReturn(0); + + TenantId myTenant = TenantId.fromUUID(UUID.randomUUID()); + TenantId otherTenant = TenantId.fromUUID(UUID.randomUUID()); + when(tenantService.findTenantsIds(any())) + .thenReturn(new PageData<>(List.of(myTenant, otherTenant), 2, 1, false)); + when(partitionService.resolve(any(), eq(myTenant), eq(myTenant))).thenReturn(myPartition); + when(partitionService.resolve(any(), eq(otherTenant), eq(otherTenant))).thenReturn(notMyPartition); + + when(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(eq(myTenant), anyLong(), eq(BATCH_SIZE))) + .thenReturn(0); + + cleanUpService.cleanUp(); + + verify(notificationRequestDao).removeByTenantIdAndCreatedTimeBeforeBatch(eq(myTenant), anyLong(), eq(BATCH_SIZE)); + verify(notificationRequestDao, never()).removeByTenantIdAndCreatedTimeBeforeBatch(eq(otherTenant), anyLong(), anyInt()); + } + + @Test + public void testNoPartitionsDropped_stillCleansUpRequests() { + TopicPartitionInfo myPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(true).build(); + when(partitionService.resolve(any(), any(), any())).thenReturn(myPartition); + when(partitioningRepository.dropPartitionsBefore(anyString(), anyLong(), anyLong())) + .thenReturn(0L); + + when(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(eq(TenantId.SYS_TENANT_ID), anyLong(), eq(BATCH_SIZE))) + .thenReturn(0); + when(tenantService.findTenantsIds(any())) + .thenReturn(new PageData<>(List.of(), 0, 0, false)); + + cleanUpService.cleanUp(); + + verify(notificationRequestDao).removeByTenantIdAndCreatedTimeBeforeBatch(eq(TenantId.SYS_TENANT_ID), anyLong(), eq(BATCH_SIZE)); + } + +} diff --git a/application/src/test/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpServiceTest.java b/application/src/test/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpServiceTest.java new file mode 100644 index 0000000000..22cc1be1dd --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpServiceTest.java @@ -0,0 +1,136 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ttl.rpc; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.dao.rpc.RpcDao; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.queue.discovery.PartitionService; + +import java.util.List; +import java.util.UUID; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class RpcCleanUpServiceTest { + + @Mock + private PartitionService partitionService; + @Mock + private RpcDao rpcDao; + @Mock + private TenantService tenantService; + @Mock + private TbTenantProfileCache tenantProfileCache; + + private RpcCleanUpService cleanUpService; + + private static final int BATCH_SIZE = 3; + + @BeforeEach + public void setUp() { + cleanUpService = new RpcCleanUpService(tenantService, partitionService, tenantProfileCache, rpcDao); + ReflectionTestUtils.setField(cleanUpService, "removalBatchSize", BATCH_SIZE); + } + + @Test + public void testBatchLoopCallsDaoMultipleTimes() { + TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + setupTenant(tenantId, 7); + + // Returns 3 (full batch), 3 (full batch), 1 (partial) -> 3 calls + when(rpcDao.deleteOutdatedRpcByTenantIdBatch(eq(tenantId), anyLong(), eq(BATCH_SIZE))) + .thenReturn(BATCH_SIZE) + .thenReturn(BATCH_SIZE) + .thenReturn(1); + + cleanUpService.cleanUp(); + + verify(rpcDao, times(3)).deleteOutdatedRpcByTenantIdBatch(eq(tenantId), anyLong(), eq(BATCH_SIZE)); + } + + @Test + public void testSkipsTenantNotOnMyPartition() { + TenantId myTenant = TenantId.fromUUID(UUID.randomUUID()); + TenantId otherTenant = TenantId.fromUUID(UUID.randomUUID()); + + TopicPartitionInfo myPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(true).build(); + TopicPartitionInfo notMyPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(false).build(); + + when(tenantService.findTenantsIds(any())) + .thenReturn(new PageData<>(List.of(myTenant, otherTenant), 2, 1, false)); + when(partitionService.resolve(any(), eq(myTenant), eq(myTenant))).thenReturn(myPartition); + when(partitionService.resolve(any(), eq(otherTenant), eq(otherTenant))).thenReturn(notMyPartition); + + setupTenantProfile(myTenant, 7); + when(rpcDao.deleteOutdatedRpcByTenantIdBatch(eq(myTenant), anyLong(), eq(BATCH_SIZE))) + .thenReturn(0); + + cleanUpService.cleanUp(); + + verify(rpcDao).deleteOutdatedRpcByTenantIdBatch(eq(myTenant), anyLong(), eq(BATCH_SIZE)); + verify(rpcDao, never()).deleteOutdatedRpcByTenantIdBatch(eq(otherTenant), anyLong(), anyInt()); + } + + @Test + public void testSkipsTenantWithZeroTtl() { + TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + setupTenant(tenantId, 0); + + cleanUpService.cleanUp(); + + verify(rpcDao, never()).deleteOutdatedRpcByTenantIdBatch(any(), anyLong(), anyInt()); + } + + private void setupTenant(TenantId tenantId, int rpcTtlDays) { + TopicPartitionInfo myPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(true).build(); + when(partitionService.resolve(any(), eq(tenantId), eq(tenantId))).thenReturn(myPartition); + when(tenantService.findTenantsIds(any())) + .thenReturn(new PageData<>(List.of(tenantId), 1, 1, false)); + setupTenantProfile(tenantId, rpcTtlDays); + } + + private void setupTenantProfile(TenantId tenantId, int rpcTtlDays) { + TenantProfile profile = new TenantProfile(); + TenantProfileData profileData = new TenantProfileData(); + DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration(); + config.setRpcTtlDays(rpcTtlDays); + profileData.setConfiguration(config); + profile.setProfileData(profileData); + when(tenantProfileCache.get(tenantId)).thenReturn(profile); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/notification/NotificationRequestDao.java b/dao/src/main/java/org/thingsboard/server/dao/notification/NotificationRequestDao.java index 96a86073d4..8e1408f4fc 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/notification/NotificationRequestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/notification/NotificationRequestDao.java @@ -50,7 +50,7 @@ public interface NotificationRequestDao extends Dao { boolean existsByTenantIdAndStatusAndTemplateId(TenantId tenantId, NotificationRequestStatus status, NotificationTemplateId templateId); - int removeAllByCreatedTimeBefore(long ts); + int removeByTenantIdAndCreatedTimeBeforeBatch(TenantId tenantId, long ts, int batchSize); NotificationRequestInfo findInfoById(TenantId tenantId, NotificationRequestId id); diff --git a/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java b/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java index f88b672a34..37fe2950b4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java @@ -24,12 +24,13 @@ import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.dao.Dao; public interface RpcDao extends Dao { + PageData findAllByDeviceId(TenantId tenantId, DeviceId deviceId, PageLink pageLink); PageData findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink); PageData findAllRpcByTenantId(TenantId tenantId, PageLink pageLink); - int deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime); + int deleteOutdatedRpcByTenantIdBatch(TenantId tenantId, Long expirationTime, int batchSize); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationRequestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationRequestDao.java index 9d32e91ca2..f4cc406a75 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationRequestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationRequestDao.java @@ -98,8 +98,8 @@ public class JpaNotificationRequestDao extends JpaAbstractDao implements RpcDao, @Transactional @Override - public int deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime) { - return rpcRepository.deleteOutdatedRpcByTenantId(tenantId.getId(), expirationTime); + public int deleteOutdatedRpcByTenantIdBatch(TenantId tenantId, Long expirationTime, int batchSize) { + return rpcRepository.deleteOutdatedRpcByTenantIdBatch(tenantId.getId(), expirationTime, batchSize); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java index 8a9489333f..3f76170c84 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java @@ -21,20 +21,27 @@ import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.dao.model.sql.RpcEntity; import java.util.UUID; public interface RpcRepository extends JpaRepository { + Page findAllByTenantIdAndDeviceId(UUID tenantId, UUID deviceId, Pageable pageable); Page findAllByTenantIdAndDeviceIdAndStatus(UUID tenantId, UUID deviceId, RpcStatus status, Pageable pageable); Page findAllByTenantId(UUID tenantId, Pageable pageable); + @Transactional @Modifying - @Query(value = "DELETE FROM rpc WHERE tenant_id = :tenantId AND created_time < :expirationTime", + @Query(value = "DELETE FROM rpc WHERE id IN " + + "(SELECT id FROM rpc WHERE tenant_id = :tenantId AND created_time < :expirationTime LIMIT :batchSize)", nativeQuery = true) - int deleteOutdatedRpcByTenantId(@Param("tenantId") UUID tenantId, @Param("expirationTime") Long expirationTime); + int deleteOutdatedRpcByTenantIdBatch(@Param("tenantId") UUID tenantId, + @Param("expirationTime") Long expirationTime, + @Param("batchSize") int batchSize); + } diff --git a/dao/src/test/java/org/thingsboard/server/dao/sql/notification/JpaNotificationRequestDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/sql/notification/JpaNotificationRequestDaoTest.java new file mode 100644 index 0000000000..5e2fe2308b --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/sql/notification/JpaNotificationRequestDaoTest.java @@ -0,0 +1,133 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql.notification; + +import org.junit.After; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.thingsboard.server.common.data.id.NotificationRequestId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.notification.NotificationRequest; +import org.thingsboard.server.common.data.notification.NotificationRequestStatus; +import org.thingsboard.server.dao.AbstractJpaDaoTest; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public class JpaNotificationRequestDaoTest extends AbstractJpaDaoTest { + + @Autowired + JpaNotificationRequestDao notificationRequestDao; + + private final List createdRequests = new ArrayList<>(); + + @After + public void tearDown() { + for (NotificationRequest request : createdRequests) { + notificationRequestDao.removeById(request.getTenantId(), request.getId().getId()); + } + createdRequests.clear(); + } + + @Test + public void testBatchDeletion() { + TenantId sysTenantId = TenantId.SYS_TENANT_ID; + long now = System.currentTimeMillis(); + long oldTimestamp = now - TimeUnit.DAYS.toMillis(30); + + NotificationRequest oldRequest1 = createNotificationRequest(sysTenantId, oldTimestamp); + notificationRequestDao.save(sysTenantId, oldRequest1); + + NotificationRequest oldRequest2 = createNotificationRequest(sysTenantId, oldTimestamp); + notificationRequestDao.save(sysTenantId, oldRequest2); + + NotificationRequest freshRequest = createNotificationRequest(sysTenantId, now); + notificationRequestDao.save(sysTenantId, freshRequest); + + TenantId tenant2Id = TenantId.fromUUID(UUID.fromString("3d193a7a-774b-4c05-84d5-f7fdcf7a37cf")); + NotificationRequest tenant2Request = createNotificationRequest(tenant2Id, oldTimestamp); + notificationRequestDao.save(tenant2Id, tenant2Request); + + int batchSize = 10_000; + + assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(sysTenantId, oldTimestamp - 1, batchSize)).isEqualTo(0); + + long expirationTime = now - TimeUnit.DAYS.toMillis(15); + assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(sysTenantId, expirationTime, batchSize)).isEqualTo(2); + + assertThat(notificationRequestDao.findById(sysTenantId, freshRequest.getId().getId())).isNotNull(); + assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenant2Id, now + 1, batchSize)).isEqualTo(1); + } + + @Test + public void testBatchDeletionWithSmallBatchSize() { + TenantId tenantId = TenantId.SYS_TENANT_ID; + long oldTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30); + + for (int i = 0; i < 10; i++) { + NotificationRequest request = createNotificationRequest(tenantId, oldTimestamp); + notificationRequestDao.save(tenantId, request); + } + + int batchSize = 3; + long expirationTime = System.currentTimeMillis(); + + assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenantId, expirationTime, batchSize)).isEqualTo(3); + assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenantId, expirationTime, batchSize)).isEqualTo(3); + assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenantId, expirationTime, batchSize)).isEqualTo(3); + assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenantId, expirationTime, batchSize)).isEqualTo(1); + assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenantId, expirationTime, batchSize)).isEqualTo(0); + } + + @Test + public void testBatchDeletionIsolationBetweenTenants() { + TenantId tenant1 = TenantId.SYS_TENANT_ID; + TenantId tenant2 = TenantId.fromUUID(UUID.fromString("3d193a7a-774b-4c05-84d5-f7fdcf7a37cf")); + long oldTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30); + + for (int i = 0; i < 5; i++) { + NotificationRequest request = createNotificationRequest(tenant1, oldTimestamp); + notificationRequestDao.save(tenant1, request); + } + + for (int i = 0; i < 3; i++) { + NotificationRequest request = createNotificationRequest(tenant2, oldTimestamp); + notificationRequestDao.save(tenant2, request); + } + + int batchSize = 10_000; + long expirationTime = System.currentTimeMillis(); + + assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenant1, expirationTime, batchSize)).isEqualTo(5); + assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenant2, expirationTime, batchSize)).isEqualTo(3); + } + + private NotificationRequest createNotificationRequest(TenantId tenantId, long createdTime) { + NotificationRequest request = new NotificationRequest(); + request.setId(new NotificationRequestId(UUID.randomUUID())); + request.setTenantId(tenantId); + request.setCreatedTime(createdTime); + request.setTargets(List.of(UUID.randomUUID())); + request.setStatus(NotificationRequestStatus.SENT); + createdRequests.add(request); + return request; + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDaoTest.java index 1629922685..921339a92b 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDaoTest.java @@ -51,9 +51,10 @@ public class JpaRpcDaoTest extends AbstractJpaDaoTest { rpc.setDeviceId(new DeviceId(UUID.randomUUID())); rpcDao.saveAndFlush(rpc.getTenantId(), rpc); - assertThat(rpcDao.deleteOutdatedRpcByTenantId(TenantId.SYS_TENANT_ID, 0L)).isEqualTo(0); - assertThat(rpcDao.deleteOutdatedRpcByTenantId(TenantId.SYS_TENANT_ID, Long.MAX_VALUE)).isEqualTo(2); - assertThat(rpcDao.deleteOutdatedRpcByTenantId(tenantId, System.currentTimeMillis() + 1)).isEqualTo(1); + int batchSize = 10_000; + assertThat(rpcDao.deleteOutdatedRpcByTenantIdBatch(TenantId.SYS_TENANT_ID, 0L, batchSize)).isEqualTo(0); + assertThat(rpcDao.deleteOutdatedRpcByTenantIdBatch(TenantId.SYS_TENANT_ID, Long.MAX_VALUE, batchSize)).isEqualTo(2); + assertThat(rpcDao.deleteOutdatedRpcByTenantIdBatch(tenantId, System.currentTimeMillis() + 1, batchSize)).isEqualTo(1); } }