Browse Source

Merge pull request #14762 from AndriiLandiak/clean-up-job-improvements

Fixed notification requests and RPC cleanup timeout on large datasets
pull/15195/merge
Viacheslav Klimov 3 months ago
committed by GitHub
parent
commit
debe877dbf
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 4
      application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java
  2. 75
      application/src/main/java/org/thingsboard/server/service/ttl/NotificationsCleanUpService.java
  3. 94
      application/src/main/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpService.java
  4. 4
      application/src/main/resources/thingsboard.yml
  5. 144
      application/src/test/java/org/thingsboard/server/service/ttl/NotificationsCleanUpServiceTest.java
  6. 136
      application/src/test/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpServiceTest.java
  7. 2
      dao/src/main/java/org/thingsboard/server/dao/notification/NotificationRequestDao.java
  8. 3
      dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java
  9. 4
      dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationRequestDao.java
  10. 8
      dao/src/main/java/org/thingsboard/server/dao/sql/notification/NotificationRequestRepository.java
  11. 4
      dao/src/main/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDao.java
  12. 11
      dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java
  13. 133
      dao/src/test/java/org/thingsboard/server/dao/sql/notification/JpaNotificationRequestDaoTest.java
  14. 7
      dao/src/test/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDaoTest.java

4
application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java

@ -32,4 +32,8 @@ public abstract class AbstractCleanUpService {
return partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition();
}
protected boolean isTenantPartitionMine(TenantId tenantId) {
return partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition();
}
}

75
application/src/main/java/org/thingsboard/server/service/ttl/NotificationsCleanUpService.java

