diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpService.java new file mode 100644 index 0000000000..c0985eb4c1 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpService.java @@ -0,0 +1,83 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ttl.rpc; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.dao.rpc.RpcDao; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.dao.tenant.TenantDao; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +@TbCoreComponent +@Service +@Slf4j +@RequiredArgsConstructor +public class RpcCleanUpService { + @Value("${sql.ttl.rpc.enabled}") + private boolean ttlTaskExecutionEnabled; + + private final TenantDao tenantDao; + private final PartitionService partitionService; + private final TbTenantProfileCache tenantProfileCache; + private final RpcDao rpcDao; + + @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.rpc.checking_interval})}", fixedDelayString = "${sql.ttl.rpc.checking_interval}") + public void cleanUp() { + if (ttlTaskExecutionEnabled) { + PageLink tenantsBatchRequest = new PageLink(10_000, 0); + PageData tenantsIds; + do { + tenantsIds = tenantDao.findTenantsIds(tenantsBatchRequest); + for (TenantId tenantId : tenantsIds.getData()) { + if (!partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) { + continue; + } + + Optional tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration(); + if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getRpcTtlDays() == 0) { + continue; + } + + long ttl = TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getRpcTtlDays()); + long expirationTime = System.currentTimeMillis() - ttl; + + long totalRemoved = rpcDao.deleteOutdatedRpcByTenantId(tenantId, expirationTime); + + if (totalRemoved > 0) { + log.info("Removed {} outdated rpc(s) for tenant {} older than {}", totalRemoved, tenantId, new Date(expirationTime)); + } + } + + tenantsBatchRequest = tenantsBatchRequest.nextPageLink(); + } while (tenantsIds.hasNext()); + } + } + +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 54721ba2fa..7fd0d47184 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -276,6 +276,9 @@ sql: alarms: checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches + rpc: + enabled: "${SQL_TTL_RPC_ENABLED:true}" + checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours # Actor system parameters actors: diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java index 8cdccfe8bd..ce10f95055 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java @@ -56,6 +56,7 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura private int defaultStorageTtlDays; private int alarmsTtlDays; + private int rpcTtlDays; private double warnThreshold; diff --git a/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java b/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java index 753d1043f3..a3aa32720a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java @@ -16,6 +16,7 @@ package org.thingsboard.server.dao.rpc; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rpc.Rpc; @@ -24,4 +25,6 @@ import org.thingsboard.server.dao.Dao; public interface RpcDao extends Dao { PageData findAllByDeviceId(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink); + + Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDao.java index 1a538c9d9d..22fc3396f7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDao.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.data.repository.CrudRepository; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rpc.Rpc; @@ -53,4 +54,9 @@ public class JpaRpcDao extends JpaAbstractDao implements RpcDao public PageData findAllByDeviceId(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { return DaoUtil.toPageData(rpcRepository.findAllByDeviceIdAndStatus(deviceId.getId(), rpcStatus, DaoUtil.toPageable(pageLink))); } + + @Override + public Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime) { + return rpcRepository.deleteOutdatedRpcByTenantId(tenantId.getId(), expirationTime); + } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java index bc04aa7e71..565d87e627 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java @@ -17,7 +17,9 @@ package org.thingsboard.server.dao.sql.rpc; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; +import org.springframework.data.repository.query.Param; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.dao.model.sql.RpcEntity; @@ -25,4 +27,8 @@ import java.util.UUID; public interface RpcRepository extends CrudRepository { Page findAllByDeviceIdAndStatus(UUID deviceId, RpcStatus status, Pageable pageable); + + @Query(value = "WITH deleted AS (DELETE FROM rpc WHERE (tenant_id = :tenantId AND created_time < :expirationTime) IS TRUE RETURNING *) SELECT count(*) FROM deleted", + nativeQuery = true) + Long deleteOutdatedRpcByTenantId(@Param("tenantId") UUID tenantId, @Param("expirationTime") Long expirationTime); } diff --git a/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html b/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html index 4dd248a10c..66cb503bd3 100644 --- a/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html +++ b/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html @@ -196,6 +196,18 @@ {{ 'tenant-profile.alarms-ttl-days-days-range' | translate}} + + tenant-profile.rpc-ttl-days + + + {{ 'tenant-profile.rpc-ttl-days-required' | translate}} + + + {{ 'tenant-profile.rpc-ttl-days-days-range' | translate}} + + tenant-profile.max-rule-node-executions-per-message { this.updateModel(); diff --git a/ui-ngx/src/app/shared/models/tenant.model.ts b/ui-ngx/src/app/shared/models/tenant.model.ts index 4ce7acb5c5..5a3683f212 100644 --- a/ui-ngx/src/app/shared/models/tenant.model.ts +++ b/ui-ngx/src/app/shared/models/tenant.model.ts @@ -51,6 +51,8 @@ export interface DefaultTenantProfileConfiguration { maxSms: number; defaultStorageTtlDays: number; + alarmsTtlDays: number; + rpcTtlDays: number; } export type TenantProfileConfigurations = DefaultTenantProfileConfiguration; @@ -81,7 +83,9 @@ export function createTenantProfileConfiguration(type: TenantProfileType): Tenan maxRuleNodeExecutionsPerMessage: 0, maxEmails: 0, maxSms: 0, - defaultStorageTtlDays: 0 + defaultStorageTtlDays: 0, + alarmsTtlDays: 0, + rpcTtlDays: 0 }; configuration = {...defaultConfiguration, type: TenantProfileType.DEFAULT}; break; diff --git a/ui-ngx/src/assets/locale/locale.constant-en_US.json b/ui-ngx/src/assets/locale/locale.constant-en_US.json index abe4cba802..7eb244ff4d 100644 --- a/ui-ngx/src/assets/locale/locale.constant-en_US.json +++ b/ui-ngx/src/assets/locale/locale.constant-en_US.json @@ -2570,6 +2570,9 @@ "alarms-ttl-days": "Alarms TTL days (0 - unlimited)", "alarms-ttl-days-required": "Alarms TTL days required", "alarms-ttl-days-days-range": "Alarms TTL days can't be negative", + "rpc-ttl-days": "RPC TTL days (0 - unlimited)", + "rpc-ttl-days-required": "RPC TTL days required", + "rpc-ttl-days-days-range": "RPC TTL days can't be negative", "max-rule-node-executions-per-message": "Maximum number of rule node executions per message (0 - unlimited)", "max-rule-node-executions-per-message-required": "Maximum number of rule node executions per message is required.", "max-rule-node-executions-per-message-range": "Maximum number of rule node executions per message can't be negative",