From 2eb7949d57aa24db60554d169f7ec2a9532d433a Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 6 Jul 2020 17:00:46 +0300 Subject: [PATCH] Improvements --- .../server/actors/ActorSystemContext.java | 9 ++-- .../actors/ruleChain/DefaultTbContext.java | 3 +- ...efaultTbEntityDataSubscriptionService.java | 18 +------ .../subscription/TbAlarmDataSubCtx.java | 48 +++++++++++++++---- .../DefaultAlarmSubscriptionService.java | 4 +- .../common/data/query/AlarmDataQuery.java | 1 - .../data/query/RelationsQueryFilter.java | 1 + .../query/DefaultAlarmQueryRepository.java | 3 +- .../query/DefaultEntityQueryRepository.java | 15 ++++-- .../rule/engine/action/TbAlarmNodeTest.java | 7 ++- 10 files changed, 70 insertions(+), 39 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index de368fd05a..2f2e5c9e01 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -76,6 +76,7 @@ import org.thingsboard.server.service.script.JsExecutorService; import org.thingsboard.server.service.script.JsInvokeService; import org.thingsboard.server.service.session.DeviceSessionCacheService; import org.thingsboard.server.service.state.DeviceStateService; +import org.thingsboard.server.service.telemetry.AlarmSubscriptionService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import org.thingsboard.server.service.transport.TbCoreToTransportService; @@ -167,10 +168,6 @@ public class ActorSystemContext { @Getter private EventService eventService; - @Autowired - @Getter - private AlarmService alarmService; - @Autowired @Getter private RelationService relationService; @@ -187,6 +184,10 @@ public class ActorSystemContext { @Getter private TelemetrySubscriptionService tsSubService; + @Autowired + @Getter + private AlarmSubscriptionService alarmService; + @Autowired @Getter private JsInvokeService jsSandbox; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 7610f89719..64752719af 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.api.MailService; +import org.thingsboard.rule.engine.api.RuleEngineAlarmService; import org.thingsboard.rule.engine.api.RuleEngineRpcService; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.ScriptEngine; @@ -351,7 +352,7 @@ class DefaultTbContext implements TbContext { } @Override - public AlarmService getAlarmService() { + public RuleEngineAlarmService getAlarmService() { return mainCtx.getAlarmService(); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 58957b01bb..de4fef7928 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -266,16 +266,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc AlarmDataUpdate update = new AlarmDataUpdate(cmd.getCmdId(), new PageData<>(Collections.emptyList(), 1, 0, false), null); wsService.sendWsMsg(ctx.getSessionId(), update); } else { - ctx.setLastFetchTs(System.currentTimeMillis()); - PageData alarms = alarmService.findAlarmDataByQueryForEntities(ctx.getTenantId(), ctx.getCustomerId(), - ctx.getQuery().getPageLink(), ctx.getOrderedEntityIds()); - alarms = ctx.setAndMergeAlarmsData(alarms); - AlarmDataUpdate update = new AlarmDataUpdate(cmd.getCmdId(), alarms, null); - wsService.sendWsMsg(ctx.getSessionId(), update); - if (adq.getPageLink().getTimeWindow() > 0) { - //TODO: refresh list of entities periodically (similar to time-series subscription). - createAlarmSubscriptions(ctx); - } + ctx.fetchAlarmsAndCreateSubscriptions(); } } @@ -321,7 +312,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc private TbAlarmDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) { Map sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); - TbAlarmDataSubCtx ctx = new TbAlarmDataSubCtx(serviceId, wsService, sessionRef, cmd.getCmdId()); + TbAlarmDataSubCtx ctx = new TbAlarmDataSubCtx(serviceId, wsService, localSubscriptionService, alarmService, sessionRef, cmd.getCmdId()); ctx.setQuery(cmd.getQuery()); sessionSubs.put(cmd.getCmdId(), ctx); return ctx; @@ -467,11 +458,6 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc createTelemetrySubscriptions(ctx, keys, true); } - private void createAlarmSubscriptions(TbAlarmDataSubCtx ctx) { - List subscriptions = ctx.createSubscriptions(); - subscriptions.forEach(localSubscriptionService::addSubscription); - } - private void createTelemetrySubscriptions(TbEntityDataSubCtx ctx, List keys, boolean latest) { List tbSubs = ctx.createSubscriptions(keys, latest); tbSubs.forEach(sub -> localSubscriptionService.addSubscription(sub)); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java index 852c616d22..c8f4d0f2e6 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java @@ -27,8 +27,10 @@ import org.thingsboard.server.common.data.query.AlarmData; import org.thingsboard.server.common.data.query.AlarmDataPageLink; import org.thingsboard.server.common.data.query.AlarmDataQuery; import org.thingsboard.server.common.data.query.EntityData; +import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; +import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataCmd; import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataUpdate; import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate; @@ -45,6 +47,8 @@ import java.util.stream.Collectors; @Slf4j public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { + private final TbLocalSubscriptionService localSubscriptionService; + private final AlarmService alarmService; @Getter @Setter private final LinkedHashMap entitiesMap; @@ -59,15 +63,32 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { private boolean tooManyEntities; private Map subToEntityIdMap; - @Setter - private long lastFetchTs; - public TbAlarmDataSubCtx(String serviceId, TelemetryWebSocketService wsService, TelemetryWebSocketSessionRef sessionRef, int cmdId) { + public TbAlarmDataSubCtx(String serviceId, TelemetryWebSocketService wsService, + TbLocalSubscriptionService localSubscriptionService, + AlarmService alarmService, + TelemetryWebSocketSessionRef sessionRef, int cmdId) { super(serviceId, wsService, sessionRef, cmdId); + this.localSubscriptionService = localSubscriptionService; + this.alarmService = alarmService; this.entitiesMap = new LinkedHashMap<>(); this.alarmsMap = new HashMap<>(); } + public void fetchAlarmsAndCreateSubscriptions() { + PageData alarms = alarmService.findAlarmDataByQueryForEntities(getTenantId(), getCustomerId(), + query.getPageLink(), getOrderedEntityIds()); + alarms = setAndMergeAlarmsData(alarms); + AlarmDataUpdate update = new AlarmDataUpdate(cmdId, alarms, null); + wsService.sendWsMsg(getSessionId(), update); + if (query.getPageLink().getTimeWindow() > 0) { + clearSubscriptions(); + //TODO: refresh list of entities periodically (similar to time-series subscription). + List subscriptions = createSubscriptions(); + subscriptions.forEach(localSubscriptionService::addSubscription); + } + } + public void setEntitiesData(PageData entitiesData) { entitiesMap.clear(); tooManyEntities = entitiesData.hasNext(); @@ -99,31 +120,42 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { public List createSubscriptions() { this.subToEntityIdMap = new HashMap<>(); AlarmDataPageLink pageLink = query.getPageLink(); + long startTs = System.currentTimeMillis() - pageLink.getTimeWindow(); List result = new ArrayList<>(); for (EntityData entityData : entitiesMap.values()) { int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet(); subToEntityIdMap.put(subIdx, entityData.getEntityId()); log.trace("[{}][{}][{}] Creating alarms subscription for [{}] with query: {}", serviceId, cmdId, subIdx, entityData.getEntityId(), pageLink); result.add(TbAlarmsSubscription.builder() + .type(TbSubscriptionType.ALARMS) .serviceId(serviceId) .sessionId(sessionRef.getSessionId()) .subscriptionId(subIdx) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityData.getEntityId()) .updateConsumer(this::sendWsMsg) - .ts(lastFetchTs) + .ts(startTs) .build()); } return result; } + public void clearSubscriptions() { + if (subToEntityIdMap != null) { + for (Integer subId : subToEntityIdMap.keySet()) { + localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId); + } + subToEntityIdMap.clear(); + } + } + private void sendWsMsg(String sessionId, AlarmSubscriptionUpdate subscriptionUpdate) { Alarm alarm = subscriptionUpdate.getAlarm(); AlarmId alarmId = alarm.getId(); if (subscriptionUpdate.isAlarmDeleted()) { Alarm deleted = alarmsMap.remove(alarmId); if (deleted != null) { - //TODO: invalidate current page; + fetchAlarmsAndCreateSubscriptions(); } } else { AlarmData current = alarmsMap.get(alarmId); @@ -131,14 +163,14 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { boolean matchesFilter = filter(alarm); if (onCurrentPage) { if (matchesFilter) { - AlarmData updated = new AlarmData(alarm, current.getName(), current.getEntityId()); + AlarmData updated = new AlarmData(alarm, current.getOriginatorName(), current.getEntityId()); alarmsMap.put(alarmId, updated); wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, Collections.singletonList(updated))); } else { - //TODO: invalidate current page; + fetchAlarmsAndCreateSubscriptions(); } } else if (matchesFilter && query.getPageLink().getPage() == 0) { - //TODO: invalidate current page; + fetchAlarmsAndCreateSubscriptions(); } } } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java index 1182848669..3aa9cdc6c0 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java @@ -166,7 +166,7 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); if (currentPartitions.contains(tpi)) { if (subscriptionManagerService.isPresent()) { - subscriptionManagerService.get().onAlarmUpdate(tenantId, entityId, alarm); + subscriptionManagerService.get().onAlarmUpdate(tenantId, entityId, alarm, TbCallback.EMPTY); } else { log.warn("Possible misconfiguration because subscriptionManagerService is null!"); } @@ -186,7 +186,7 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); if (currentPartitions.contains(tpi)) { if (subscriptionManagerService.isPresent()) { - subscriptionManagerService.get().onAlarmDeleted(tenantId, entityId, alarm); + subscriptionManagerService.get().onAlarmDeleted(tenantId, entityId, alarm, TbCallback.EMPTY); } else { log.warn("Possible misconfiguration because subscriptionManagerService is null!"); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/AlarmDataQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/AlarmDataQuery.java index 1cf9044085..6052a8cd5f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/AlarmDataQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/AlarmDataQuery.java @@ -16,7 +16,6 @@ package org.thingsboard.server.common.data.query; import com.fasterxml.jackson.annotation.JsonIgnore; -import lombok.Getter; import lombok.ToString; import java.util.List; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/RelationsQueryFilter.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/RelationsQueryFilter.java index 3e7783f738..2cc43335c2 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/RelationsQueryFilter.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/RelationsQueryFilter.java @@ -19,6 +19,7 @@ import lombok.Data; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.relation.EntitySearchDirection; import org.thingsboard.server.common.data.relation.EntityTypeFilter; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; import java.util.List; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java index a6b188a3fb..32a319388e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java @@ -61,6 +61,7 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository { alarmFieldColumnMap.put("severity", ModelConstants.ALARM_SEVERITY_PROPERTY); alarmFieldColumnMap.put("originator_id", ModelConstants.ALARM_ORIGINATOR_ID_PROPERTY); alarmFieldColumnMap.put("originator_type", ModelConstants.ALARM_ORIGINATOR_TYPE_PROPERTY); + alarmFieldColumnMap.put("originator", "originator_name"); } public static final String SELECT_ORIGINATOR_NAME = " CASE" + @@ -124,7 +125,7 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository { EntityDataSortOrder sortOrder = pageLink.getSortOrder(); if (sortOrder != null && sortOrder.getKey().getType().equals(EntityKeyType.ALARM_FIELD)) { String sortOrderKey = sortOrder.getKey().getKey(); - sortPart.append("a.").append(alarmFieldColumnMap.getOrDefault(sortOrderKey, sortOrderKey)) + sortPart.append(alarmFieldColumnMap.getOrDefault(sortOrderKey, sortOrderKey)) .append(" ").append(sortOrder.getDirection().name()); ctx.addUuidListParameter("entity_ids", orderedEntityIds.stream().map(EntityId::getId).collect(Collectors.toList())); if (pageLink.isSearchPropagatedAlarms()) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java index fabedec348..53463325e5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java @@ -447,14 +447,21 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { if (!single) { whereFilter.append(" ("); } - whereFilter.append(" re.relation_type = :where_relation_type").append(entityTypeFilterIdx).append(" and re.") - .append(entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "to" : "from") - .append("_type in (:where_entity_types").append(entityTypeFilterIdx).append(")"); + List whereEntityTypes = etf.getEntityTypes().stream().map(EntityType::name).collect(Collectors.toList()); + whereFilter + .append(" re.relation_type = :where_relation_type").append(entityTypeFilterIdx); + if (!whereEntityTypes.isEmpty()) { + whereFilter.append(" and re.") + .append(entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "to" : "from") + .append("_type in (:where_entity_types").append(entityTypeFilterIdx).append(")"); + } if (!single) { whereFilter.append(" )"); } ctx.addStringParameter("where_relation_type" + entityTypeFilterIdx, relationType); - ctx.addStringListParameter("where_entity_types" + entityTypeFilterIdx, etf.getEntityTypes().stream().map(EntityType::name).collect(Collectors.toList())); + if (!whereEntityTypes.isEmpty()) { + ctx.addStringListParameter("where_entity_types" + entityTypeFilterIdx, whereEntityTypes); + } entityTypeFilterIdx++; } } else { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java index 064346c6fe..6996a1117a 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java @@ -30,6 +30,7 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.rule.engine.api.RuleEngineAlarmService; import org.thingsboard.rule.engine.api.ScriptEngine; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -43,6 +44,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.alarm.AlarmOperationResult; import org.thingsboard.server.dao.alarm.AlarmService; import javax.script.ScriptException; @@ -79,7 +81,7 @@ public class TbAlarmNodeTest { @Mock private TbContext ctx; @Mock - private AlarmService alarmService; + private RuleEngineAlarmService alarmService; @Mock private ScriptEngine detailsJs; @@ -289,7 +291,8 @@ public class TbAlarmNodeTest { when(detailsJs.executeJsonAsync(msg)).thenReturn(Futures.immediateFuture(null)); when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(activeAlarm)); - when(alarmService.clearAlarm(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()), org.mockito.Mockito.any(JsonNode.class), anyLong())).thenReturn(Futures.immediateFuture(true)); + when(alarmService.clearAlarm(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()), org.mockito.Mockito.any(JsonNode.class), anyLong())) + .thenReturn(Futures.immediateFuture( false)); when(alarmService.findAlarmByIdAsync(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()))).thenReturn(Futures.immediateFuture(activeAlarm)); // doAnswer((Answer) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(activeAlarm);