From 3af07efd7f04502986e7e949ecdb2b7d6a822cdc Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Wed, 27 Jul 2022 15:30:08 +0300 Subject: [PATCH] Support for bulk delete of events --- .../src/main/resources/thingsboard.yml | 8 +- .../data/event/StatisticsEventFilter.java | 14 +- .../server/dao/event/BaseEventService.java | 18 +-- .../server/dao/event/EventDao.java | 23 +++ .../dao/sql/event/ErrorEventRepository.java | 34 +++++ .../event/EventPartitionConfiguration.java | 45 ++++++ .../server/dao/sql/event/EventRepository.java | 129 +---------------- .../server/dao/sql/event/JpaBaseEventDao.java | 132 ++++++++++++++++-- .../sql/event/LifecycleEventRepository.java | 37 +++++ .../event/RuleChainDebugEventRepository.java | 59 ++++++-- .../event/RuleNodeDebugEventRepository.java | 49 +++++++ .../sql/event/SqlEventCleanupRepository.java | 9 +- .../sql/event/StatisticsEventRepository.java | 57 +++++++- 13 files changed, 435 insertions(+), 179 deletions(-) create mode 100644 dao/src/main/java/org/thingsboard/server/dao/sql/event/EventPartitionConfiguration.java diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 5015cc2175..1c50fb9ced 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -259,6 +259,8 @@ sql: batch_max_delay: "${SQL_EVENTS_BATCH_MAX_DELAY_MS:100}" stats_print_interval_ms: "${SQL_EVENTS_BATCH_STATS_PRINT_MS:10000}" batch_threads: "${SQL_EVENTS_BATCH_THREADS:3}" # batch thread count have to be a prime number like 3 or 5 to gain perfect hash distribution + partition_size: "${SQL_EVENTS_REGULAR_PARTITION_SIZE_HOURS:168}" # Number of hours to partition the events. The current value corresponds to one week. + debug_partition_size: "${SQL_EVENTS_REGULAR_PARTITION_SIZE_HOURS:1}" # Number of hours to partition the debug events. The current value corresponds to one hour. edge_events: batch_size: "${SQL_EDGE_EVENTS_BATCH_SIZE:1000}" batch_max_delay: "${SQL_EDGE_EVENTS_BATCH_MAX_DELAY_MS:100}" @@ -285,8 +287,10 @@ sql: events: enabled: "${SQL_TTL_EVENTS_ENABLED:true}" execution_interval_ms: "${SQL_TTL_EVENTS_EXECUTION_INTERVAL:3600000}" # Number of milliseconds (max random initial delay and fixed period). - events_ttl: "${SQL_TTL_EVENTS_EVENTS_TTL:0}" # Number of seconds - debug_events_ttl: "${SQL_TTL_EVENTS_DEBUG_EVENTS_TTL:604800}" # Number of seconds. The current value corresponds to one week + # Number of seconds. TTL is disabled by default. Accuracy of the cleanup depends on the sql.events.partition_size parameter. + events_ttl: "${SQL_TTL_EVENTS_EVENTS_TTL:0}" + # Number of seconds. The current value corresponds to one week. Accuracy of the cleanup depends on the sql.events.debug_partition_size parameter. + debug_events_ttl: "${SQL_TTL_EVENTS_DEBUG_EVENTS_TTL:604800}" edge_events: enabled: "${SQL_TTL_EDGE_EVENTS_ENABLED:true}" execution_interval_ms: "${SQL_TTL_EDGE_EVENTS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/event/StatisticsEventFilter.java b/common/data/src/main/java/org/thingsboard/server/common/data/event/StatisticsEventFilter.java index a48d03dcce..598d750c0e 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/event/StatisticsEventFilter.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/event/StatisticsEventFilter.java @@ -27,9 +27,13 @@ public class StatisticsEventFilter implements EventFilter { @ApiModelProperty(position = 1, value = "String value representing the server name, identifier or ip address where the platform is running", example = "ip-172-31-24-152") protected String server; @ApiModelProperty(position = 2, value = "The minimum number of successfully processed messages", example = "25") - protected Integer messagesProcessed; - @ApiModelProperty(position = 3, value = "The minimum number of errors occurred during messages processing", example = "30") - protected Integer errorsOccurred; + protected Integer minMessagesProcessed; + @ApiModelProperty(position = 3, value = "The maximum number of successfully processed messages", example = "250") + protected Integer maxMessagesProcessed; + @ApiModelProperty(position = 4, value = "The minimum number of errors occurred during messages processing", example = "30") + protected Integer minErrorsOccurred; + @ApiModelProperty(position = 5, value = "The maximum number of errors occurred during messages processing", example = "300") + protected Integer maxErrorsOccurred; @Override public EventType getEventType() { @@ -38,6 +42,8 @@ public class StatisticsEventFilter implements EventFilter { @Override public boolean isNotEmpty() { - return !StringUtils.isEmpty(server) || (messagesProcessed != null && messagesProcessed > 0) || (errorsOccurred != null && errorsOccurred > 0); + return !StringUtils.isEmpty(server) + || (minMessagesProcessed != null && minMessagesProcessed > 0) || (minErrorsOccurred != null && minErrorsOccurred > 0) + || (maxMessagesProcessed != null && maxMessagesProcessed > 0) || (maxErrorsOccurred != null && maxErrorsOccurred > 0); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java index 302e3f0fe8..fad7f6b310 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java @@ -117,19 +117,11 @@ public class BaseEventService implements EventService { @Override public void removeEvents(TenantId tenantId, EntityId entityId, EventFilter eventFilter, Long startTime, Long endTime) { -// TimePageLink eventsPageLink = new TimePageLink(1000, 0, null, null, startTime, endTime); -// PageData eventsPageData; -// do { -// if (eventFilter == null) { -// eventsPageData = findEvents(tenantId, entityId, eventsPageLink); -// } else { -// eventsPageData = findEventsByFilter(tenantId, entityId, eventFilter, eventsPageLink); -// } -// -// eventDao.removeAllByIds(eventsPageData.getData().stream() -// .map(IdBased::getUuidId) -// .collect(Collectors.toList())); -// } while (eventsPageData.hasNext()); + if (eventFilter == null) { + eventDao.removeEvents(tenantId.getId(), entityId.getId(), startTime, endTime); + } else { + eventDao.removeEvents(tenantId.getId(), entityId.getId(), eventFilter, startTime, endTime); + } } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java b/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java index b7ca1282c7..e09ea59872 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java @@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.event.Event; import org.thingsboard.server.common.data.event.EventFilter; import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.dao.Dao; @@ -71,4 +72,26 @@ public interface EventDao { * @param debugEventExpTs the expiration time of the debug events */ void cleanupEvents(long regularEventExpTs, long debugEventExpTs); + + /** + * Removes all events for the specified entity and time interval + * + * @param tenantId + * @param entityId + * @param startTime + * @param endTime + */ + void removeEvents(UUID tenantId, UUID entityId, Long startTime, Long endTime); + + /** + * + * Removes all events for the specified entity, event filter and time interval + * + * @param tenantId + * @param entityId + * @param eventFilter + * @param startTime + * @param endTime + */ + void removeEvents(UUID tenantId, UUID entityId, EventFilter eventFilter, Long startTime, Long endTime); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/ErrorEventRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/ErrorEventRepository.java index b4fcfcdcc7..c00d4c0f7d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/ErrorEventRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/ErrorEventRepository.java @@ -18,8 +18,10 @@ package org.thingsboard.server.dao.sql.event; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.event.ErrorEvent; import org.thingsboard.server.common.data.event.LifecycleEvent; import org.thingsboard.server.dao.model.sql.ErrorEventEntity; @@ -77,4 +79,36 @@ public interface ErrorEventRepository extends EventRepository= :startTime) " + + "AND (:endTime IS NULL OR e.ts <= :endTime)" + ) + void removeEvents(@Param("tenantId") UUID tenantId, + @Param("entityId") UUID entityId, + @Param("startTime") Long startTime, + @Param("endTime") Long endTime); + + @Transactional + @Modifying + @Query(nativeQuery = true, + value = "DELETE FROM error_event e WHERE " + + "e.tenant_id = :tenantId " + + "AND e.entity_id = :entityId " + + "AND (:startTime IS NULL OR e.ts >= :startTime) " + + "AND (:endTime IS NULL OR e.ts <= :endTime) " + + "AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " + + "AND (:method IS NULL OR e.e_method ILIKE concat('%', :method, '%')) " + + "AND (:error IS NULL OR e.e_error ILIKE concat('%', :error, '%'))" + ) + void removeEvents(@Param("tenantId") UUID tenantId, + @Param("entityId") UUID entityId, + @Param("startTime") Long startTime, + @Param("endTime") Long endTime, + @Param("serviceId") String server, + @Param("method") String method, + @Param("error") String error); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventPartitionConfiguration.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventPartitionConfiguration.java new file mode 100644 index 0000000000..c89fd6462f --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventPartitionConfiguration.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql.event; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.event.EventType; + +import javax.annotation.PostConstruct; +import java.util.concurrent.TimeUnit; + +@Component +public class EventPartitionConfiguration { + + @Value("${sql.events.partition_size:168}") + private int regularPartitionSizeInHours; + @Value("${sql.events.debug_partition_size:1}") + private int debugPartitionSizeInHours; + + private long regularPartitionSizeInMs; + private long debugPartitionSizeInMs; + + @PostConstruct + public void init() { + regularPartitionSizeInMs = TimeUnit.HOURS.toMillis(regularPartitionSizeInHours); + debugPartitionSizeInMs = TimeUnit.HOURS.toMillis(debugPartitionSizeInHours); + } + + public long getPartitionSizeInMs(EventType eventType) { + return eventType.isDebug() ? debugPartitionSizeInMs : regularPartitionSizeInMs; + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventRepository.java index 46e0ff6662..19865cdc8b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventRepository.java @@ -28,137 +28,12 @@ import org.thingsboard.server.dao.model.sql.EventEntity; import java.util.List; import java.util.UUID; -/** - * Created by Valerii Sosliuk on 5/3/2017. - */ public interface EventRepository, V extends Event> { List findLatestEvents(UUID tenantId, UUID entityId, int limit); Page findEvents(UUID tenantId, UUID entityId, Long startTime, Long endTime, Pageable pageable); -// -// @Query(nativeQuery = true, -// value = "SELECT e.id, e.created_time, e.body, e.entity_id, e.entity_type, e.event_type, e.event_uid, e.tenant_id, ts FROM " + -// "(SELECT *, e.body\\:\\:jsonb as json_body FROM event e WHERE " + -// "e.tenant_id = :tenantId " + -// "AND e.entity_type = :entityType " + -// "AND e.entity_id = :entityId " + -// "AND e.event_type = :eventType " + -// "AND e.created_time >= :startTime AND (:endTime = 0 OR e.created_time <= :endTime) " + -// ") AS e WHERE " + -// "(:type IS NULL OR lower(json_body->>'type') LIKE concat('%', lower(:type\\:\\:varchar), '%')) " + -// "AND (:server IS NULL OR lower(json_body->>'server') LIKE concat('%', lower(:server\\:\\:varchar), '%')) " + -// "AND (:entityName IS NULL OR lower(json_body->>'entityName') LIKE concat('%', lower(:entityName\\:\\:varchar), '%')) " + -// "AND (:relationType IS NULL OR lower(json_body->>'relationType') LIKE concat('%', lower(:relationType\\:\\:varchar), '%')) " + -// "AND (:bodyEntityId IS NULL OR lower(json_body->>'entityId') LIKE concat('%', lower(:bodyEntityId\\:\\:varchar), '%')) " + -// "AND (:msgType IS NULL OR lower(json_body->>'msgType') LIKE concat('%', lower(:msgType\\:\\:varchar), '%')) " + -// "AND ((:isError = FALSE) OR (json_body->>'error') IS NOT NULL) " + -// "AND (:error IS NULL OR lower(json_body->>'error') LIKE concat('%', lower(:error\\:\\:varchar), '%')) " + -// "AND (:data IS NULL OR lower(json_body->>'data') LIKE concat('%', lower(:data\\:\\:varchar), '%')) " + -// "AND (:metadata IS NULL OR lower(json_body->>'metadata') LIKE concat('%', lower(:metadata\\:\\:varchar), '%')) ", -// countQuery = "SELECT count(*) FROM " + -// "(SELECT *, e.body\\:\\:jsonb as json_body FROM event e WHERE " + -// "e.tenant_id = :tenantId " + -// "AND e.entity_type = :entityType " + -// "AND e.entity_id = :entityId " + -// "AND e.event_type = :eventType " + -// "AND e.created_time >= :startTime AND (:endTime = 0 OR e.created_time <= :endTime) " + -// ") AS e WHERE " + -// "(:type IS NULL OR lower(json_body->>'type') LIKE concat('%', lower(:type\\:\\:varchar), '%')) " + -// "AND (:server IS NULL OR lower(json_body->>'server') LIKE concat('%', lower(:server\\:\\:varchar), '%')) " + -// "AND (:entityName IS NULL OR lower(json_body->>'entityName') LIKE concat('%', lower(:entityName\\:\\:varchar), '%')) " + -// "AND (:relationType IS NULL OR lower(json_body->>'relationType') LIKE concat('%', lower(:relationType\\:\\:varchar), '%')) " + -// "AND (:bodyEntityId IS NULL OR lower(json_body->>'entityId') LIKE concat('%', lower(:bodyEntityId\\:\\:varchar), '%')) " + -// "AND (:msgType IS NULL OR lower(json_body->>'msgType') LIKE concat('%', lower(:msgType\\:\\:varchar), '%')) " + -// "AND ((:isError = FALSE) OR (json_body->>'error') IS NOT NULL) " + -// "AND (:error IS NULL OR lower(json_body->>'error') LIKE concat('%', lower(:error\\:\\:varchar), '%')) " + -// "AND (:data IS NULL OR lower(json_body->>'data') LIKE concat('%', lower(:data\\:\\:varchar), '%')) " + -// "AND (:metadata IS NULL OR lower(json_body->>'metadata') LIKE concat('%', lower(:metadata\\:\\:varchar), '%'))" -// ) -// Page findDebugRuleNodeEvents(@Param("tenantId") UUID tenantId, -// @Param("entityId") UUID entityId, -// @Param("entityType") String entityType, -// @Param("eventType") String eventType, -// @Param("startTime") Long startTime, -// @Param("endTime") Long endTime, -// @Param("type") String type, -// @Param("server") String server, -// @Param("entityName") String entityName, -// @Param("relationType") String relationType, -// @Param("bodyEntityId") String bodyEntityId, -// @Param("msgType") String msgType, -// @Param("isError") boolean isError, -// @Param("error") String error, -// @Param("data") String data, -// @Param("metadata") String metadata, -// Pageable pageable); -// -// @Query(nativeQuery = true, -// value = "SELECT e.id, e.created_time, e.body, e.entity_id, e.entity_type, e.event_type, e.event_uid, e.tenant_id, ts FROM " + -// "(SELECT *, e.body\\:\\:jsonb as json_body FROM event e WHERE " + -// "e.tenant_id = :tenantId " + -// "AND e.entity_type = :entityType " + -// "AND e.entity_id = :entityId " + -// "AND e.event_type = 'ERROR' " + -// "AND e.created_time >= :startTime AND (:endTime = 0 OR e.created_time <= :endTime) " + -// ") AS e WHERE " + -// "(:server IS NULL OR lower(json_body->>'server') LIKE concat('%', lower(:server\\:\\:varchar), '%')) " + -// "AND (:method IS NULL OR lower(json_body->>'method') LIKE concat('%', lower(:method\\:\\:varchar), '%')) " + -// "AND (:error IS NULL OR lower(json_body->>'error') LIKE concat('%', lower(:error\\:\\:varchar), '%'))", -// countQuery = "SELECT count(*) FROM " + -// "(SELECT *, e.body\\:\\:jsonb as json_body FROM event e WHERE " + -// "e.tenant_id = :tenantId " + -// "AND e.entity_type = :entityType " + -// "AND e.entity_id = :entityId " + -// "AND e.event_type = 'ERROR' " + -// "AND e.created_time >= :startTime AND (:endTime = 0 OR e.created_time <= :endTime) " + -// ") AS e WHERE " + -// "(:server IS NULL OR lower(json_body->>'server') LIKE concat('%', lower(:server\\:\\:varchar), '%')) " + -// "AND (:method IS NULL OR lower(json_body->>'method') LIKE concat('%', lower(:method\\:\\:varchar), '%')) " + -// "AND (:error IS NULL OR lower(json_body->>'error') LIKE concat('%', lower(:error\\:\\:varchar), '%'))") -// Page findErrorEvents(@Param("tenantId") UUID tenantId, -// @Param("entityId") UUID entityId, -// @Param("entityType") String entityType, -// @Param("startTime") Long startTime, -// @Param("endTime") Long endTIme, -// @Param("server") String server, -// @Param("method") String method, -// @Param("error") String error, -// Pageable pageable); -// -// -// @Query(nativeQuery = true, -// value = "SELECT e.id, e.created_time, e.body, e.entity_id, e.entity_type, e.event_type, e.event_uid, e.tenant_id, ts FROM " + -// "(SELECT *, e.body\\:\\:jsonb as json_body FROM event e WHERE " + -// "e.tenant_id = :tenantId " + -// "AND e.entity_type = :entityType " + -// "AND e.entity_id = :entityId " + -// "AND e.event_type = 'STATS' " + -// "AND e.created_time >= :startTime AND (:endTime = 0 OR e.created_time <= :endTime) " + -// ") AS e WHERE " + -// "(:server IS NULL OR lower(e.body\\:\\:json->>'server') LIKE concat('%', lower(:server\\:\\:varchar), '%')) " + -// "AND (:messagesProcessed = 0 OR (json_body->>'messagesProcessed')\\:\\:integer >= :messagesProcessed) " + -// "AND (:errorsOccurred = 0 OR (json_body->>'errorsOccurred')\\:\\:integer >= :errorsOccurred) ", -// countQuery = "SELECT count(*) FROM " + -// "(SELECT *, e.body\\:\\:jsonb as json_body FROM event e WHERE " + -// "e.tenant_id = :tenantId " + -// "AND e.entity_type = :entityType " + -// "AND e.entity_id = :entityId " + -// "AND e.event_type = 'LC_EVENT' " + -// "AND e.created_time >= :startTime AND (:endTime = 0 OR e.created_time <= :endTime) " + -// ") AS e WHERE " + -// "(:server IS NULL OR lower(e.body\\:\\:json->>'server') LIKE concat('%', lower(:server\\:\\:varchar), '%')) " + -// "AND (:messagesProcessed = 0 OR (json_body->>'messagesProcessed')\\:\\:integer >= :messagesProcessed) " + -// "AND (:errorsOccurred = 0 OR (json_body->>'errorsOccurred')\\:\\:integer >= :errorsOccurred) ") -// Page findStatisticsEvents(@Param("tenantId") UUID tenantId, -// @Param("entityId") UUID entityId, -// @Param("entityType") String entityType, -// @Param("startTime") Long startTime, -// @Param("endTime") Long endTIme, -// @Param("server") String server, -// @Param("messagesProcessed") Integer messagesProcessed, -// @Param("errorsOccurred") Integer errorsOccurred, -// Pageable pageable); -// + void removeEvents(UUID tenantId, UUID entityId, Long startTime, Long endTime); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java index 0a39ab1be0..525150fd90 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java @@ -48,13 +48,11 @@ import org.thingsboard.server.dao.timeseries.SqlPartition; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; @@ -65,12 +63,12 @@ import java.util.function.Function; @Component public class JpaBaseEventDao implements EventDao { - public static final long REGULAR_PARTITION_DURATION = TimeUnit.DAYS.toMillis(1); - public static final long DEBUG_PARTITION_DURATION = TimeUnit.HOURS.toMillis(1); - private final Map> partitionsByEventType = new ConcurrentHashMap<>(); private static final ReentrantLock partitionCreationLock = new ReentrantLock(); + @Autowired + private EventPartitionConfiguration partitionConfiguration; + @Autowired private SqlPartitioningRepository partitioningRepository; @@ -170,12 +168,12 @@ public class JpaBaseEventDao implements EventDao { } private void savePartitionIfNotExist(Event event) { - var partitionsMap = partitionsByEventType.get(event.getType()); - var partitionDuration = event.getType().isDebug() ? DEBUG_PARTITION_DURATION : REGULAR_PARTITION_DURATION; + EventType type = event.getType(); + var partitionsMap = partitionsByEventType.get(type); + var partitionDuration = partitionConfiguration.getPartitionSizeInMs(type); long partitionStartTs = event.getCreatedTime() - (event.getCreatedTime() % partitionDuration); if (partitionsMap.get(partitionStartTs) == null) { - savePartition(partitionsMap, new SqlPartition(event.getType().getTable(), partitionStartTs, - partitionStartTs + partitionDuration, Long.toString(partitionStartTs))); + savePartition(partitionsMap, new SqlPartition(type.getTable(), partitionStartTs, partitionStartTs + partitionDuration, Long.toString(partitionStartTs))); } } @@ -228,6 +226,42 @@ public class JpaBaseEventDao implements EventDao { } } + @Override + public void removeEvents(UUID tenantId, UUID entityId, Long startTime, Long endTime) { + log.debug("[{}][{}] Remove events [{}-{}] ", tenantId, entityId, startTime, endTime); + for (EventType eventType : EventType.values()) { + getEventRepository(eventType).removeEvents(tenantId, entityId, startTime, endTime); + } + } + + @Override + public void removeEvents(UUID tenantId, UUID entityId, EventFilter eventFilter, Long startTime, Long endTime) { + if (eventFilter.isNotEmpty()) { + switch (eventFilter.getEventType()) { + case DEBUG_RULE_NODE: + removeEventsByFilter(tenantId, entityId, (RuleNodeDebugEventFilter) eventFilter, startTime, endTime); + break; + case DEBUG_RULE_CHAIN: + removeEventsByFilter(tenantId, entityId, (RuleChainDebugEventFilter) eventFilter, startTime, endTime); + break; + case LC_EVENT: + removeEventsByFilter(tenantId, entityId, (LifeCycleEventFilter) eventFilter, startTime, endTime); + break; + case ERROR: + removeEventsByFilter(tenantId, entityId, (ErrorEventFilter) eventFilter, startTime, endTime); + break; + case STATS: + removeEventsByFilter(tenantId, entityId, (StatisticsEventFilter) eventFilter, startTime, endTime); + break; + default: + throw new RuntimeException("Not supported event type: " + eventFilter.getEventType()); + } + } else { + getEventRepository(eventFilter.getEventType()).removeEvents(tenantId, entityId, startTime, endTime); + } + } + + private PageData findEventByFilter(UUID tenantId, UUID entityId, RuleChainDebugEventFilter eventFilter, TimePageLink pageLink) { return DaoUtil.toPageData( ruleChainDebugEventRepository.findEvents( @@ -305,12 +339,88 @@ public class JpaBaseEventDao implements EventDao { pageLink.getStartTime(), pageLink.getEndTime(), eventFilter.getServer(), - eventFilter.getMessagesProcessed(), - eventFilter.getErrorsOccurred(), + eventFilter.getMinMessagesProcessed(), + eventFilter.getMaxMessagesProcessed(), + eventFilter.getMinErrorsOccurred(), + eventFilter.getMaxErrorsOccurred(), DaoUtil.toPageable(pageLink)) ); } + private void removeEventsByFilter(UUID tenantId, UUID entityId, RuleChainDebugEventFilter eventFilter, Long startTime, Long endTime) { + ruleChainDebugEventRepository.removeEvents( + tenantId, + entityId, + startTime, + endTime, + eventFilter.getServer(), + eventFilter.getMessage(), + eventFilter.isError(), + eventFilter.getErrorStr()); + } + + private void removeEventsByFilter(UUID tenantId, UUID entityId, RuleNodeDebugEventFilter eventFilter, Long startTime, Long endTime) { + parseUUID(eventFilter.getEntityId(), "Entity Id"); + parseUUID(eventFilter.getMsgId(), "Message Id"); + ruleNodeDebugEventRepository.removeEvents( + tenantId, + entityId, + startTime, + endTime, + eventFilter.getServer(), + eventFilter.getMsgDirectionType(), + eventFilter.getEntityId(), + eventFilter.getEntityType(), + eventFilter.getMsgId(), + eventFilter.getMsgType(), + eventFilter.getRelationType(), + eventFilter.getDataSearch(), + eventFilter.getMetadataSearch(), + eventFilter.isError(), + eventFilter.getErrorStr()); + } + + private void removeEventsByFilter(UUID tenantId, UUID entityId, ErrorEventFilter eventFilter, Long startTime, Long endTime) { + errorEventRepository.removeEvents( + tenantId, + entityId, + startTime, + endTime, + eventFilter.getServer(), + eventFilter.getMethod(), + eventFilter.getErrorStr()); + + } + + private void removeEventsByFilter(UUID tenantId, UUID entityId, LifeCycleEventFilter eventFilter, Long startTime, Long endTime) { + boolean statusFilterEnabled = !StringUtils.isEmpty(eventFilter.getStatus()); + boolean statusFilter = statusFilterEnabled && eventFilter.getStatus().equalsIgnoreCase("Success"); + lcEventRepository.removeEvents( + tenantId, + entityId, + startTime, + endTime, + eventFilter.getServer(), + eventFilter.getEvent(), + statusFilterEnabled, + statusFilter, + eventFilter.getErrorStr()); + } + + private void removeEventsByFilter(UUID tenantId, UUID entityId, StatisticsEventFilter eventFilter, Long startTime, Long endTime) { + statsEventRepository.removeEvents( + tenantId, + entityId, + startTime, + endTime, + eventFilter.getServer(), + eventFilter.getMinMessagesProcessed(), + eventFilter.getMaxMessagesProcessed(), + eventFilter.getMinErrorsOccurred(), + eventFilter.getMaxErrorsOccurred() + ); + } + @Override public List findLatestEvents(UUID tenantId, UUID entityId, EventType eventType, int limit) { return DaoUtil.convertDataList(getEventRepository(eventType).findLatestEvents(tenantId, entityId, limit)); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/LifecycleEventRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/LifecycleEventRepository.java index 298e57a1b9..e4a7f40256 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/LifecycleEventRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/LifecycleEventRepository.java @@ -18,8 +18,10 @@ package org.thingsboard.server.dao.sql.event; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.event.LifecycleEvent; import org.thingsboard.server.dao.model.sql.LifecycleEventEntity; @@ -77,4 +79,39 @@ public interface LifecycleEventRepository extends EventRepository= :startTime) " + + "AND (:endTime IS NULL OR e.ts <= :endTime)" + ) + void removeEvents(@Param("tenantId") UUID tenantId, + @Param("entityId") UUID entityId, + @Param("startTime") Long startTime, + @Param("endTime") Long endTime); + + @Transactional + @Modifying + @Query(nativeQuery = true, + value = "DELETE FROM lc_event e WHERE " + + "e.tenant_id = :tenantId " + + "AND e.entity_id = :entityId " + + "AND (:startTime IS NULL OR e.ts >= :startTime) " + + "AND (:endTime IS NULL OR e.ts <= :endTime) " + + "AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " + + "AND (:eventType IS NULL OR e.e_type ILIKE concat('%', :eventType, '%')) " + + "AND ((:statusFilterEnabled = FALSE) OR e.e_success = :statusFilter) " + + "AND (:error IS NULL OR e.e_error ILIKE concat('%', :error, '%'))" + ) + void removeEvents(@Param("tenantId") UUID tenantId, + @Param("entityId") UUID entityId, + @Param("startTime") Long startTime, + @Param("endTime") Long endTime, + @Param("serviceId") String server, + @Param("eventType") String eventType, + @Param("statusFilterEnabled") boolean statusFilterEnabled, + @Param("statusFilter") boolean statusFilter, + @Param("error") String error); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/RuleChainDebugEventRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/RuleChainDebugEventRepository.java index 32d27765a5..d01a54fa30 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/RuleChainDebugEventRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/RuleChainDebugEventRepository.java @@ -18,8 +18,10 @@ package org.thingsboard.server.dao.sql.event; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.event.RuleChainDebugEvent; import org.thingsboard.server.common.data.event.RuleNodeDebugEvent; import org.thingsboard.server.dao.model.sql.RuleChainDebugEventEntity; @@ -43,10 +45,10 @@ public interface RuleChainDebugEventRepository extends EventRepository findEvents(@Param("tenantId") UUID tenantId, - @Param("entityId") UUID entityId, - @Param("startTime") Long startTime, - @Param("endTime") Long endTime, - Pageable pageable); + @Param("entityId") UUID entityId, + @Param("startTime") Long startTime, + @Param("endTime") Long endTime, + Pageable pageable); @Query(nativeQuery = true, value = "SELECT * FROM rule_chain_debug_event e WHERE " + @@ -70,13 +72,46 @@ public interface RuleChainDebugEventRepository extends EventRepository findEvents(@Param("tenantId") UUID tenantId, - @Param("entityId") UUID entityId, - @Param("startTime") Long startTime, - @Param("endTime") Long endTime, - @Param("serviceId") String server, - @Param("message") String message, - @Param("isError") boolean isError, - @Param("error") String error, - Pageable pageable); + @Param("entityId") UUID entityId, + @Param("startTime") Long startTime, + @Param("endTime") Long endTime, + @Param("serviceId") String server, + @Param("message") String message, + @Param("isError") boolean isError, + @Param("error") String error, + Pageable pageable); + @Transactional + @Modifying + @Query("DELETE FROM RuleChainDebugEventEntity e WHERE " + + "e.tenantId = :tenantId " + + "AND e.entityId = :entityId " + + "AND (:startTime IS NULL OR e.ts >= :startTime) " + + "AND (:endTime IS NULL OR e.ts <= :endTime)" + ) + void removeEvents(@Param("tenantId") UUID tenantId, + @Param("entityId") UUID entityId, + @Param("startTime") Long startTime, + @Param("endTime") Long endTime); + + @Transactional + @Modifying + @Query(nativeQuery = true, + value = "DELETE FROM rule_chain_debug_event e WHERE " + + "e.tenant_id = :tenantId " + + "AND e.entity_id = :entityId " + + "AND (:startTime IS NULL OR e.ts >= :startTime) " + + "AND (:endTime IS NULL OR e.ts <= :endTime) " + + "AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " + + "AND (:message IS NULL OR e.e_message ILIKE concat('%', :message, '%')) " + + "AND ((:isError = FALSE) OR e.e_error IS NOT NULL) " + + "AND (:error IS NULL OR e.e_error ILIKE concat('%', :error, '%'))") + void removeEvents(@Param("tenantId") UUID tenantId, + @Param("entityId") UUID entityId, + @Param("startTime") Long startTime, + @Param("endTime") Long endTime, + @Param("serviceId") String server, + @Param("message") String message, + @Param("isError") boolean isError, + @Param("error") String error); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/RuleNodeDebugEventRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/RuleNodeDebugEventRepository.java index 6238acd5ac..98d404c36f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/RuleNodeDebugEventRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/RuleNodeDebugEventRepository.java @@ -18,8 +18,10 @@ package org.thingsboard.server.dao.sql.event; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.event.ErrorEvent; import org.thingsboard.server.common.data.event.RuleNodeDebugEvent; import org.thingsboard.server.dao.model.sql.ErrorEventEntity; @@ -100,4 +102,51 @@ public interface RuleNodeDebugEventRepository extends EventRepository= :startTime) " + + "AND (:endTime IS NULL OR e.ts <= :endTime)" + ) + void removeEvents(@Param("tenantId") UUID tenantId, + @Param("entityId") UUID entityId, + @Param("startTime") Long startTime, + @Param("endTime") Long endTime); + + @Transactional + @Modifying + @Query(nativeQuery = true, + value = "DELETE FROM rule_node_debug_event e WHERE " + + "e.tenant_id = :tenantId " + + "AND e.entity_id = :entityId " + + "AND (:startTime IS NULL OR e.ts >= :startTime) " + + "AND (:endTime IS NULL OR e.ts <= :endTime) " + + "AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " + + "AND (:eventType IS NULL OR e.e_type ILIKE concat('%', :eventType, '%')) " + + "AND (:eventEntityId IS NULL OR e.e_entity_id = uuid(:eventEntityId)) " + + "AND (:eventEntityType IS NULL OR e.e_entity_type ILIKE concat('%', :eventEntityType, '%')) " + + "AND (:msgId IS NULL OR e.e_msg_id = uuid(:msgId)) " + + "AND (:msgType IS NULL OR e.e_msg_type ILIKE concat('%', :msgType, '%')) " + + "AND (:relationType IS NULL OR e.e_relation_type ILIKE concat('%', :relationType, '%')) " + + "AND (:data IS NULL OR e.e_data ILIKE concat('%', :data, '%')) " + + "AND (:metadata IS NULL OR e.e_metadata ILIKE concat('%', :metadata, '%')) " + + "AND ((:isError = FALSE) OR e.e_error IS NOT NULL) " + + "AND (:error IS NULL OR e.e_error ILIKE concat('%', :error, '%'))") + void removeEvents(@Param("tenantId") UUID tenantId, + @Param("entityId") UUID entityId, + @Param("startTime") Long startTime, + @Param("endTime") Long endTime, + @Param("serviceId") String server, + @Param("eventType") String type, + @Param("eventEntityId") String eventEntityId, + @Param("eventEntityType") String eventEntityType, + @Param("msgId") String eventMsgId, + @Param("msgType") String eventMsgType, + @Param("relationType") String relationType, + @Param("data") String data, + @Param("metadata") String metadata, + @Param("isError") boolean isError, + @Param("error") String error); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java index 54b4565097..d8e5d6dcd5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java @@ -16,7 +16,7 @@ package org.thingsboard.server.dao.sql.event; import lombok.extern.slf4j.Slf4j; -import org.postgresql.util.PSQLException; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; @@ -28,8 +28,6 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import static org.thingsboard.server.dao.sql.event.JpaBaseEventDao.DEBUG_PARTITION_DURATION; -import static org.thingsboard.server.dao.sql.event.JpaBaseEventDao.REGULAR_PARTITION_DURATION; @Slf4j @Repository @@ -38,6 +36,9 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe private static final String SELECT_PARTITIONS_STMT = "SELECT tablename from pg_tables WHERE schemaname = 'public' and tablename like concat(?, '_%')"; private static final int PSQL_VERSION_14 = 140000; + @Autowired + private EventPartitionConfiguration partitionConfiguration; + private volatile Integer currentServerVersion; @Override @@ -50,7 +51,7 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe } private void cleanupEvents(EventType eventType, long eventExpTime) { - var partitionDuration = eventType.isDebug() ? DEBUG_PARTITION_DURATION : REGULAR_PARTITION_DURATION; + var partitionDuration = partitionConfiguration.getPartitionSizeInMs(eventType); List partitions = fetchPartitions(eventType); for (var partitionTs : partitions) { var partitionEndTs = partitionTs + partitionDuration; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/StatisticsEventRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/StatisticsEventRepository.java index 5eee96efbc..67116100da 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/StatisticsEventRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/StatisticsEventRepository.java @@ -18,8 +18,10 @@ package org.thingsboard.server.dao.sql.event; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.event.StatisticsEvent; import org.thingsboard.server.dao.model.sql.StatisticsEventEntity; @@ -51,8 +53,10 @@ public interface StatisticsEventRepository extends EventRepository= :startTime) " + "AND (:endTime IS NULL OR e.ts <= :endTime) " + "AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " + - "AND (:messagesProcessed IS NULL OR e.e_messages_processed >= :messagesProcessed) " + - "AND (:errorsOccurred IS NULL OR e.e_errors_occurred >= :errorsOccurred)" + "AND (:minMessagesProcessed IS NULL OR e.e_messages_processed >= :minMessagesProcessed) " + + "AND (:maxMessagesProcessed IS NULL OR e.e_messages_processed < :maxMessagesProcessed) " + + "AND (:minErrorsOccurred IS NULL OR e.e_errors_occurred >= :minErrorsOccurred) " + + "AND (:maxErrorsOccurred IS NULL OR e.e_errors_occurred < :maxErrorsOccurred)" , countQuery = "SELECT count(*) FROM stats_event e WHERE " + "e.tenant_id = :tenantId " + @@ -60,16 +64,57 @@ public interface StatisticsEventRepository extends EventRepository= :startTime) " + "AND (:endTime IS NULL OR e.ts <= :endTime) " + "AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " + - "AND (:messagesProcessed IS NULL OR e.e_messages_processed >= :messagesProcessed) " + - "AND (:errorsOccurred IS NULL OR e.e_errors_occurred >= :errorsOccurred)" + "AND (:minMessagesProcessed IS NULL OR e.e_messages_processed >= :minMessagesProcessed) " + + "AND (:maxMessagesProcessed IS NULL OR e.e_messages_processed < :maxMessagesProcessed) " + + "AND (:minErrorsOccurred IS NULL OR e.e_errors_occurred >= :minErrorsOccurred) " + + "AND (:maxErrorsOccurred IS NULL OR e.e_errors_occurred < :maxErrorsOccurred)" ) Page findEvents(@Param("tenantId") UUID tenantId, @Param("entityId") UUID entityId, @Param("startTime") Long startTime, @Param("endTime") Long endTime, @Param("serviceId") String server, - @Param("messagesProcessed") Integer messagesProcessed, - @Param("errorsOccurred") Integer errorsOccurred, + @Param("minMessagesProcessed") Integer minMessagesProcessed, + @Param("maxMessagesProcessed") Integer maxMessagesProcessed, + @Param("minErrorsOccurred") Integer minErrorsOccurred, + @Param("maxErrorsOccurred") Integer maxErrorsOccurred, Pageable pageable); + @Transactional + @Modifying + @Query("DELETE FROM StatisticsEventEntity e WHERE " + + "e.tenantId = :tenantId " + + "AND e.entityId = :entityId " + + "AND (:startTime IS NULL OR e.ts >= :startTime) " + + "AND (:endTime IS NULL OR e.ts <= :endTime)" + ) + void removeEvents(@Param("tenantId") UUID tenantId, + @Param("entityId") UUID entityId, + @Param("startTime") Long startTime, + @Param("endTime") Long endTime); + + @Transactional + @Modifying + @Query(nativeQuery = true, + value = "DELETE FROM stats_event e WHERE " + + "e.tenant_id = :tenantId " + + "AND e.entity_id = :entityId " + + "AND (:startTime IS NULL OR e.ts >= :startTime) " + + "AND (:endTime IS NULL OR e.ts <= :endTime) " + + "AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " + + "AND (:minMessagesProcessed IS NULL OR e.e_messages_processed >= :minMessagesProcessed) " + + "AND (:maxMessagesProcessed IS NULL OR e.e_messages_processed < :maxMessagesProcessed) " + + "AND (:minErrorsOccurred IS NULL OR e.e_errors_occurred >= :minErrorsOccurred) " + + "AND (:maxErrorsOccurred IS NULL OR e.e_errors_occurred < :maxErrorsOccurred)" + + ) + void removeEvents(@Param("tenantId") UUID tenantId, + @Param("entityId") UUID entityId, + @Param("startTime") Long startTime, + @Param("endTime") Long endTime, + @Param("serviceId") String server, + @Param("minMessagesProcessed") Integer minMessagesProcessed, + @Param("maxMessagesProcessed") Integer maxMessagesProcessed, + @Param("minErrorsOccurred") Integer minErrorsOccurred, + @Param("maxErrorsOccurred") Integer maxErrorsOccurred); }