Browse Source

Implementation and test fixes

pull/56/head
Andrew Shvayka 9 years ago
parent
commit
e3c823ef37
  1. 29
      application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
  2. 5
      application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
  3. 5
      application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java
  4. 2
      dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
  5. 4
      dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
  6. 26
      extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java
  7. 32
      extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java
  8. 2
      extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
  9. 16
      extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
  10. 4
      transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java

29
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java

@ -58,6 +58,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Predicate;
@ -85,14 +86,24 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
this.attributeSubscriptions = new HashMap<>();
this.rpcSubscriptions = new HashMap<>();
this.rpcPendingMap = new HashMap<>();
refreshAttributes();
initAttributes();
}
private void refreshAttributes() {
private void initAttributes() {
this.deviceAttributes = new DeviceAttributes(fetchAttributes(DataConstants.CLIENT_SCOPE),
fetchAttributes(DataConstants.SERVER_SCOPE), fetchAttributes(DataConstants.SHARED_SCOPE));
}
private void refreshAttributes(DeviceAttributesEventNotificationMsg msg) {
if (this.deviceAttributes != null) {
if (msg.isDeleted()) {
msg.getDeletedKeys().forEach(key -> deviceAttributes.remove(key));
} else {
deviceAttributes.update(msg.getScope(), msg.getValues());
}
}
}
void processRpcRequest(ActorContext context, ToDeviceRpcRequestPluginMsg msg) {
ToDeviceRpcRequest request = msg.getMsg();
ToDeviceRpcRequestBody body = request.getBody();
@ -196,8 +207,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) {
//TODO: improve this procedure to fetch only changed attributes and support attributes deletion
refreshAttributes();
Set<AttributeKey> keys = msg.getKeys();
refreshAttributes(msg);
Set<AttributeKey> keys = msg.getDeletedKeys();
if (attributeSubscriptions.size() > 0) {
ToDeviceMsg notification = null;
if (msg.isDeleted()) {
@ -359,8 +370,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
private List<AttributeKvEntry> fetchAttributes(String attributeType) {
return systemContext.getAttributesService().findAll(this.deviceId, attributeType);
private List<AttributeKvEntry> fetchAttributes(String scope) {
try {
//TODO: replace this with async operation. Happens only during actor creation, but is still criticla for performance,
return systemContext.getAttributesService().findAll(this.deviceId, scope).get();
} catch (InterruptedException | ExecutionException e) {
logger.warning("[{}] Failed to fetch attributes for scope: {}", deviceId, scope);
throw new RuntimeException(e);
}
}
public void processCredentialsUpdate(ActorContext context, DeviceCredentialsUpdateNotificationMsg msg) {

5
application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java

@ -302,9 +302,8 @@ public final class PluginProcessingContext implements PluginContext {
@Override
public void getDevice(DeviceId deviceId, PluginCallback<Device> callback) {
//TODO: add caching here with async api.
Device device = pluginCtx.deviceService.findDeviceById(deviceId);
pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, device), ActorRef.noSender());
ListenableFuture<Device> deviceFuture = pluginCtx.deviceService.findDeviceByIdAsync(deviceId);
Futures.addCallback(deviceFuture, getCallback(callback, v -> v));
}
@Override

5
application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java

@ -22,6 +22,7 @@ import static org.mockito.Mockito.when;
import java.util.*;
import com.google.common.util.concurrent.Futures;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.kv.TsKvEntry;
@ -226,7 +227,9 @@ public class DefaultActorServiceTest {
when(pluginMock.getConfiguration()).thenReturn(pluginAdditionalInfo);
when(pluginMock.getClazz()).thenReturn(TelemetryStoragePlugin.class.getName());
when(attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE)).thenReturn(Collections.emptyList());
when(attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList()));
when(attributesService.findAll(deviceId, DataConstants.SHARED_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList()));
when(attributesService.findAll(deviceId, DataConstants.SERVER_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList()));
initActorSystem();
Thread.sleep(1000);

2
dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java

@ -68,7 +68,7 @@ public class BaseAttributesDao extends AbstractAsyncDao implements AttributesDao
.and(eq(ATTRIBUTE_KEY_COLUMN, attributeKey));
log.trace("Generated query [{}] for entityId {} and key {}", select, entityId, attributeKey);
return Futures.transform(executeAsyncRead(select), (Function<? super ResultSet, ? extends Optional<AttributeKvEntry>>) input ->
Optional.of(convertResultToAttributesKvEntry(attributeKey, input.one()))
Optional.ofNullable(convertResultToAttributesKvEntry(attributeKey, input.one()))
, readResultsProcessingExecutor);
}

4
dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java

@ -133,8 +133,8 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
}
private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(String entityType, UUID entityId, TsKvQuery query) {
long minPartition = query.getStartTs();
long maxPartition = query.getEndTs();
long minPartition = toPartitionTs(query.getStartTs());
long maxPartition = toPartitionTs(query.getEndTs());
ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);

26
extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java

@ -15,6 +15,8 @@
*/
package org.thingsboard.server.extensions.api.device;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import java.util.*;
@ -65,4 +67,28 @@ public class DeviceAttributes {
public Optional<AttributeKvEntry> getServerPublicAttribute(String attribute) {
return Optional.ofNullable(serverPublicAttributesMap.get(attribute));
}
public void remove(AttributeKey key) {
Map<String, AttributeKvEntry> map = getMapByScope(key.getScope());
if (map != null) {
map.remove(key);
}
}
public void update(String scope, List<AttributeKvEntry> values) {
Map<String, AttributeKvEntry> map = getMapByScope(scope);
values.forEach(v -> map.put(v.getKey(), v));
}
private Map<String, AttributeKvEntry> getMapByScope(String scope) {
Map<String, AttributeKvEntry> map = null;
if (scope.equalsIgnoreCase(DataConstants.CLIENT_SCOPE)) {
map = clientSideAttributesMap;
} else if (scope.equalsIgnoreCase(DataConstants.SHARED_SCOPE)) {
map = serverPublicAttributesMap;
} else if (scope.equalsIgnoreCase(DataConstants.SERVER_SCOPE)) {
map = serverPrivateAttributesMap;
}
return map;
}
}

32
extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java

@ -15,37 +15,43 @@
*/
package org.thingsboard.server.extensions.api.device;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import java.util.List;
import java.util.Set;
/**
* @author Andrew Shvayka
*/
@ToString
@AllArgsConstructor
public class DeviceAttributesEventNotificationMsg implements ToDeviceActorNotificationMsg {
@Getter private final TenantId tenantId;
@Getter private final DeviceId deviceId;
@Getter private final Set<AttributeKey> keys;
@Getter private final boolean deleted;
@Getter
private final TenantId tenantId;
@Getter
private final DeviceId deviceId;
@Getter
private final Set<AttributeKey> deletedKeys;
@Getter
private final String scope;
@Getter
private final List<AttributeKvEntry> values;
@Getter
private final boolean deleted;
public static DeviceAttributesEventNotificationMsg onUpdate(TenantId tenantId, DeviceId deviceId, Set<AttributeKey> keys) {
return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, false);
public static DeviceAttributesEventNotificationMsg onUpdate(TenantId tenantId, DeviceId deviceId, String scope, List<AttributeKvEntry> values) {
return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, null, scope, values, false);
}
public static DeviceAttributesEventNotificationMsg onDelete(TenantId tenantId, DeviceId deviceId, Set<AttributeKey> keys) {
return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, true);
return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, null, null, true);
}
private DeviceAttributesEventNotificationMsg(TenantId tenantId, DeviceId deviceId, Set<AttributeKey> keys, boolean deleted) {
this.tenantId = tenantId;
this.deviceId = deviceId;
this.keys = keys;
this.deleted = deleted;
}
}