@ -20,33 +20,41 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.NotificationRequestConfig;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.dao.notification.NotificationRequestDao;
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.queue.discovery.PartitionService;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.server.dao.model.ModelConstants.NOTIFICATION_TABLE_NAME;
@Slf4j
@Service
@ConditionalOnExpression("${sql.ttl.notifications.enabled:true} && ${sql.ttl.notifications.ttl:0} > 0")
@Slf4j
public class NotificationsCleanUpService extends AbstractCleanUpService {
private final SqlPartitioningRepository partitioningRepository;
private final NotificationRequestDao notificationRequestDao;
private final TenantService tenantService;
@Value("${sql.ttl.notifications.ttl:2592000}")
private long ttlInSec;
@Value("${sql.notifications.partition_size:168}")
private int partitionSizeInHours;
@Value("${sql.ttl.notifications.removal_batch_size:10000}")
private int removalBatchSize;
public NotificationsCleanUpService(PartitionService partitionService, SqlPartitioningRepository partitioningRepository,
NotificationRequestDao notificationRequestDao) {
NotificationRequestDao notificationRequestDao, TenantService tenantService) {
super(partitionService);
this.partitioningRepository = partitioningRepository;
this.notificationRequestDao = notificationRequestDao;
this.tenantService = tenantService;
}
@Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.notifications.checking_interval_ms:86400000})}",
@ -54,18 +62,65 @@ public class NotificationsCleanUpService extends AbstractCleanUpService {
public void cleanUp() {
long expTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec);
long partitionDurationMs = TimeUnit.HOURS.toMillis(partitionSizeInHours);
if (!isSystemTenantPartitionMine()) {
if (isSystemTenantPartitionMine()) {
partitioningRepository.dropPartitionsBefore(NOTIFICATION_TABLE_NAME, expTime, partitionDurationMs);
} else {
partitioningRepository.cleanupPartitionsCache(NOTIFICATION_TABLE_NAME, expTime, partitionDurationMs);
return;
}
long lastRemovedNotificationTs = partitioningRepository.dropPartitionsBefore(NOTIFICATION_TABLE_NAME, expTime, partitionDurationMs);
if (lastRemovedNotificationTs > 0) {
long gap = TimeUnit.MINUTES.toMillis(10);
long requestExpTime = lastRemovedNotificationTs - TimeUnit.SECONDS.toMillis(NotificationRequestConfig.MAX_SENDING_DELAY) - gap;
int removed = notificationRequestDao.removeAllByCreatedTimeBefore(requestExpTime);
log.info("Removed {} outdated notification requests older than {}", removed, requestExpTime);
long gap = TimeUnit.MINUTES.toMillis(10);
long requestExpTime = expTime - TimeUnit.SECONDS.toMillis(NotificationRequestConfig.MAX_SENDING_DELAY) - gap;
cleanUpNotificationRequests(requestExpTime);
}
private void cleanUpNotificationRequests(long expirationTime) {
log.info("Starting notification requests cleanup for records older than {}", Instant.ofEpochMilli(expirationTime));
int totalRemoved = 0;
int tenantsProcessed = 0;
// Clean up SYSADMIN's notification requests on the system node only
if (isSystemTenantPartitionMine()) {
try {
totalRemoved += cleanUpByTenant(TenantId.SYS_TENANT_ID, expirationTime);
} catch (Exception e) {
log.warn("Failed to clean up notification requests for sysadmin {}", TenantId.SYS_TENANT_ID, e);
}
}
// Each node cleans up notification requests for its own tenants
PageDataIterable<TenantId> tenants = new PageDataIterable<>(tenantService::findTenantsIds, 10_000);
for (TenantId tenantId : tenants) {
try {
if (!isTenantPartitionMine(tenantId)) {
continue;
}
int tenantRemoved = cleanUpByTenant(tenantId, expirationTime);
totalRemoved += tenantRemoved;
tenantsProcessed++;
if (tenantRemoved > 0) {
log.trace("Removed {} notification requests for tenant {}", tenantRemoved, tenantId);
}
} catch (Exception e) {
log.warn("Failed to clean up notification requests for tenant {}", tenantId, e);
}
}
log.info("Notification requests cleanup completed. Processed {} tenants, removed {} total records older than {}", tenantsProcessed, totalRemoved, Instant.ofEpochMilli(expirationTime));
}
private int cleanUpByTenant(TenantId tenantId, long expirationTime) {
int totalRemoved = 0;
int batchRemoved;
do {
batchRemoved = notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenantId, expirationTime, removalBatchSize);
totalRemoved += batchRemoved;
if (batchRemoved > 0) {
log.trace("Removed {} notification requests in batch for tenant {}", batchRemoved, tenantId);
}
} while (batchRemoved >= removalBatchSize);
return totalRemoved;
}
}

94
application/src/main/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpService.java

