diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 70bb4f2421..f9c4e6f5c0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -195,9 +195,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) { - //TODO: improve this procedure to fetch only changed attributes. + //TODO: improve this procedure to fetch only changed attributes and support attributes deletion refreshAttributes(); - //TODO: support attributes deletion Set keys = msg.getKeys(); if (attributeSubscriptions.size() > 0) { ToDeviceMsg notification = null; diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java index 9474a621e1..98149c296a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -56,6 +56,7 @@ import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRe import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg; import akka.actor.ActorRef; +import org.w3c.dom.Attr; import javax.annotation.Nullable; @@ -91,49 +92,86 @@ public final class PluginProcessingContext implements PluginContext { @Override public void saveAttributes(DeviceId deviceId, String scope, List attributes, PluginCallback callback) { validate(deviceId); - Set keys = new HashSet<>(); - for (AttributeKvEntry attribute : attributes) { - keys.add(new AttributeKey(scope, attribute.getKey())); - } ListenableFuture> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes); Futures.addCallback(rsListFuture, getListCallback(callback, v -> { - onDeviceAttributesChanged(deviceId, keys); + onDeviceAttributesChanged(deviceId, scope, attributes); return null; }), executor); } @Override - public Optional loadAttribute(DeviceId deviceId, String attributeType, String attributeKey) { + public void removeAttributes(DeviceId deviceId, String scope, List keys, PluginCallback callback) { validate(deviceId); - AttributeKvEntry attribute = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey); - return Optional.ofNullable(attribute); + ListenableFuture> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys); + Futures.addCallback(future, getCallback(callback, v -> null), executor); + onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet())); } @Override - public List loadAttributes(DeviceId deviceId, String attributeType, List attributeKeys) { + public void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List attributes, PluginCallback callback) { validate(deviceId); - List result = new ArrayList<>(attributeKeys.size()); - for (String attributeKey : attributeKeys) { - AttributeKvEntry attribute = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey); - if (attribute != null) { - result.add(attribute); - } - } - return result; + + ListenableFuture> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes); + Futures.addCallback(rsListFuture, getListCallback(callback, v -> { + onDeviceAttributesChanged(tenantId, deviceId, scope, attributes); + return null; + }), executor); + } + + @Override + public void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List keys, PluginCallback callback) { + validate(deviceId); + ListenableFuture> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys); + Futures.addCallback(future, getCallback(callback, v -> null), executor); + onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet())); + } + + @Override + public void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback> callback) { + validate(deviceId); + ListenableFuture> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey); + Futures.addCallback(future, getCallback(callback, v -> v), executor); } @Override - public List loadAttributes(DeviceId deviceId, String attributeType) { + public void loadAttributes(DeviceId deviceId, String attributeType, Collection attributeKeys, PluginCallback> callback) { validate(deviceId); - return pluginCtx.attributesService.findAll(deviceId, attributeType); + ListenableFuture> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys); + Futures.addCallback(future, getCallback(callback, v -> v), executor); } @Override - public void removeAttributes(DeviceId deviceId, String scope, List keys) { + public void loadAttributes(DeviceId deviceId, String attributeType, PluginCallback> callback) { validate(deviceId); - pluginCtx.attributesService.removeAll(deviceId, scope, keys); - onDeviceAttributesDeleted(deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet())); + ListenableFuture> future = pluginCtx.attributesService.findAll(deviceId, attributeType); + Futures.addCallback(future, getCallback(callback, v -> v), executor); + } + + @Override + public void loadAttributes(DeviceId deviceId, Collection attributeTypes, PluginCallback> callback) { + validate(deviceId); + List>> futures = new ArrayList<>(); + attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.findAll(deviceId, attributeType))); + convertFuturesAndAddCallback(callback, futures); + } + + @Override + public void loadAttributes(DeviceId deviceId, Collection attributeTypes, Collection attributeKeys, PluginCallback> callback) { + validate(deviceId); + List>> futures = new ArrayList<>(); + attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys))); + convertFuturesAndAddCallback(callback, futures); + } + + private void convertFuturesAndAddCallback(PluginCallback> callback, List>> futures) { + ListenableFuture> future = Futures.transform(Futures.successfulAsList(futures), + (Function>, ? extends List>) input -> { + List result = new ArrayList<>(); + input.forEach(r -> result.addAll(r)); + return result; + }, executor); + Futures.addCallback(future, getCallback(callback, v -> v), executor); } @Override @@ -205,18 +243,12 @@ public final class PluginProcessingContext implements PluginContext { return securityCtx; } - private void onDeviceAttributesChanged(DeviceId deviceId, AttributeKey key) { - onDeviceAttributesChanged(deviceId, Collections.singleton(key)); + private void onDeviceAttributesDeleted(TenantId tenantId, DeviceId deviceId, Set keys) { + pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onDelete(tenantId, deviceId, keys)); } - private void onDeviceAttributesDeleted(DeviceId deviceId, Set keys) { - Device device = pluginCtx.deviceService.findDeviceById(deviceId); - pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onDelete(device.getTenantId(), deviceId, keys)); - } - - private void onDeviceAttributesChanged(DeviceId deviceId, Set keys) { - Device device = pluginCtx.deviceService.findDeviceById(deviceId); - pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onUpdate(device.getTenantId(), deviceId, keys)); + private void onDeviceAttributesChanged(TenantId tenantId, DeviceId deviceId, String scope, List values) { + pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, deviceId, scope, values)); } private FutureCallback> getListCallback(final PluginCallback callback, Function, T> transformer) { @@ -256,11 +288,12 @@ public final class PluginProcessingContext implements PluginContext { } // TODO: replace with our own exceptions - private boolean validate(DeviceId deviceId) { + private boolean validate(DeviceId deviceId, PluginCallback callback) { if (securityCtx.isPresent()) { - PluginApiCallSecurityContext ctx = securityCtx.get(); + final PluginApiCallSecurityContext ctx = securityCtx.get(); if (ctx.isTenantAdmin() || ctx.isCustomerUser()) { - Device device = pluginCtx.deviceService.findDeviceById(deviceId); + ListenableFuture device = pluginCtx.deviceService.findDeviceById(deviceId); + Futures.addCallback(device, ); if (device == null) { throw new IllegalStateException("Device not found!"); } else { diff --git a/dao/src/main/java/org/thingsboard/server/dao/AbstractAsyncDao.java b/dao/src/main/java/org/thingsboard/server/dao/AbstractAsyncDao.java new file mode 100644 index 0000000000..9b9368d45d --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/AbstractAsyncDao.java @@ -0,0 +1,42 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Created by ashvayka on 21.02.17. + */ +public abstract class AbstractAsyncDao extends AbstractDao { + + protected ExecutorService readResultsProcessingExecutor; + + @PostConstruct + public void startExecutor() { + readResultsProcessingExecutor = Executors.newCachedThreadPool(); + } + + @PreDestroy + public void stopExecutor() { + if (readResultsProcessingExecutor != null) { + readResultsProcessingExecutor.shutdownNow(); + } + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java index ead2c044cc..ae58d4d9c6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java @@ -15,23 +15,28 @@ */ package org.thingsboard.server.dao.attributes; +import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import java.util.Collection; import java.util.List; -import java.util.UUID; +import java.util.Optional; /** * @author Andrew Shvayka */ public interface AttributesDao { - AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey); + ListenableFuture> find(EntityId entityId, String attributeType, String attributeKey); - List findAll(EntityId entityId, String attributeType); + ListenableFuture> find(EntityId entityId, String attributeType, Collection attributeKey); + + ListenableFuture> findAll(EntityId entityId, String attributeType); ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute); - void removeAll(EntityId entityId, String scope, List keys); + ListenableFuture> removeAll(EntityId entityId, String scope, List keys); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java index 5a1fd70bc7..6bf9fb2bd5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java @@ -23,18 +23,22 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.UUIDBased; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import java.util.Collection; import java.util.List; +import java.util.Optional; /** * @author Andrew Shvayka */ public interface AttributesService { - AttributeKvEntry find(EntityId entityId, String scope, String attributeKey); + ListenableFuture> find(EntityId entityId, String scope, String attributeKey); - List findAll(EntityId entityId, String scope); + ListenableFuture> find(EntityId entityId, String scope, Collection attributeKeys); + + ListenableFuture> findAll(EntityId entityId, String scope); ListenableFuture> save(EntityId entityId, String scope, List attributes); - void removeAll(EntityId entityId, String scope, List attributeKeys); + ListenableFuture> removeAll(EntityId entityId, String scope, List attributeKeys); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java index 5148a121de..262d15d1a3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java @@ -18,19 +18,24 @@ package org.thingsboard.server.dao.attributes; import com.datastax.driver.core.*; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.kv.DataType; -import org.thingsboard.server.dao.AbstractDao; +import org.thingsboard.server.dao.AbstractAsyncDao; import org.thingsboard.server.dao.model.ModelConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.thingsboard.server.common.data.kv.*; import org.thingsboard.server.dao.timeseries.BaseTimeseriesDao; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; import static org.thingsboard.server.dao.model.ModelConstants.*; import static com.datastax.driver.core.querybuilder.QueryBuilder.*; @@ -40,29 +45,55 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.*; */ @Component @Slf4j -public class BaseAttributesDao extends AbstractDao implements AttributesDao { - +public class BaseAttributesDao extends AbstractAsyncDao implements AttributesDao { + private PreparedStatement saveStmt; + @PostConstruct + public void init() { + super.startExecutor(); + } + + @PreDestroy + public void stop() { + super.stopExecutor(); + } + @Override - public AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey) { + public ListenableFuture> find(EntityId entityId, String attributeType, String attributeKey) { Select.Where select = select().from(ATTRIBUTES_KV_CF) .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType())) .and(eq(ENTITY_ID_COLUMN, entityId.getId())) .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType)) .and(eq(ATTRIBUTE_KEY_COLUMN, attributeKey)); log.trace("Generated query [{}] for entityId {} and key {}", select, entityId, attributeKey); - return convertResultToAttributesKvEntry(attributeKey, executeRead(select).one()); + return Futures.transform(executeAsyncRead(select), (Function>) input -> + Optional.of(convertResultToAttributesKvEntry(attributeKey, input.one())) + , readResultsProcessingExecutor); } @Override - public List findAll(EntityId entityId, String attributeType) { + public ListenableFuture> find(EntityId entityId, String attributeType, Collection attributeKeys) { + List>> entries = new ArrayList<>(); + attributeKeys.forEach(attributeKey -> entries.add(find(entityId, attributeType, attributeKey))); + return Futures.transform(Futures.allAsList(entries), (Function>, ? extends List>) input -> { + List result = new ArrayList<>(); + input.stream().filter(opt -> opt.isPresent()).forEach(opt -> result.add(opt.get())); + return result; + }, readResultsProcessingExecutor); + } + + + @Override + public ListenableFuture> findAll(EntityId entityId, String attributeType) { Select.Where select = select().from(ATTRIBUTES_KV_CF) .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType())) .and(eq(ENTITY_ID_COLUMN, entityId.getId())) .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType)); log.trace("Generated query [{}] for entityId {} and attributeType {}", select, entityId, attributeType); - return convertResultToAttributesKvEntryList(executeRead(select)); + return Futures.transform(executeAsyncRead(select), (Function>) input -> + convertResultToAttributesKvEntryList(input) + , readResultsProcessingExecutor); } @Override @@ -93,20 +124,19 @@ public class BaseAttributesDao extends AbstractDao implements AttributesDao { } @Override - public void removeAll(EntityId entityId, String attributeType, List keys) { - for (String key : keys) { - delete(entityId, attributeType, key); - } + public ListenableFuture> removeAll(EntityId entityId, String attributeType, List keys) { + List futures = keys.stream().map(key -> delete(entityId, attributeType, key)).collect(Collectors.toList()); + return Futures.allAsList(futures); } - private void delete(EntityId entityId, String attributeType, String key) { + private ResultSetFuture delete(EntityId entityId, String attributeType, String key) { Statement delete = QueryBuilder.delete().all().from(ModelConstants.ATTRIBUTES_KV_CF) .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType())) .and(eq(ENTITY_ID_COLUMN, entityId.getId())) .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType)) .and(eq(ATTRIBUTE_KEY_COLUMN, key)); log.debug("Remove request: {}", delete.toString()); - getSession().execute(delete); + return getSession().executeAsync(delete); } private PreparedStatement getSaveStmt() { @@ -150,5 +180,4 @@ public class BaseAttributesDao extends AbstractDao implements AttributesDao { } return entries; } - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java index d3a1cb3550..43612419d0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java @@ -27,7 +27,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.server.dao.service.Validator; +import java.util.Collection; import java.util.List; +import java.util.Optional; /** * @author Andrew Shvayka @@ -39,14 +41,21 @@ public class BaseAttributesService implements AttributesService { private AttributesDao attributesDao; @Override - public AttributeKvEntry find(EntityId entityId, String scope, String attributeKey) { + public ListenableFuture> find(EntityId entityId, String scope, String attributeKey) { validate(entityId, scope); Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey); return attributesDao.find(entityId, scope, attributeKey); } @Override - public List findAll(EntityId entityId, String scope) { + public ListenableFuture> find(EntityId entityId, String scope, Collection attributeKeys) { + validate(entityId, scope); + attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey)); + return attributesDao.find(entityId, scope, attributeKeys); + } + + @Override + public ListenableFuture> findAll(EntityId entityId, String scope) { validate(entityId, scope); return attributesDao.findAll(entityId, scope); } @@ -56,16 +65,16 @@ public class BaseAttributesService implements AttributesService { validate(entityId, scope); attributes.forEach(attribute -> validate(attribute)); List futures = Lists.newArrayListWithExpectedSize(attributes.size()); - for(AttributeKvEntry attribute : attributes) { + for (AttributeKvEntry attribute : attributes) { futures.add(attributesDao.save(entityId, scope, attribute)); } return Futures.allAsList(futures); } @Override - public void removeAll(EntityId entityId, String scope, List keys) { + public ListenableFuture> removeAll(EntityId entityId, String scope, List keys) { validate(entityId, scope); - attributesDao.removeAll(entityId, scope, keys); + return attributesDao.removeAll(entityId, scope, keys); } private static void validate(EntityId id, String scope) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java index 8d780b6aa1..b760f5597d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.device; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; @@ -28,6 +29,8 @@ public interface DeviceService { Device findDeviceById(DeviceId deviceId); + ListenableFuture findDeviceByIdAsync(DeviceId deviceId); + Optional findDeviceByTenantIdAndName(TenantId tenantId, String name); Device saveDevice(Device device); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java index 81bbdf9309..42fede47f8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java @@ -28,6 +28,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.kv.*; import org.thingsboard.server.common.data.kv.DataType; +import org.thingsboard.server.dao.AbstractAsyncDao; import org.thingsboard.server.dao.AbstractDao; import org.thingsboard.server.dao.model.ModelConstants; @@ -50,7 +51,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select; */ @Component @Slf4j -public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { +public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao { @Value("${cassandra.query.min_aggregation_step_ms}") private int minAggregationStepMs; @@ -60,8 +61,6 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { private TsPartitionDate tsFormat; - private ExecutorService readResultsProcessingExecutor; - private PreparedStatement partitionInsertStmt; private PreparedStatement[] latestInsertStmts; private PreparedStatement[] saveStmts; @@ -71,8 +70,8 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { @PostConstruct public void init() { + super.startExecutor(); getFetchStmt(Aggregation.NONE); - readResultsProcessingExecutor = Executors.newCachedThreadPool(); Optional partition = TsPartitionDate.parse(partitioning); if (partition.isPresent()) { tsFormat = partition.get(); @@ -84,9 +83,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { @PreDestroy public void stop() { - if (readResultsProcessingExecutor != null) { - readResultsProcessingExecutor.shutdownNow(); - } + super.stopExecutor(); } @Override diff --git a/dao/src/test/java/org/thingsboard/server/dao/attributes/BaseAttributesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/attributes/BaseAttributesServiceTest.java index 8fb5d86410..559a3130e8 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/attributes/BaseAttributesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/attributes/BaseAttributesServiceTest.java @@ -32,6 +32,7 @@ import org.springframework.beans.factory.annotation.Autowired; import java.util.Collections; import java.util.List; +import java.util.Optional; import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; import static org.thingsboard.server.common.data.DataConstants.DEVICE; @@ -54,8 +55,9 @@ public class BaseAttributesServiceTest extends AbstractServiceTest { KvEntry attrValue = new StringDataEntry("attribute1", "value1"); AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L); attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attr)).get(); - AttributeKvEntry saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attr.getKey()); - Assert.assertEquals(attr, saved); + Optional saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attr.getKey()).get(); + Assert.assertTrue(saved.isPresent()); + Assert.assertEquals(attr, saved.get()); } @Test @@ -65,15 +67,17 @@ public class BaseAttributesServiceTest extends AbstractServiceTest { AttributeKvEntry attrOld = new BaseAttributeKvEntry(attrOldValue, 42L); attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrOld)).get(); - AttributeKvEntry saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey()); - Assert.assertEquals(attrOld, saved); + Optional saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey()).get(); + + Assert.assertTrue(saved.isPresent()); + Assert.assertEquals(attrOld, saved.get()); KvEntry attrNewValue = new StringDataEntry("attribute1", "value2"); AttributeKvEntry attrNew = new BaseAttributeKvEntry(attrNewValue, 73L); attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrNew)).get(); - saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey()); - Assert.assertEquals(attrNew, saved); + saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey()).get(); + Assert.assertEquals(attrNew, saved.get()); } @Test @@ -91,7 +95,7 @@ public class BaseAttributesServiceTest extends AbstractServiceTest { attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrANew)).get(); attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrBNew)).get(); - List saved = attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE); + List saved = attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE).get(); Assert.assertNotNull(saved); Assert.assertEquals(2, saved.size()); diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java index 13c25c3213..fd16b75f95 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java @@ -114,8 +114,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest { entries.add(save(deviceId, 45000, 500)); entries.add(save(deviceId, 55000, 600)); - List list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0, - 60000, 3, Aggregation.NONE)).get(); + List list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, + 60000, 3, Aggregation.NONE))).get(); assertEquals(3, list.size()); assertEquals(55000, list.get(0).getTs()); assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue()); @@ -126,8 +126,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest { assertEquals(35000, list.get(2).getTs()); assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue()); - list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0, - 60000, 3, Aggregation.AVG)).get(); + list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, + 60000, 3, Aggregation.AVG))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue()); @@ -138,8 +138,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest { assertEquals(50000, list.get(2).getTs()); assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue()); - list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0, - 60000, 3, Aggregation.SUM)).get(); + list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, + 60000, 3, Aggregation.SUM))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); @@ -151,8 +151,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest { assertEquals(50000, list.get(2).getTs()); assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue()); - list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0, - 60000, 3, Aggregation.MIN)).get(); + list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, + 60000, 3, Aggregation.MIN))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); @@ -164,8 +164,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest { assertEquals(50000, list.get(2).getTs()); assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue()); - list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0, - 60000, 3, Aggregation.MAX)).get(); + list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, + 60000, 3, Aggregation.MAX))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); @@ -177,8 +177,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest { assertEquals(50000, list.get(2).getTs()); assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue()); - list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0, - 60000, 3, Aggregation.COUNT)).get(); + list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, + 60000, 3, Aggregation.COUNT))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties index 210d2c0c9e..a4eb3d0547 100644 --- a/dao/src/test/resources/cassandra-test.properties +++ b/dao/src/test/resources/cassandra-test.properties @@ -2,7 +2,7 @@ cassandra.cluster_name=Thingsboard Cluster cassandra.keyspace_name=thingsboard -cassandra.url=127.0.0.1:9042 +cassandra.url=127.0.0.1:9142 cassandra.ssl=false diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java index c2c5587704..64d4d535ea 100644 --- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java +++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java @@ -94,13 +94,21 @@ public interface PluginContext { void saveAttributes(DeviceId deviceId, String attributeType, List attributes, PluginCallback callback); - Optional loadAttribute(DeviceId deviceId, String attributeType, String attributeKey); + void removeAttributes(DeviceId deviceId, String scope, List attributeKeys, PluginCallback callback); - List loadAttributes(DeviceId deviceId, String attributeType, List attributeKeys); + void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String attributeType, List attributes, PluginCallback callback); - List loadAttributes(DeviceId deviceId, String attributeType); + void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List attributeKeys, PluginCallback callback); - void removeAttributes(DeviceId deviceId, String scope, List attributeKeys); + void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback> callback); + + void loadAttributes(DeviceId deviceId, String attributeType, Collection attributeKeys, PluginCallback> callback); + + void loadAttributes(DeviceId deviceId, String attributeType, PluginCallback> callback); + + void loadAttributes(DeviceId deviceId, Collection attributeTypes, PluginCallback> callback); + + void loadAttributes(DeviceId deviceId, Collection attributeTypes, Collection attributeKeys, PluginCallback> callback); void getCustomerDevices(TenantId tenantId, CustomerId customerId, int limit, PluginCallback> callback); } diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java index 0624876b36..ea7a185b20 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java @@ -15,12 +15,14 @@ */ package org.thingsboard.server.extensions.core.plugin.telemetry; +import com.sun.javafx.collections.MappingChange; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.kv.*; import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.extensions.api.plugins.PluginCallback; import org.thingsboard.server.extensions.api.plugins.PluginContext; import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryRpcMsgHandler; import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryWebsocketMsgHandler; @@ -66,28 +68,49 @@ public class SubscriptionManager { DeviceId deviceId = subscription.getDeviceId(); log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), deviceId, address); registerSubscription(sessionId, deviceId, subscription); - List missedUpdates = new ArrayList<>(); if (subscription.getType() == SubscriptionType.ATTRIBUTES) { - subscription.getKeyStates().entrySet().forEach(e -> { - Optional latestOpt = ctx.loadAttribute(deviceId, DataConstants.CLIENT_SCOPE, e.getKey()); - if (latestOpt.isPresent()) { - AttributeKvEntry latestEntry = latestOpt.get(); - if (latestEntry.getLastUpdateTs() > e.getValue()) { - missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry)); - } + final Map keyStates = subscription.getKeyStates(); + ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE, keyStates.keySet(), new PluginCallback>() { + @Override + public void onSuccess(PluginContext ctx, List values) { + List missedUpdates = new ArrayList<>(); + values.forEach(latestEntry -> { + if (latestEntry.getLastUpdateTs() > keyStates.get(latestEntry.getKey())) { + missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry)); } + }); + if (!missedUpdates.isEmpty()) { + rpcHandler.onSubscriptionUpdate(ctx, address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates)); } - ); + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to fetch missed updates.", e); + } + }); } else if (subscription.getType() == SubscriptionType.TIMESERIES) { long curTs = System.currentTimeMillis(); + List queries = new ArrayList<>(); subscription.getKeyStates().entrySet().forEach(e -> { - TsKvQuery query = new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs); - missedUpdates.addAll(ctx.loadTimeseries(deviceId, query)); + queries.add(new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs)); + }); + + ctx.loadTimeseries(deviceId, queries, new PluginCallback>() { + @Override + public void onSuccess(PluginContext ctx, List missedUpdates) { + if (!missedUpdates.isEmpty()) { + rpcHandler.onSubscriptionUpdate(ctx, address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates)); + } + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to fetch missed updates.", e); + } }); } - if (!missedUpdates.isEmpty()) { - rpcHandler.onSubscriptionUpdate(ctx, address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates)); - } + } private void registerSubscription(String sessionId, DeviceId deviceId, Subscription subscription) { diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/BiPluginCallBack.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/BiPluginCallBack.java new file mode 100644 index 0000000000..bc5285caac --- /dev/null +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/BiPluginCallBack.java @@ -0,0 +1,74 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.core.plugin.telemetry.handlers; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.extensions.api.plugins.PluginCallback; +import org.thingsboard.server.extensions.api.plugins.PluginContext; + +/** + * Created by ashvayka on 21.02.17. + */ +@Slf4j +public abstract class BiPluginCallBack { + + private V1 v1; + private V2 v2; + + public PluginCallback getV1Callback() { + return new PluginCallback() { + @Override + public void onSuccess(PluginContext ctx, V1 value) { + synchronized (BiPluginCallBack.this) { + BiPluginCallBack.this.v1 = value; + if (v2 != null) { + BiPluginCallBack.this.onSuccess(ctx, v1, v2); + } + } + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + BiPluginCallBack.this.onFailure(ctx, e); + } + }; + } + + public PluginCallback getV2Callback() { + return new PluginCallback() { + @Override + public void onSuccess(PluginContext ctx, V2 value) { + synchronized (BiPluginCallBack.this) { + BiPluginCallBack.this.v2 = value; + if (v1 != null) { + BiPluginCallBack.this.onSuccess(ctx, v1, v2); + } + } + + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + BiPluginCallBack.this.onFailure(ctx, e); + } + }; + } + + abstract public void onSuccess(PluginContext ctx, V1 v1, V2 v2); + + abstract public void onFailure(PluginContext ctx, Exception e); + +} diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java index e93f0b5012..28d8b7c5ce 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java @@ -77,41 +77,58 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { } }); } else if (entity.equals("attributes")) { - List attributes; + PluginCallback> callback = getAttributeKeysPluginCallback(msg); if (!StringUtils.isEmpty(scope)) { - attributes = ctx.loadAttributes(deviceId, scope); + ctx.loadAttributes(deviceId, scope, callback); } else { - attributes = new ArrayList<>(); - Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(ctx.loadAttributes(deviceId, s))); + ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), callback); } - List keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList()); - msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK)); } } else if (method.equals("values")) { if ("timeseries".equals(entity)) { - String keys = request.getParameter("keys"); + String keysStr = request.getParameter("keys"); Optional startTs = request.getLongParamValue("startTs"); Optional endTs = request.getLongParamValue("endTs"); Optional limit = request.getIntParamValue("limit"); - Map> data = new LinkedHashMap<>(); - for (String key : keys.split(",")) { - //TODO: refactoring -// List entries = ctx.loadTimeseries(deviceId, new BaseTsKvQuery(key, startTs, endTs, limit)); -// data.put(key, entries.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList())); - } - msg.getResponseHolder().setResult(new ResponseEntity<>(data, HttpStatus.OK)); + Aggregation agg = Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name())); + + List keys = Arrays.asList(keysStr.split(",")); + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), limit.get(), agg)).collect(Collectors.toList()); + ctx.loadTimeseries(deviceId, queries, new PluginCallback>() { + @Override + public void onSuccess(PluginContext ctx, List data) { + Map> result = new LinkedHashMap<>(); + for (TsKvEntry entry : data) { + result.put(entry.getKey(), data.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList())); + } + msg.getResponseHolder().setResult(new ResponseEntity<>(data, HttpStatus.OK)); + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to fetch historical data", e); + msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); + } + }); } else if ("attributes".equals(entity)) { String keys = request.getParameter("keys", ""); - List attributes; + + PluginCallback> callback = getAttributeValuesPluginCallback(msg); if (!StringUtils.isEmpty(scope)) { - attributes = getAttributeKvEntries(ctx, scope, deviceId, keys); + if (!StringUtils.isEmpty(keys)) { + List keyList = Arrays.asList(keys.split(",")); + ctx.loadAttributes(deviceId, scope, keyList, callback); + } else { + ctx.loadAttributes(deviceId, scope, callback); + } } else { - attributes = new ArrayList<>(); - Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(getAttributeKvEntries(ctx, s, deviceId, keys))); + if (!StringUtils.isEmpty(keys)) { + List keyList = Arrays.asList(keys.split(",")); + ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), keyList, callback); + } else { + ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), callback); + } } - List values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(), - attribute.getKey(), attribute.getValue())).collect(Collectors.toList()); - msg.getResponseHolder().setResult(new ResponseEntity<>(values, HttpStatus.OK)); } } } else { @@ -156,6 +173,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { @Override public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to save attributes", e); msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); } }); @@ -184,8 +202,18 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { String keysParam = request.getParameter("keys"); if (!StringUtils.isEmpty(keysParam)) { String[] keys = keysParam.split(","); - ctx.removeAttributes(deviceId, scope, Arrays.asList(keys)); - msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK)); + ctx.removeAttributes(deviceId, scope, Arrays.asList(keys), new PluginCallback() { + @Override + public void onSuccess(PluginContext ctx, Void value) { + msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK)); + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to remove attributes", e); + msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); + } + }); return; } } @@ -196,14 +224,37 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); } - private List getAttributeKvEntries(PluginContext ctx, String scope, DeviceId deviceId, String keysParam) { - List attributes; - if (!StringUtils.isEmpty(keysParam)) { - String[] keys = keysParam.split(","); - attributes = ctx.loadAttributes(deviceId, scope, Arrays.asList(keys)); - } else { - attributes = ctx.loadAttributes(deviceId, scope); - } - return attributes; + + private PluginCallback> getAttributeKeysPluginCallback(final PluginRestMsg msg) { + return new PluginCallback>() { + @Override + public void onSuccess(PluginContext ctx, List attributes) { + List keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList()); + msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK)); + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to fetch attributes", e); + msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); + } + }; + } + + private PluginCallback> getAttributeValuesPluginCallback(final PluginRestMsg msg) { + return new PluginCallback>() { + @Override + public void onSuccess(PluginContext ctx, List attributes) { + List values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(), + attribute.getKey(), attribute.getValue())).collect(Collectors.toList()); + msg.getResponseHolder().setResult(new ResponseEntity<>(values, HttpStatus.OK)); + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to fetch attributes", e); + msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); + } + }; } } diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java index 85a25da427..d9bfba073e 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.extensions.core.plugin.telemetry.handlers; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.RuleId; @@ -38,6 +39,7 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT import java.util.*; import java.util.stream.Collectors; +@Slf4j public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler { private final SubscriptionManager subscriptionManager; @@ -49,27 +51,36 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler { public void handleGetAttributesRequest(PluginContext ctx, TenantId tenantId, RuleId ruleId, GetAttributesRequestRuleToPluginMsg msg) { GetAttributesRequest request = msg.getPayload(); - List clientAttributes = getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getClientAttributeNames()); - List sharedAttributes = getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.SHARED_SCOPE, request.getSharedAttributeNames()); + BiPluginCallBack, List> callback = new BiPluginCallBack, List>() { - BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(), - request.getRequestId(), BasicAttributeKVMsg.from(clientAttributes, sharedAttributes)); + @Override + public void onSuccess(PluginContext ctx, List clientAttributes, List sharedAttributes) { + BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(), + request.getRequestId(), BasicAttributeKVMsg.from(clientAttributes, sharedAttributes)); + ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response)); + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to process get attributes request", e); + ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onError(request.getMsgType(), request.getRequestId(), e))); + } + }; - ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response)); + getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getClientAttributeNames(), callback.getV1Callback()); + getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.SHARED_SCOPE, request.getSharedAttributeNames(), callback.getV2Callback()); } - private List getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Optional> names) { - List attributes; + private void getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Optional> names, PluginCallback> callback) { if (names.isPresent()) { if (!names.get().isEmpty()) { - attributes = ctx.loadAttributes(deviceId, scope, new ArrayList<>(names.get())); + ctx.loadAttributes(deviceId, scope, new ArrayList<>(names.get()), callback); } else { - attributes = ctx.loadAttributes(deviceId, scope); + ctx.loadAttributes(deviceId, scope, callback); } } else { - attributes = Collections.emptyList(); + callback.onSuccess(ctx, Collections.emptyList()); } - return attributes; } @Override @@ -100,6 +111,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler { @Override public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to process telemetry upload request", e); ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onError(request.getMsgType(), request.getRequestId(), e))); } }); @@ -127,6 +139,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler { @Override public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to process attributes update request", e); ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onError(request.getMsgType(), request.getRequestId(), e))); } }); diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java index 739bedfd89..dd30f99e9c 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java @@ -104,37 +104,64 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { SubscriptionState sub; if (keysOptional.isPresent()) { List keys = new ArrayList<>(keysOptional.get()); - List data = new ArrayList<>(); + + PluginCallback> callback = new PluginCallback>() { + @Override + public void onSuccess(PluginContext ctx, List data) { + List attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); + sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); + + Map subState = new HashMap<>(keys.size()); + keys.forEach(key -> subState.put(key, 0L)); + attributesData.forEach(v -> subState.put(v.getKey(), v.getTs())); + + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState); + subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub); + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to fetch attributes!", e); + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, + "Failed to fetch attributes!"); + sendWsMsg(ctx, sessionRef, update); + } + }; + if (StringUtils.isEmpty(cmd.getScope())) { - Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s, keys))); + ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), keys, callback); } else { - data.addAll(ctx.loadAttributes(deviceId, cmd.getScope(), keys)); + ctx.loadAttributes(deviceId, cmd.getScope(), keys, callback); } + } else { + PluginCallback> callback = new PluginCallback>() { + @Override + public void onSuccess(PluginContext ctx, List data) { + List attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); + sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); - List attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); - sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); + Map subState = new HashMap<>(attributesData.size()); + attributesData.forEach(v -> subState.put(v.getKey(), v.getTs())); - Map subState = new HashMap<>(keys.size()); - keys.forEach(key -> subState.put(key, 0L)); - attributesData.forEach(v -> subState.put(v.getKey(), v.getTs())); + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, true, subState); + subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub); + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to fetch attributes!", e); + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, + "Failed to fetch attributes!"); + sendWsMsg(ctx, sessionRef, update); + } + }; - sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState); - } else { - List data = new ArrayList<>(); if (StringUtils.isEmpty(cmd.getScope())) { - Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s))); + ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), callback); } else { - data.addAll(ctx.loadAttributes(deviceId, cmd.getScope())); + ctx.loadAttributes(deviceId, cmd.getScope(), callback); } - List attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); - sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); - - Map subState = new HashMap<>(attributesData.size()); - attributesData.forEach(v -> subState.put(v.getKey(), v.getTs())); - - sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, true, subState); } - subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub); } } } @@ -205,6 +232,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { @Override public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to fetch data!", e); SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, "Failed to fetch data!"); sendWsMsg(ctx, sessionRef, update);