2
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java

@ -93,7 +93,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
Aggregation agg = Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name()));
List<String> keys = Arrays.asList(keysStr.split(","));
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), limit.get(), agg)).collect(Collectors.toList());
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg)).collect(Collectors.toList());
ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
@Override
public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {

16
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java

@ -48,6 +48,8 @@ import java.util.stream.Collectors;
public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
public static final int DEFAULT_LIMIT = 100;
public static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE;
private final SubscriptionManager subscriptionManager;
@ -187,13 +189,11 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
if (keysOptional.isPresent()) {
long startTs;
if (cmd.getTimeWindow() > 0) {
List<TsKvEntry> data = new ArrayList<>();
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), cmd.getDeviceId());
long endTs = System.currentTimeMillis();
startTs = endTs - cmd.getTimeWindow();
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
ctx.loadTimeseries(deviceId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, deviceId, startTs, keys));
} else {
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
@ -277,7 +277,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
}
DeviceId deviceId = DeviceId.fromString(cmd.getDeviceId());
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
@Override
public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
@ -299,6 +299,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
});
}
private static Aggregation getAggregation(String agg) {
return StringUtils.isEmpty(agg) ? DEFAULT_AGGREGATION : Aggregation.valueOf(agg);
}
private int getLimit(int limit) {
return limit == 0 ? DEFAULT_LIMIT : limit;
}
private boolean validateSessionMetadata(PluginContext ctx, PluginWebsocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
if (sessionMD == null) {

4
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java

@ -91,7 +91,9 @@ public class GatewaySessionCtx {
public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
String deviceName = checkDeviceName(getDeviceName(msg));
GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
deviceSessionCtx.setClosed(true);
if (deviceSessionCtx != null) {
deviceSessionCtx.setClosed(true);
}
ack(msg);
}

Loading…
Cancel
Save