@ -15,69 +15,87 @@
*/
package org.thingsboard.server.service.ttl.rpc;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.dao.rpc.RpcDao;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.ttl.AbstractCleanUpService;
import java.util.Date;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@TbCoreComponent
@Service
@Slf4j
@RequiredArgsConstructor
public class RpcCleanUpService {
@Value("${sql.ttl.rpc.enabled}")
private boolean ttlTaskExecutionEnabled;
@Service
@TbCoreComponent
@ConditionalOnExpression("${sql.ttl.rpc.enabled:true}")
public class RpcCleanUpService extends AbstractCleanUpService {
@Value("${sql.ttl.rpc.removal_batch_size:10000}")
private int removalBatchSize;
private final RpcDao rpcDao;
private final TenantService tenantService;
private final PartitionService partitionService;
private final TbTenantProfileCache tenantProfileCache;
private final RpcDao rpcDao;
public RpcCleanUpService(TenantService tenantService, PartitionService partitionService, TbTenantProfileCache tenantProfileCache, RpcDao rpcDao) {
super(partitionService);
this.tenantService = tenantService;
this.tenantProfileCache = tenantProfileCache;
this.rpcDao = rpcDao;
}
@Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.rpc.checking_interval})}", fixedDelayString = "${sql.ttl.rpc.checking_interval}")
public void cleanUp() {
if (ttlTaskExecutionEnabled) {
PageLink tenantsBatchRequest = new PageLink(10_000, 0);
PageData<TenantId> tenantsIds;
do {
tenantsIds = tenantService.findTenantsIds(tenantsBatchRequest);
for (TenantId tenantId : tenantsIds.getData()) {
if (!partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) {
continue;
}
Optional<DefaultTenantProfileConfiguration> tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration();
if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getRpcTtlDays() == 0) {
continue;
}
long ttl = TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getRpcTtlDays());
long expirationTime = System.currentTimeMillis() - ttl;
int totalRemoved = rpcDao.deleteOutdatedRpcByTenantId(tenantId, expirationTime);
if (totalRemoved > 0) {
log.info("Removed {} outdated rpc(s) for tenant {} older than {}", totalRemoved, tenantId, new Date(expirationTime));
}
PageDataIterable<TenantId> tenants = new PageDataIterable<>(tenantService::findTenantsIds, 10_000);
for (TenantId tenantId : tenants) {
try {
if (!isTenantPartitionMine(tenantId)) {
continue;
}
tenantsBatchRequest = tenantsBatchRequest.nextPageLink();
} while (tenantsIds.hasNext());
Optional<DefaultTenantProfileConfiguration> tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration();
if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getRpcTtlDays() == 0) {
continue;
}
long ttl = TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getRpcTtlDays());
long expirationTime = System.currentTimeMillis() - ttl;
int totalRemoved = cleanUpByTenant(tenantId, expirationTime);
if (totalRemoved > 0) {
log.info("Removed {} outdated rpc(s) for tenant {} older than {}", totalRemoved, tenantId, Instant.ofEpochMilli(expirationTime));
}
} catch (Exception e) {
log.warn("Failed to clean up rpc by ttl for tenant {}", tenantId, e);
}
}
}
private int cleanUpByTenant(TenantId tenantId, long expirationTime) {
int totalRemoved = 0;
int batchRemoved;
do {
batchRemoved = rpcDao.deleteOutdatedRpcByTenantIdBatch(tenantId, expirationTime, removalBatchSize);
totalRemoved += batchRemoved;
if (batchRemoved > 0) {
log.trace("Removed {} rpc in batch for tenant {}", batchRemoved, tenantId);
}
} while (batchRemoved >= removalBatchSize);
return totalRemoved;
}
}

4
application/src/main/resources/thingsboard.yml

@ -421,10 +421,11 @@ sql:
edge_events_ttl: "${SQL_TTL_EDGE_EVENTS_TTL:2628000}" # Number of seconds. The current value corresponds to one month
alarms:
checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches
removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # Batch size for records removal
rpc:
enabled: "${SQL_TTL_RPC_ENABLED:true}" # Enable/disable TTL (Time To Live) for rpc call records
checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
removal_batch_size: "${SQL_RPC_TTL_REMOVAL_BATCH_SIZE:10000}" # Batch size for records removal
audit_logs:
enabled: "${SQL_TTL_AUDIT_LOGS_ENABLED:true}" # Enable/disable TTL (Time To Live) for audit log records
ttl: "${SQL_TTL_AUDIT_LOGS_SECS:0}" # Disabled by default. The accuracy of the cleanup depends on the sql.audit_logs.partition_size
@ -433,6 +434,7 @@ sql:
enabled: "${SQL_TTL_NOTIFICATIONS_ENABLED:true}" # Enable/disable TTL (Time To Live) for notification center records
ttl: "${SQL_TTL_NOTIFICATIONS_SECS:2592000}" # Default value - 30 days
checking_interval_ms: "${SQL_TTL_NOTIFICATIONS_CHECKING_INTERVAL_MS:86400000}" # Default value - 1 day
removal_batch_size: "${SQL_TTL_NOTIFICATIONS_REMOVAL_BATCH_SIZE:10000}" # Batch size for records removal
relations:
max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # This value has to be reasonably small to prevent infinite recursion as early as possible
pool_size: "${SQL_RELATIONS_POOL_SIZE:4}" # This value has to be reasonably small to prevent the relation query from blocking all other DB calls

