diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 9e02946574..3cc211c26e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -61,12 +61,17 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
+import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
@Slf4j
@Component
@@ -156,6 +161,10 @@ public class ActorSystemContext {
@Getter
private AuditLogService auditLogService;
+ @Autowired
+ @Getter
+ private TelemetrySubscriptionService tsSubService;
+
@Autowired
@Getter
@Setter
@@ -224,6 +233,21 @@ public class ActorSystemContext {
@Getter
private final Config config;
+ @Getter
+ private ExecutorService tsCallBackExecutor;
+
+ @PostConstruct
+ public void initExecutor() {
+ tsCallBackExecutor = Executors.newSingleThreadExecutor();
+ }
+
+ @PreDestroy
+ public void shutdownExecutor() {
+ if (tsCallBackExecutor != null) {
+ tsCallBackExecutor.shutdownNow();
+ }
+ }
+
public ActorSystemContext() {
config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load());
}
@@ -345,7 +369,7 @@ public class ActorSystemContext {
return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
}
- public ListeningExecutor getExecutor() {
+ public ListeningExecutor getJsExecutor() {
//TODO: take thread count from yml.
return new JsExecutorService(1);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 1d45b61dc3..b3b00725d5 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,12 +15,18 @@
*/
package org.thingsboard.server.actors.ruleChain;
-import akka.actor.ActorContext;
import akka.actor.ActorRef;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.api.ListeningExecutor;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.dao.alarm.AlarmService;
@@ -35,6 +41,8 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
import scala.concurrent.duration.Duration;
+import javax.annotation.Nullable;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -43,6 +51,7 @@ import java.util.concurrent.TimeUnit;
*/
class DefaultTbContext implements TbContext {
+ private static final Function super List, ? extends Void> LIST_VOID_FUNCTION = v -> null;
private final ActorSystemContext mainCtx;
private final RuleNodeCtx nodeCtx;
@@ -112,9 +121,36 @@ class DefaultTbContext implements TbContext {
relationTypes.forEach(type -> tellNext(msg, type));
}
+ @Override
+ public void saveAndNotify(EntityId entityId, List ts, FutureCallback callback) {
+ saveAndNotify(entityId, ts, 0L, callback);
+ }
+
+ @Override
+ public void saveAndNotify(EntityId entityId, List ts, long ttl, FutureCallback callback) {
+ ListenableFuture> saveFuture = mainCtx.getTsService().save(entityId, ts, ttl);
+ Futures.addCallback(saveFuture, new FutureCallback>() {
+ @Override
+ public void onSuccess(@Nullable List result) {
+ mainCtx.getTsSubService().onLocalTimeseriesUpdate(entityId, ts);
+ callback.onSuccess(null);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callback.onFailure(t);
+ }
+ }, mainCtx.getTsCallBackExecutor());
+ }
+
+ @Override
+ public void saveAndNotify(EntityId entityId, String scope, Set attributes, FutureCallback callback) {
+
+ }
+
@Override
public ListeningExecutor getJsExecutor() {
- return mainCtx.getExecutor();
+ return mainCtx.getJsExecutor();
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/controller/HttpValidationCallback.java b/application/src/main/java/org/thingsboard/server/controller/HttpValidationCallback.java
new file mode 100644
index 0000000000..fb1f3e7bee
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/controller/HttpValidationCallback.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.controller;
+
+import com.google.common.util.concurrent.FutureCallback;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.context.request.async.DeferredResult;
+import org.thingsboard.server.service.security.ValidationCallback;
+
+/**
+ * Created by ashvayka on 21.02.17.
+ */
+public class HttpValidationCallback extends ValidationCallback> {
+
+ public HttpValidationCallback(DeferredResult response, FutureCallback> action) {
+ super(response, action);
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index a2867cecdc..c2a2df841c 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -69,6 +69,7 @@ import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Arrays;
@@ -101,6 +102,7 @@ public class TelemetryController extends BaseController {
private ExecutorService executor;
+ @PostConstruct
public void initExecutor() {
executor = Executors.newSingleThreadExecutor();
}
diff --git a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
index 01bd23869d..02e12fe53f 100644
--- a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
+++ b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
@@ -22,7 +22,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleChain;
-import org.thingsboard.server.controller.ValidationCallback;
+import org.thingsboard.server.controller.HttpValidationCallback;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.customer.CustomerService;
@@ -91,7 +91,7 @@ public class AccessValidator {
final DeferredResult response = new DeferredResult<>();
- validate(currentUser, entityId, new ValidationCallback(response,
+ validate(currentUser, entityId, new HttpValidationCallback(response,
new FutureCallback>() {
@Override
public void onSuccess(@Nullable DeferredResult result) {
@@ -107,7 +107,7 @@ public class AccessValidator {
return response;
}
- public void validate(SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+ public void validate(SecurityUser currentUser, EntityId entityId, FutureCallback callback) {
switch (entityId.getEntityType()) {
case DEVICE:
validateDevice(currentUser, entityId, callback);
@@ -130,7 +130,7 @@ public class AccessValidator {
}
}
- private void validateDevice(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+ private void validateDevice(final SecurityUser currentUser, EntityId entityId, FutureCallback callback) {
if (currentUser.isSystemAdmin()) {
callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
} else {
@@ -151,7 +151,7 @@ public class AccessValidator {
}
}
- private void validateAsset(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+ private void validateAsset(final SecurityUser currentUser, EntityId entityId, FutureCallback callback) {
if (currentUser.isSystemAdmin()) {
callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
} else {
@@ -173,7 +173,7 @@ public class AccessValidator {
}
- private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+ private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, FutureCallback callback) {
if (currentUser.isCustomerUser()) {
callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
} else {
@@ -194,7 +194,7 @@ public class AccessValidator {
}
}
- private void validateCustomer(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+ private void validateCustomer(final SecurityUser currentUser, EntityId entityId, FutureCallback callback) {
if (currentUser.isSystemAdmin()) {
callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
} else {
@@ -215,7 +215,7 @@ public class AccessValidator {
}
}
- private void validateTenant(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+ private void validateTenant(final SecurityUser currentUser, EntityId entityId, FutureCallback callback) {
if (currentUser.isCustomerUser()) {
callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
} else if (currentUser.isSystemAdmin()) {
@@ -234,7 +234,7 @@ public class AccessValidator {
}
}
- private FutureCallback getCallback(ValidationCallback callback, Function transformer) {
+ private FutureCallback getCallback(FutureCallback callback, Function transformer) {
return new FutureCallback() {
@Override
public void onSuccess(@Nullable T result) {
diff --git a/application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java b/application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java
similarity index 59%
rename from application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java
rename to application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java
index 6b2718f568..7feae03516 100644
--- a/application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java
+++ b/application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java
@@ -1,23 +1,6 @@
-/**
- * Copyright © 2016-2018 The Thingsboard Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.thingsboard.server.controller;
+package org.thingsboard.server.service.security;
import com.google.common.util.concurrent.FutureCallback;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.server.actors.plugin.ValidationResult;
import org.thingsboard.server.actors.plugin.ValidationResultCode;
import org.thingsboard.server.extensions.api.exception.AccessDeniedException;
@@ -26,14 +9,14 @@ import org.thingsboard.server.extensions.api.exception.InternalErrorException;
import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
/**
- * Created by ashvayka on 21.02.17.
+ * Created by ashvayka on 31.03.18.
*/
-public class ValidationCallback implements FutureCallback {
+public class ValidationCallback implements FutureCallback {
- private final DeferredResult response;
- private final FutureCallback> action;
+ private final T response;
+ private final FutureCallback action;
- public ValidationCallback(DeferredResult response, FutureCallback> action) {
+ public ValidationCallback(T response, FutureCallback action) {
this.response = response;
this.action = action;
}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 359949eef9..81cdac93ed 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -5,8 +5,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
import java.util.HashMap;
import java.util.List;
@@ -23,13 +25,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
@Autowired
private TelemetryWebSocketService wsService;
-
private final Map> subscriptionsByEntityId = new HashMap<>();
private final Map> subscriptionsByWsSessionId = new HashMap<>();
-
-
@Override
public void onAttributesUpdateFromServer(EntityId entityId, String scope, List attributes) {
@@ -39,4 +38,29 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
public void onTimeseriesUpdateFromServer(EntityId entityId, List entries) {
}
+
+ @Override
+ public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) {
+
+ }
+
+ @Override
+ public void removeSubscription(String sessionId, int cmdId) {
+
+ }
+
+ @Override
+ public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
+
+ }
+
+ @Override
+ public void onLocalTimeseriesUpdate(EntityId entityId, Map> ts) {
+
+ }
+
+ @Override
+ public void onLocalAttributesUpdate(EntityId entityId, String scope, Set attributes) {
+
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
index 6d6c33e65b..57dd93d79f 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
@@ -2,36 +2,46 @@ package org.thingsboard.server.service.telemetry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.hazelcast.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
+import org.thingsboard.server.actors.plugin.ValidationResult;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
+import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
-import org.thingsboard.server.extensions.api.plugins.PluginCallback;
-import org.thingsboard.server.extensions.api.plugins.PluginContext;
-import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.GetHistoryCmd;
import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionCmd;
import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd;
import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TimeseriesSubscriptionCmd;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
+import org.thingsboard.server.service.security.AccessValidator;
+import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -41,6 +51,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
@@ -50,6 +62,8 @@ import java.util.stream.Collectors;
@Slf4j
public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService {
+ public static final int DEFAULT_LIMIT = 100;
+ public static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE;
private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
private static final String PROCESSING_MSG = "[{}] Processing: {}";
private static final ObjectMapper jsonMapper = new ObjectMapper();
@@ -65,12 +79,29 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
@Autowired
private TelemetryWebSocketMsgEndpoint msgEndpoint;
+ @Autowired
+ private AccessValidator accessValidator;
+
@Autowired
private AttributesService attributesService;
@Autowired
private TimeseriesService tsService;
+ private ExecutorService executor;
+
+ @PostConstruct
+ public void initExecutor() {
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @PreDestroy
+ public void shutdownExecutor() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+
@Override
public void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent event) {
String sessionId = sessionRef.getSessionId();
@@ -169,44 +200,190 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
};
if (StringUtils.isEmpty(cmd.getScope())) {
- //ValidationCallback?
- ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), keys, callback);
+ accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, keys, callback));
} else {
- ctx.loadAttributes(entityId, cmd.getScope(), keys, callback);
+ accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, cmd.getScope(), keys, callback));
+ }
+ }
+
+ private void handleWsHistoryCmd(TelemetryWebSocketSessionRef sessionRef, GetHistoryCmd cmd) {
+ String sessionId = sessionRef.getSessionId();
+ WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
+ if (sessionMD == null) {
+ log.warn("[{}] Session meta data not found. ", sessionId);
+ SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ SESSION_META_DATA_NOT_FOUND);
+ sendWsMsg(sessionRef, update);
+ return;
+ }
+ if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) {
+ SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
+ "Device id is empty!");
+ sendWsMsg(sessionRef, update);
+ return;
+ }
+ if (cmd.getKeys() == null || cmd.getKeys().isEmpty()) {
+ SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
+ "Keys are empty!");
+ sendWsMsg(sessionRef, update);
+ return;
}
+ EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
+ List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
+ List queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg())))
+ .collect(Collectors.toList());
+
+ FutureCallback> callback = new FutureCallback>() {
+ @Override
+ public void onSuccess(List data) {
+ sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ SubscriptionUpdate update;
+ if (UnauthorizedException.class.isInstance(e)) {
+ update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
+ SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
+ } else {
+ update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ FAILED_TO_FETCH_DATA);
+ }
+ sendWsMsg(sessionRef, update);
+ }
+ };
+ accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
+ on(r -> Futures.addCallback(tsService.findAll(entityId, queries), callback, executor), callback::onFailure));
}
- private void handleWsAttributesSubscription(PluginContext ctx, PluginWebsocketSessionRef sessionRef,
+ private void handleWsAttributesSubscription(TelemetryWebSocketSessionRef sessionRef,
AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
- PluginCallback> callback = new PluginCallback>() {
+ FutureCallback> callback = new FutureCallback>() {
@Override
- public void onSuccess(PluginContext ctx, List data) {
+ public void onSuccess(List data) {
List attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
- sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
+ sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
Map subState = new HashMap<>(attributesData.size());
attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
- subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
+ subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
}
@Override
- public void onFailure(PluginContext ctx, Exception e) {
+ public void onFailure(Throwable e) {
log.error(FAILED_TO_FETCH_ATTRIBUTES, e);
SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
FAILED_TO_FETCH_ATTRIBUTES);
- sendWsMsg(ctx, sessionRef, update);
+ sendWsMsg(sessionRef, update);
}
};
+
if (StringUtils.isEmpty(cmd.getScope())) {
- ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), callback);
+ accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, callback));
+ } else {
+ accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, cmd.getScope(), callback));
+ }
+ }
+
+ private void handleWsTimeseriesSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, TimeseriesSubscriptionCmd cmd) {
+ String sessionId = sessionRef.getSessionId();
+ log.debug("[{}] Processing: {}", sessionId, cmd);
+
+ if (validateSessionMetadata(sessionRef, cmd, sessionId)) {
+ if (cmd.isUnsubscribe()) {
+ unsubscribe(sessionRef, cmd, sessionId);
+ } else if (validateSubscriptionCmd(sessionRef, cmd)) {
+ EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
+ Optional> keysOptional = getKeys(cmd);
+
+ if (keysOptional.isPresent()) {
+ handleWsTimeseriesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId);
+ } else {
+ handleWsTimeseriesSubscription(sessionRef, cmd, sessionId, entityId);
+ }
+ }
+ }
+ }
+
+ private void handleWsTimeseriesSubscriptionByKeys(TelemetryWebSocketSessionRef sessionRef,
+ TimeseriesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
+ long startTs;
+ if (cmd.getTimeWindow() > 0) {
+ List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
+ log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId);
+ startTs = cmd.getStartTs();
+ long endTs = cmd.getStartTs() + cmd.getTimeWindow();
+ List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(),
+ getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
+
+ final FutureCallback> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys);
+ accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
+ on(r -> Futures.addCallback(tsService.findAll(entityId, queries), callback, executor), callback::onFailure));
} else {
- ctx.loadAttributes(entityId, cmd.getScope(), callback);
+ List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
+ startTs = System.currentTimeMillis();
+ log.debug("[{}] fetching latest timeseries data for keys: ({}) for device : {}", sessionId, cmd.getKeys(), entityId);
+ final FutureCallback> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys);
+ accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
+ on(r -> Futures.addCallback(tsService.findLatest(entityId, keys), callback, executor), callback::onFailure));
}
}
+ private void handleWsTimeseriesSubscription(TelemetryWebSocketSessionRef sessionRef,
+ TimeseriesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
+ FutureCallback> callback = new FutureCallback>() {
+ @Override
+ public void onSuccess(List data) {
+ sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
+ Map subState = new HashMap<>(data.size());
+ data.forEach(v -> subState.put(v.getKey(), v.getTs()));
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope());
+ subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ SubscriptionUpdate update;
+ if (UnauthorizedException.class.isInstance(e)) {
+ update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
+ SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
+ } else {
+ update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ FAILED_TO_FETCH_DATA);
+ }
+ sendWsMsg(sessionRef, update);
+ }
+ };
+ accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
+ on(r -> Futures.addCallback(tsService.findAllLatest(entityId), callback, executor), callback::onFailure));
+ }
+
+ private FutureCallback> getSubscriptionCallback(final TelemetryWebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final EntityId entityId, final long startTs, final List keys) {
+ return new FutureCallback>() {
+ @Override
+ public void onSuccess(List data) {
+ sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
+
+ Map subState = new HashMap<>(keys.size());
+ keys.forEach(key -> subState.put(key, startTs));
+ data.forEach(v -> subState.put(v.getKey(), v.getTs()));
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope());
+ subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ log.error(FAILED_TO_FETCH_DATA, e);
+ SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ FAILED_TO_FETCH_DATA);
+ sendWsMsg(sessionRef, update);
+ }
+ };
+ }
+
private void unsubscribe(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
subscriptionManager.cleanupLocalWsSessionSubscriptions(sessionRef, sessionId);
@@ -258,4 +435,105 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
}
}
+ private ListenableFuture> mergeAllAttributesFutures(List>> futures) {
+ return Futures.transform(Futures.successfulAsList(futures),
+ (Function super List>, ? extends List>) input -> {
+ List tmp = new ArrayList<>();
+ if (input != null) {
+ input.forEach(tmp::addAll);
+ }
+ return tmp;
+ }, executor);
+ }
+
+ private FutureCallback getAttributesFetchCallback(final EntityId entityId, final List keys, final FutureCallback> callback) {
+ return new FutureCallback() {
+ @Override
+ public void onSuccess(@Nullable ValidationResult result) {
+ List>> futures = new ArrayList<>();
+ for (String scope : DataConstants.allScopes()) {
+ futures.add(attributesService.find(entityId, scope, keys));
+ }
+
+ ListenableFuture> future = mergeAllAttributesFutures(futures);
+ Futures.addCallback(future, callback);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callback.onFailure(t);
+ }
+ };
+ }
+
+ private FutureCallback getAttributesFetchCallback(final EntityId entityId, final String scope, final List keys, final FutureCallback> callback) {
+ return new FutureCallback() {
+ @Override
+ public void onSuccess(@Nullable ValidationResult result) {
+ Futures.addCallback(attributesService.find(entityId, scope, keys), callback);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callback.onFailure(t);
+ }
+ };
+ }
+
+ private FutureCallback getAttributesFetchCallback(final EntityId entityId, final FutureCallback> callback) {
+ return new FutureCallback() {
+ @Override
+ public void onSuccess(@Nullable ValidationResult result) {
+ List>> futures = new ArrayList<>();
+ for (String scope : DataConstants.allScopes()) {
+ futures.add(attributesService.findAll(entityId, scope));
+ }
+
+ ListenableFuture> future = mergeAllAttributesFutures(futures);
+ Futures.addCallback(future, callback);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callback.onFailure(t);
+ }
+ };
+ }
+
+ private FutureCallback getAttributesFetchCallback(final EntityId entityId, final String scope, final FutureCallback> callback) {
+ return new FutureCallback() {
+ @Override
+ public void onSuccess(@Nullable ValidationResult result) {
+ Futures.addCallback(attributesService.findAll(entityId, scope), callback);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callback.onFailure(t);
+ }
+ };
+ }
+
+ private FutureCallback on(Consumer success, Consumer failure) {
+ return new FutureCallback() {
+ @Override
+ public void onSuccess(@Nullable ValidationResult result) {
+ success.accept(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ failure.accept(t);
+ }
+ };
+ }
+
+
+ private static Aggregation getAggregation(String agg) {
+ return StringUtils.isEmpty(agg) ? DEFAULT_AGGREGATION : Aggregation.valueOf(agg);
+ }
+
+ private int getLimit(int limit) {
+ return limit == 0 ? DEFAULT_LIMIT : limit;
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
index 9673629e9d..44e8512559 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
@@ -2,10 +2,13 @@ package org.thingsboard.server.service.telemetry;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* Created by ashvayka on 27.03.18.
@@ -21,4 +24,8 @@ public interface TelemetrySubscriptionService {
void removeSubscription(String sessionId, int cmdId);
void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
+
+ void onLocalTimeseriesUpdate(EntityId entityId, List ts);
+
+ void onLocalAttributesUpdate(EntityId entityId, String scope, Set attributes);
}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 46fa1d2087..6b8947037a 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,7 +15,12 @@
*/
package org.thingsboard.rule.engine.api;
+import com.google.common.util.concurrent.FutureCallback;
+import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.dao.alarm.AlarmService;
@@ -30,6 +35,8 @@ import org.thingsboard.server.dao.rule.RuleService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -56,6 +63,12 @@ public interface TbContext {
void tellError(TbMsg msg, Throwable th);
+ void saveAndNotify(EntityId entityId, List ts, FutureCallback callback);
+
+ void saveAndNotify(EntityId entityId, List ts, long ttl, FutureCallback callback);
+
+ void saveAndNotify(EntityId entityId, String scope, Set attributes, FutureCallback callback);
+
RuleNodeId getSelfId();
AttributesService getAttributesService();