144
application/src/test/java/org/thingsboard/server/service/ttl/NotificationsCleanUpServiceTest.java

@ -0,0 +1,144 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ttl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.notification.NotificationRequestDao;
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.queue.discovery.PartitionService;
import java.util.List;
import java.util.UUID;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class NotificationsCleanUpServiceTest {
@Mock
private PartitionService partitionService;
@Mock
private SqlPartitioningRepository partitioningRepository;
@Mock
private NotificationRequestDao notificationRequestDao;
@Mock
private TenantService tenantService;
private NotificationsCleanUpService cleanUpService;
private static final int BATCH_SIZE = 3;
@BeforeEach
public void setUp() {
cleanUpService = new NotificationsCleanUpService(partitionService, partitioningRepository, notificationRequestDao, tenantService);
ReflectionTestUtils.setField(cleanUpService, "ttlInSec", 2592000L);
ReflectionTestUtils.setField(cleanUpService, "partitionSizeInHours", 168);
ReflectionTestUtils.setField(cleanUpService, "removalBatchSize", BATCH_SIZE);
}
@Test
public void testBatchLoopCallsDaoMultipleTimes() {
TopicPartitionInfo myPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(true).build();
when(partitionService.resolve(any(), any(), any())).thenReturn(myPartition);
when(partitioningRepository.dropPartitionsBefore(anyString(), anyLong(), anyLong()))
.thenReturn(System.currentTimeMillis());
TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
when(tenantService.findTenantsIds(any()))
.thenReturn(new PageData<>(List.of(tenantId), 1, 1, false));
// Sysadmin: returns 3 (full batch), then 1 (partial) -> 2 calls
when(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(eq(TenantId.SYS_TENANT_ID), anyLong(), eq(BATCH_SIZE)))
.thenReturn(BATCH_SIZE)
.thenReturn(1);
// Tenant: returns 3, 3, 0 -> 3 calls
when(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(eq(tenantId), anyLong(), eq(BATCH_SIZE)))
.thenReturn(BATCH_SIZE)
.thenReturn(BATCH_SIZE)
.thenReturn(0);
cleanUpService.cleanUp();
verify(notificationRequestDao, times(2))
.removeByTenantIdAndCreatedTimeBeforeBatch(eq(TenantId.SYS_TENANT_ID), anyLong(), eq(BATCH_SIZE));
verify(notificationRequestDao, times(3))
.removeByTenantIdAndCreatedTimeBeforeBatch(eq(tenantId), anyLong(), eq(BATCH_SIZE));
}
@Test
public void testSkipsTenantNotOnMyPartition() {
TopicPartitionInfo myPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(true).build();
TopicPartitionInfo notMyPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(false).build();
when(partitionService.resolve(any(), eq(TenantId.SYS_TENANT_ID), eq(TenantId.SYS_TENANT_ID)))
.thenReturn(myPartition);
when(partitioningRepository.dropPartitionsBefore(anyString(), anyLong(), anyLong()))
.thenReturn(System.currentTimeMillis());
// Sysadmin: no records
when(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(eq(TenantId.SYS_TENANT_ID), anyLong(), eq(BATCH_SIZE)))
.thenReturn(0);
TenantId myTenant = TenantId.fromUUID(UUID.randomUUID());
TenantId otherTenant = TenantId.fromUUID(UUID.randomUUID());
when(tenantService.findTenantsIds(any()))
.thenReturn(new PageData<>(List.of(myTenant, otherTenant), 2, 1, false));
when(partitionService.resolve(any(), eq(myTenant), eq(myTenant))).thenReturn(myPartition);
when(partitionService.resolve(any(), eq(otherTenant), eq(otherTenant))).thenReturn(notMyPartition);
when(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(eq(myTenant), anyLong(), eq(BATCH_SIZE)))
.thenReturn(0);
cleanUpService.cleanUp();
verify(notificationRequestDao).removeByTenantIdAndCreatedTimeBeforeBatch(eq(myTenant), anyLong(), eq(BATCH_SIZE));
verify(notificationRequestDao, never()).removeByTenantIdAndCreatedTimeBeforeBatch(eq(otherTenant), anyLong(), anyInt());
}
@Test
public void testNoPartitionsDropped_stillCleansUpRequests() {
TopicPartitionInfo myPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(true).build();
when(partitionService.resolve(any(), any(), any())).thenReturn(myPartition);
when(partitioningRepository.dropPartitionsBefore(anyString(), anyLong(), anyLong()))
.thenReturn(0L);
when(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(eq(TenantId.SYS_TENANT_ID), anyLong(), eq(BATCH_SIZE)))
.thenReturn(0);
when(tenantService.findTenantsIds(any()))
.thenReturn(new PageData<>(List.of(), 0, 0, false));
cleanUpService.cleanUp();
verify(notificationRequestDao).removeByTenantIdAndCreatedTimeBeforeBatch(eq(TenantId.SYS_TENANT_ID), anyLong(), eq(BATCH_SIZE));
}
}

136
application/src/test/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpServiceTest.java

@ -0,0 +1,136 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ttl.rpc;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.rpc.RpcDao;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.queue.discovery.PartitionService;
import java.util.List;
import java.util.UUID;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class RpcCleanUpServiceTest {
@Mock
private PartitionService partitionService;
@Mock
private RpcDao rpcDao;
@Mock
private TenantService tenantService;
@Mock
private TbTenantProfileCache tenantProfileCache;
private RpcCleanUpService cleanUpService;
private static final int BATCH_SIZE = 3;
@BeforeEach
public void setUp() {
cleanUpService = new RpcCleanUpService(tenantService, partitionService, tenantProfileCache, rpcDao);
ReflectionTestUtils.setField(cleanUpService, "removalBatchSize", BATCH_SIZE);
}
@Test
public void testBatchLoopCallsDaoMultipleTimes() {
TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
setupTenant(tenantId, 7);
// Returns 3 (full batch), 3 (full batch), 1 (partial) -> 3 calls
when(rpcDao.deleteOutdatedRpcByTenantIdBatch(eq(tenantId), anyLong(), eq(BATCH_SIZE)))
.thenReturn(BATCH_SIZE)
.thenReturn(BATCH_SIZE)
.thenReturn(1);
cleanUpService.cleanUp();
verify(rpcDao, times(3)).deleteOutdatedRpcByTenantIdBatch(eq(tenantId), anyLong(), eq(BATCH_SIZE));
}
@Test
public void testSkipsTenantNotOnMyPartition() {
TenantId myTenant = TenantId.fromUUID(UUID.randomUUID());
TenantId otherTenant = TenantId.fromUUID(UUID.randomUUID());
TopicPartitionInfo myPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(true).build();
TopicPartitionInfo notMyPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(false).build();
when(tenantService.findTenantsIds(any()))
.thenReturn(new PageData<>(List.of(myTenant, otherTenant), 2, 1, false));
when(partitionService.resolve(any(), eq(myTenant), eq(myTenant))).thenReturn(myPartition);
when(partitionService.resolve(any(), eq(otherTenant), eq(otherTenant))).thenReturn(notMyPartition);
setupTenantProfile(myTenant, 7);
when(rpcDao.deleteOutdatedRpcByTenantIdBatch(eq(myTenant), anyLong(), eq(BATCH_SIZE)))
.thenReturn(0);
cleanUpService.cleanUp();
verify(rpcDao).deleteOutdatedRpcByTenantIdBatch(eq(myTenant), anyLong(), eq(BATCH_SIZE));
verify(rpcDao, never()).deleteOutdatedRpcByTenantIdBatch(eq(otherTenant), anyLong(), anyInt());
}
@Test
public void testSkipsTenantWithZeroTtl() {
TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
setupTenant(tenantId, 0);
cleanUpService.cleanUp();
verify(rpcDao, never()).deleteOutdatedRpcByTenantIdBatch(any(), anyLong(), anyInt());
}
private void setupTenant(TenantId tenantId, int rpcTtlDays) {
TopicPartitionInfo myPartition = TopicPartitionInfo.builder().topic("tb_core").myPartition(true).build();
when(partitionService.resolve(any(), eq(tenantId), eq(tenantId))).thenReturn(myPartition);
when(tenantService.findTenantsIds(any()))
.thenReturn(new PageData<>(List.of(tenantId), 1, 1, false));
setupTenantProfile(tenantId, rpcTtlDays);
}
private void setupTenantProfile(TenantId tenantId, int rpcTtlDays) {
TenantProfile profile = new TenantProfile();
TenantProfileData profileData = new TenantProfileData();
DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration();
config.setRpcTtlDays(rpcTtlDays);
profileData.setConfiguration(config);
profile.setProfileData(profileData);
when(tenantProfileCache.get(tenantId)).thenReturn(profile);
}
}

2
dao/src/main/java/org/thingsboard/server/dao/notification/NotificationRequestDao.java

@ -50,7 +50,7 @@ public interface NotificationRequestDao extends Dao<NotificationRequest> {
boolean existsByTenantIdAndStatusAndTemplateId(TenantId tenantId, NotificationRequestStatus status, NotificationTemplateId templateId);
int removeAllByCreatedTimeBefore(long ts);
int removeByTenantIdAndCreatedTimeBeforeBatch(TenantId tenantId, long ts, int batchSize);
NotificationRequestInfo findInfoById(TenantId tenantId, NotificationRequestId id);

3
dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java

@ -24,12 +24,13 @@ import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.dao.Dao;
public interface RpcDao extends Dao<Rpc> {
PageData<Rpc> findAllByDeviceId(TenantId tenantId, DeviceId deviceId, PageLink pageLink);
PageData<Rpc> findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink);
PageData<Rpc> findAllRpcByTenantId(TenantId tenantId, PageLink pageLink);
int deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime);
int deleteOutdatedRpcByTenantIdBatch(TenantId tenantId, Long expirationTime, int batchSize);
}

4
dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationRequestDao.java

@ -98,8 +98,8 @@ public class JpaNotificationRequestDao extends JpaAbstractDao<NotificationReques
}
@Override
public int removeAllByCreatedTimeBefore(long ts) {
return notificationRequestRepository.deleteAllByCreatedTimeBefore(ts);
public int removeByTenantIdAndCreatedTimeBeforeBatch(TenantId tenantId, long ts, int batchSize) {
return notificationRequestRepository.deleteByTenantIdAndCreatedTimeBeforeBatch(tenantId.getId(), ts, batchSize);
}
@Override

8
dao/src/main/java/org/thingsboard/server/dao/sql/notification/NotificationRequestRepository.java

@ -71,8 +71,12 @@ public interface NotificationRequestRepository extends JpaRepository<Notificatio
@Transactional
@Modifying
@Query("DELETE FROM NotificationRequestEntity r WHERE r.createdTime < :ts")
int deleteAllByCreatedTimeBefore(@Param("ts") long ts);
@Query(value = "DELETE FROM notification_request WHERE id IN " +
"(SELECT id FROM notification_request WHERE tenant_id = :tenantId AND created_time < :ts LIMIT :batchSize)",
nativeQuery = true)
int deleteByTenantIdAndCreatedTimeBeforeBatch(@Param("tenantId") UUID tenantId,
@Param("ts") long ts,
@Param("batchSize") int batchSize);
@Transactional
@Modifying

4
dao/src/main/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDao.java

@ -71,8 +71,8 @@ public class JpaRpcDao extends JpaAbstractDao<RpcEntity, Rpc> implements RpcDao,
@Transactional
@Override
public int deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime) {
return rpcRepository.deleteOutdatedRpcByTenantId(tenantId.getId(), expirationTime);
public int deleteOutdatedRpcByTenantIdBatch(TenantId tenantId, Long expirationTime, int batchSize) {
return rpcRepository.deleteOutdatedRpcByTenantIdBatch(tenantId.getId(), expirationTime, batchSize);
}
@Override

11
dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java

@ -21,20 +21,27 @@ import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.dao.model.sql.RpcEntity;
import java.util.UUID;
public interface RpcRepository extends JpaRepository<RpcEntity, UUID> {
Page<RpcEntity> findAllByTenantIdAndDeviceId(UUID tenantId, UUID deviceId, Pageable pageable);
Page<RpcEntity> findAllByTenantIdAndDeviceIdAndStatus(UUID tenantId, UUID deviceId, RpcStatus status, Pageable pageable);
Page<RpcEntity> findAllByTenantId(UUID tenantId, Pageable pageable);
@Transactional
@Modifying
@Query(value = "DELETE FROM rpc WHERE tenant_id = :tenantId AND created_time < :expirationTime",
@Query(value = "DELETE FROM rpc WHERE id IN " +
"(SELECT id FROM rpc WHERE tenant_id = :tenantId AND created_time < :expirationTime LIMIT :batchSize)",
nativeQuery = true)
int deleteOutdatedRpcByTenantId(@Param("tenantId") UUID tenantId, @Param("expirationTime") Long expirationTime);
int deleteOutdatedRpcByTenantIdBatch(@Param("tenantId") UUID tenantId,
@Param("expirationTime") Long expirationTime,
@Param("batchSize") int batchSize);
}

133
dao/src/test/java/org/thingsboard/server/dao/sql/notification/JpaNotificationRequestDaoTest.java

@ -0,0 +1,133 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql.notification;
import org.junit.After;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.id.NotificationRequestId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.NotificationRequest;
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
import org.thingsboard.server.dao.AbstractJpaDaoTest;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
public class JpaNotificationRequestDaoTest extends AbstractJpaDaoTest {
@Autowired
JpaNotificationRequestDao notificationRequestDao;
private final List<NotificationRequest> createdRequests = new ArrayList<>();
@After
public void tearDown() {
for (NotificationRequest request : createdRequests) {
notificationRequestDao.removeById(request.getTenantId(), request.getId().getId());
}
createdRequests.clear();
}
@Test
public void testBatchDeletion() {
TenantId sysTenantId = TenantId.SYS_TENANT_ID;
long now = System.currentTimeMillis();
long oldTimestamp = now - TimeUnit.DAYS.toMillis(30);
NotificationRequest oldRequest1 = createNotificationRequest(sysTenantId, oldTimestamp);
notificationRequestDao.save(sysTenantId, oldRequest1);
NotificationRequest oldRequest2 = createNotificationRequest(sysTenantId, oldTimestamp);
notificationRequestDao.save(sysTenantId, oldRequest2);
NotificationRequest freshRequest = createNotificationRequest(sysTenantId, now);
notificationRequestDao.save(sysTenantId, freshRequest);
TenantId tenant2Id = TenantId.fromUUID(UUID.fromString("3d193a7a-774b-4c05-84d5-f7fdcf7a37cf"));
NotificationRequest tenant2Request = createNotificationRequest(tenant2Id, oldTimestamp);
notificationRequestDao.save(tenant2Id, tenant2Request);
int batchSize = 10_000;
assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(sysTenantId, oldTimestamp - 1, batchSize)).isEqualTo(0);
long expirationTime = now - TimeUnit.DAYS.toMillis(15);
assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(sysTenantId, expirationTime, batchSize)).isEqualTo(2);
assertThat(notificationRequestDao.findById(sysTenantId, freshRequest.getId().getId())).isNotNull();
assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenant2Id, now + 1, batchSize)).isEqualTo(1);
}
@Test
public void testBatchDeletionWithSmallBatchSize() {
TenantId tenantId = TenantId.SYS_TENANT_ID;
long oldTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30);
for (int i = 0; i < 10; i++) {
NotificationRequest request = createNotificationRequest(tenantId, oldTimestamp);
notificationRequestDao.save(tenantId, request);
}
int batchSize = 3;
long expirationTime = System.currentTimeMillis();
assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenantId, expirationTime, batchSize)).isEqualTo(3);
assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenantId, expirationTime, batchSize)).isEqualTo(3);
assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenantId, expirationTime, batchSize)).isEqualTo(3);
assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenantId, expirationTime, batchSize)).isEqualTo(1);
assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenantId, expirationTime, batchSize)).isEqualTo(0);
}
@Test
public void testBatchDeletionIsolationBetweenTenants() {
TenantId tenant1 = TenantId.SYS_TENANT_ID;
TenantId tenant2 = TenantId.fromUUID(UUID.fromString("3d193a7a-774b-4c05-84d5-f7fdcf7a37cf"));
long oldTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30);
for (int i = 0; i < 5; i++) {
NotificationRequest request = createNotificationRequest(tenant1, oldTimestamp);
notificationRequestDao.save(tenant1, request);
}
for (int i = 0; i < 3; i++) {
NotificationRequest request = createNotificationRequest(tenant2, oldTimestamp);
notificationRequestDao.save(tenant2, request);
}
int batchSize = 10_000;
long expirationTime = System.currentTimeMillis();
assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenant1, expirationTime, batchSize)).isEqualTo(5);
assertThat(notificationRequestDao.removeByTenantIdAndCreatedTimeBeforeBatch(tenant2, expirationTime, batchSize)).isEqualTo(3);
}
private NotificationRequest createNotificationRequest(TenantId tenantId, long createdTime) {
NotificationRequest request = new NotificationRequest();
request.setId(new NotificationRequestId(UUID.randomUUID()));
request.setTenantId(tenantId);
request.setCreatedTime(createdTime);
request.setTargets(List.of(UUID.randomUUID()));
request.setStatus(NotificationRequestStatus.SENT);
createdRequests.add(request);
return request;
}
}

7
dao/src/test/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDaoTest.java

@ -51,9 +51,10 @@ public class JpaRpcDaoTest extends AbstractJpaDaoTest {
rpc.setDeviceId(new DeviceId(UUID.randomUUID()));
rpcDao.saveAndFlush(rpc.getTenantId(), rpc);
assertThat(rpcDao.deleteOutdatedRpcByTenantId(TenantId.SYS_TENANT_ID, 0L)).isEqualTo(0);
assertThat(rpcDao.deleteOutdatedRpcByTenantId(TenantId.SYS_TENANT_ID, Long.MAX_VALUE)).isEqualTo(2);
assertThat(rpcDao.deleteOutdatedRpcByTenantId(tenantId, System.currentTimeMillis() + 1)).isEqualTo(1);
int batchSize = 10_000;
assertThat(rpcDao.deleteOutdatedRpcByTenantIdBatch(TenantId.SYS_TENANT_ID, 0L, batchSize)).isEqualTo(0);
assertThat(rpcDao.deleteOutdatedRpcByTenantIdBatch(TenantId.SYS_TENANT_ID, Long.MAX_VALUE, batchSize)).isEqualTo(2);
assertThat(rpcDao.deleteOutdatedRpcByTenantIdBatch(tenantId, System.currentTimeMillis() + 1, batchSize)).isEqualTo(1);
}
}

Loading…
Cancel
Save