Browse Source

Server-side RPC and Telemetry. Default Rule Chain. Fixed Tests.

pull/725/head
Andrew Shvayka 8 years ago
parent
commit
69aac973f6
  1. 2
      application/src/main/conf/thingsboard.conf
  2. 13
      application/src/main/data/json/demo/plugins/demo_device_messaging_rpc_plugin.json
  3. 28
      application/src/main/data/json/demo/plugins/demo_email_plugin.json
  4. 11
      application/src/main/data/json/demo/plugins/demo_time_rpc_plugin.json
  5. 46
      application/src/main/data/json/demo/rules/demo_alarm_rule.json
  6. 35
      application/src/main/data/json/demo/rules/demo_gettime_rpc_rule.json
  7. 38
      application/src/main/data/json/demo/rules/demo_messaging_rpc_rule.json
  8. 11
      application/src/main/data/json/system/plugins/system_rpc_plugin.json
  9. 9
      application/src/main/data/json/system/plugins/system_telemetry_plugin.json
  10. 29
      application/src/main/data/json/system/rules/system_telemetry_rule.json
  11. 102
      application/src/main/data/json/tenant/rule_chains/root_rule_chain.json
  12. 2
      application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
  13. 81
      application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
  14. 8
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
  15. 29
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
  16. 2
      application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
  17. 9
      application/src/main/java/org/thingsboard/server/controller/RpcController.java
  18. 32
      application/src/main/java/org/thingsboard/server/controller/TenantController.java
  19. 147
      application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java
  20. 182
      application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java
  21. 4
      application/src/main/java/org/thingsboard/server/service/install/SystemDataLoaderService.java
  22. 5
      application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
  23. 27
      application/src/test/java/org/thingsboard/server/actors/ActorsTestSuite.java
  24. 245
      application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java
  25. 63
      application/src/test/java/org/thingsboard/server/actors/DummySessionID.java
  26. 17
      application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
  27. 21
      application/src/test/java/org/thingsboard/server/controller/BaseComponentDescriptorControllerTest.java
  28. 232
      application/src/test/java/org/thingsboard/server/controller/BasePluginControllerTest.java
  29. 247
      application/src/test/java/org/thingsboard/server/controller/BaseRuleControllerTest.java
  30. 26
      application/src/test/java/org/thingsboard/server/controller/nosql/PluginControllerNoSqlTest.java
  31. 26
      application/src/test/java/org/thingsboard/server/controller/nosql/RuleControllerNoSqlTest.java
  32. 26
      application/src/test/java/org/thingsboard/server/controller/sql/PluginControllerSqlTest.java
  33. 2
      application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
  34. 3
      application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java
  35. 13
      application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
  36. 5
      application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
  37. 1
      application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java
  38. 2
      common/message/src/main/java/org/thingsboard/server/common/msg/core/AttributesUpdateRequest.java
  39. 9
      common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicAttributesUpdateRequest.java
  40. 2
      common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java
  41. 16
      common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java
  42. 6
      common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
  43. 2
      dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
  44. 6
      extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/UpdateAttributesRequestRuleToPluginMsg.java
  45. 5
      extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginAction.java
  46. 4
      extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/DeviceAttributesFilter.java
  47. 4
      extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/NashornJsEvaluator.java
  48. 4
      extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
  49. 4
      extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java
  50. 8
      extensions-core/src/main/resources/MsgTypeFilterDescriptor.json
  51. 2
      extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaPluginAction.java
  52. 4
      extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaMsgHandler.java
  53. 2
      extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttPluginAction.java
  54. 4
      extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/plugin/MqttMsgHandler.java
  55. 2
      extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqPluginAction.java
  56. 2
      extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqMsgHandler.java
  57. 2
      extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallPluginAction.java
  58. 2
      extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallMsgHandler.java
  59. 2
      extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicPluginAction.java
  60. 2
      extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsMessageHandler.java
  61. 2
      extensions/extension-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueuePluginAction.java
  62. 2
      extensions/extension-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueuePluginAction.java
  63. 2
      extensions/extension-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsMessageHandler.java
  64. 76
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java
  65. 23
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java
  66. 15
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
  67. 8
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java
  68. 31
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
  69. 3
      transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java
  70. 2
      transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
  71. 2
      transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java

2
application/src/main/conf/thingsboard.conf

@ -14,7 +14,7 @@
# limitations under the License.
#
export JAVA_OPTS="$JAVA_OPTS -Dplatform=@pkg.platform@"
export JAVA_OPTS="$JAVA_OPTS -Dplatform=@pkg.platform@ -Dinstall.data_dir=@pkg.installFolder@"
export LOG_FILENAME=${pkg.name}.out
export LOADER_PATH=${pkg.installFolder}/conf,${pkg.installFolder}/extensions
export SQL_DATA_FOLDER=${pkg.installFolder}/data/sql

13
application/src/main/data/json/demo/plugins/demo_device_messaging_rpc_plugin.json

@ -1,13 +0,0 @@
{
"apiToken": "messaging",
"name": "Demo Device Messaging RPC Plugin",
"clazz": "org.thingsboard.server.extensions.core.plugin.messaging.DeviceMessagingPlugin",
"publicAccess": false,
"state": "ACTIVE",
"configuration": {
"maxDeviceCountPerCustomer": 1024,
"defaultTimeout": 20000,
"maxTimeout": 60000
},
"additionalInfo": null
}

28
application/src/main/data/json/demo/plugins/demo_email_plugin.json

@ -1,28 +0,0 @@
{
"apiToken": "mail",
"name": "Demo Email Plugin",
"clazz": "org.thingsboard.server.extensions.core.plugin.mail.MailPlugin",
"publicAccess": true,
"state": "ACTIVE",
"configuration": {
"host": "smtp.sendgrid.net",
"port": 2525,
"username": "apikey",
"password": "your_api_key",
"otherProperties": [
{
"key": "mail.smtp.auth",
"value": "true"
},
{
"key": "mail.smtp.timeout",
"value": "10000"
},
{
"key": "mail.smtp.starttls.enable",
"value": "true"
}
]
},
"additionalInfo": null
}

11
application/src/main/data/json/demo/plugins/demo_time_rpc_plugin.json

@ -1,11 +0,0 @@
{
"apiToken": "time",
"name": "Demo Time RPC Plugin",
"clazz": "org.thingsboard.server.extensions.core.plugin.time.TimePlugin",
"publicAccess": false,
"state": "ACTIVE",
"configuration": {
"timeFormat": "yyyy MM dd HH:mm:ss.SSS"
},
"additionalInfo": null
}

46
application/src/main/data/json/demo/rules/demo_alarm_rule.json

@ -1,46 +0,0 @@
{
"name": "Demo Alarm Rule",
"state": "ACTIVE",
"weight": 0,
"pluginToken": "mail",
"filters": [
{
"clazz": "org.thingsboard.server.extensions.core.filter.MsgTypeFilter",
"name": "MsgTypeFilter",
"configuration": {
"messageTypes": [
"POST_TELEMETRY",
"POST_ATTRIBUTES",
"GET_ATTRIBUTES"
]
}
},
{
"clazz": "org.thingsboard.server.extensions.core.filter.DeviceTelemetryFilter",
"name": "Temperature filter",
"configuration": {
"filter": "typeof temperature !== 'undefined' && temperature >= 100"
}
}
],
"processor": {
"clazz": "org.thingsboard.server.extensions.core.processor.AlarmDeduplicationProcessor",
"name": "AlarmDeduplicationProcessor",
"configuration": {
"alarmIdTemplate": "[$date.get('yyyy-MM-dd HH:mm')] Device $cs.get('serialNumber')($cs.get('model')) temperature is high!",
"alarmBodyTemplate": "[$date.get('yyyy-MM-dd HH:mm:ss')] Device $cs.get('serialNumber')($cs.get('model')) temperature is $temp.valueAsString!"
}
},
"action": {
"clazz": "org.thingsboard.server.extensions.core.action.mail.SendMailAction",
"name": "Send Mail Action",
"configuration": {
"sendFlag": "isNewAlarm",
"fromTemplate": "thingsboard@gmail.com",
"toTemplate": "thingsboard@gmail.com",
"subjectTemplate": "$alarmId",
"bodyTemplate": "$alarmBody"
}
},
"additionalInfo": null
}

35
application/src/main/data/json/demo/rules/demo_gettime_rpc_rule.json

@ -1,35 +0,0 @@
{
"name": "Demo getTime RPC Rule",
"state": "ACTIVE",
"weight": 0,
"pluginToken": "time",
"filters": [
{
"configuration": {
"messageTypes": [
"RPC_REQUEST"
]
},
"name": "RPC Request Filter",
"clazz": "org.thingsboard.server.extensions.core.filter.MsgTypeFilter"
},
{
"configuration": {
"methodNames": [
{
"name": "getTime"
}
]
},
"name": "getTime method filter",
"clazz": "org.thingsboard.server.extensions.core.filter.MethodNameFilter"
}
],
"processor": null,
"action": {
"configuration": {},
"clazz": "org.thingsboard.server.extensions.core.action.rpc.RpcPluginAction",
"name": "getTimeAction"
},
"additionalInfo": null
}

38
application/src/main/data/json/demo/rules/demo_messaging_rpc_rule.json

@ -1,38 +0,0 @@
{
"name": "Demo Messaging RPC Rule",
"state": "ACTIVE",
"weight": 0,
"pluginToken": "messaging",
"filters": [
{
"configuration": {
"messageTypes": [
"RPC_REQUEST"
]
},
"name": "RPC Request Filter",
"clazz": "org.thingsboard.server.extensions.core.filter.MsgTypeFilter"
},
{
"configuration": {
"methodNames": [
{
"name": "getDevices"
},
{
"name": "sendMsg"
}
]
},
"name": "Messaging methods filter",
"clazz": "org.thingsboard.server.extensions.core.filter.MethodNameFilter"
}
],
"processor": null,
"action": {
"configuration": {},
"clazz": "org.thingsboard.server.extensions.core.action.rpc.RpcPluginAction",
"name": "Messaging RPC Action"
},
"additionalInfo": null
}

11
application/src/main/data/json/system/plugins/system_rpc_plugin.json

@ -1,11 +0,0 @@
{
"apiToken": "rpc",
"name": "System RPC Plugin",
"clazz": "org.thingsboard.server.extensions.core.plugin.rpc.RpcPlugin",
"publicAccess": true,
"state": "ACTIVE",
"configuration": {
"defaultTimeout": 20000
},
"additionalInfo": null
}

9
application/src/main/data/json/system/plugins/system_telemetry_plugin.json

@ -1,9 +0,0 @@
{
"apiToken": "telemetry",
"name": "System Telemetry Plugin",
"clazz": "org.thingsboard.server.extensions.core.plugin.telemetry.TelemetryStoragePlugin",
"publicAccess": true,
"state": "ACTIVE",
"configuration": {},
"additionalInfo": null
}

29
application/src/main/data/json/system/rules/system_telemetry_rule.json

@ -1,29 +0,0 @@
{
"name": "System Telemetry Rule",
"state": "ACTIVE",
"weight": 0,
"pluginToken": "telemetry",
"filters": [
{
"clazz": "org.thingsboard.server.extensions.core.filter.MsgTypeFilter",
"name": "TelemetryFilter",
"configuration": {
"messageTypes": [
"POST_TELEMETRY",
"POST_ATTRIBUTES",
"GET_ATTRIBUTES"
]
}
}
],
"processor": null,
"action": {
"clazz": "org.thingsboard.server.extensions.core.action.telemetry.TelemetryPluginAction",
"name": "TelemetryMsgConverterAction",
"configuration": {
"timeUnit": "DAYS",
"ttlValue": 365
}
},
"additionalInfo": null
}

102
application/src/main/data/json/tenant/rule_chains/root_rule_chain.json

@ -0,0 +1,102 @@
{
"ruleChain": {
"additionalInfo": null,
"name": "Root Rule Chain",
"firstRuleNodeId": null,
"root": true,
"debugMode": false,
"configuration": null
},
"metadata": {
"firstNodeIndex": 2,
"nodes": [
{
"additionalInfo": {
"layoutX": 639,
"layoutY": 113
},
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeFilterNode",
"name": "PostAttributes",
"debugMode": true,
"configuration": {
"messageTypes": [
"POST_ATTRIBUTES_REQUEST"
]
}
},
{
"additionalInfo": {
"layoutX": 638,
"layoutY": 206
},
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeFilterNode",
"name": "PostTelemetry",
"debugMode": true,
"configuration": {
"messageTypes": [
"POST_TELEMETRY_REQUEST"
]
}
},
{
"additionalInfo": {
"layoutX": 297,
"layoutY": 148
},
"type": "org.thingsboard.rule.engine.action.TbLogNode",
"name": "Log",
"debugMode": false,
"configuration": {
"jsScript": "return 'incoming message = ' + msg;"
}
},
{
"additionalInfo": {
"layoutX": 905,
"layoutY": 203
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "SaveTS",
"debugMode": true,
"configuration": {
"defaultTTL": 0
}
},
{
"additionalInfo": {
"layoutX": 904,
"layoutY": 110
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode",
"name": "save client attributes",
"debugMode": true,
"configuration": {
"scope": "CLIENT_SCOPE"
}
}
],
"connections": [
{
"fromIndex": 0,
"toIndex": 4,
"type": "True"
},
{
"fromIndex": 1,
"toIndex": 3,
"type": "True"
},
{
"fromIndex": 2,
"toIndex": 0,
"type": "Success"
},
{
"fromIndex": 2,
"toIndex": 1,
"type": "Success"
}
],
"ruleChainConnections": null
}
}

2
application/src/main/java/org/thingsboard/server/actors/app/AppActor.java

@ -103,7 +103,7 @@ public class AppActor extends RuleChainManagerActor {
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
onToDeviceActorMsg((DeviceToDeviceActorMsg) msg);
onToDeviceActorMsg((TenantAwareMsg) msg);
break;
default:
return false;

81
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java

@ -19,6 +19,9 @@ import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.event.LoggingAdapter;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
@ -39,14 +42,18 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
import org.thingsboard.server.common.msg.core.BasicCommandAckResponse;
import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse;
import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
import org.thingsboard.server.common.msg.core.BasicToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.GetAttributesRequest;
import org.thingsboard.server.common.msg.core.RuleEngineError;
import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
import org.thingsboard.server.common.msg.core.SessionCloseMsg;
import org.thingsboard.server.common.msg.core.SessionCloseNotification;
import org.thingsboard.server.common.msg.core.SessionOpenMsg;
import org.thingsboard.server.common.msg.core.StatusCodeResponse;
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
@ -64,10 +71,15 @@ import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorRpcTimeoutMsg;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -114,7 +126,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
private void initAttributes() {
//TODO: add invalidation of deviceType cache.
Device device = systemContext.getDeviceService().findDeviceById(deviceId);
this.deviceName = device.getName();
this.deviceType = device.getType();
@ -238,6 +249,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
processSubscriptionCommands(context, msg);
processRpcResponses(context, msg);
processSessionStateMsgs(msg);
SessionMsgType sessionMsgType = msg.getPayload().getMsgType();
if (sessionMsgType.requiresRulesProcessing()) {
switch (sessionMsgType) {
@ -245,6 +257,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
handleGetAttributesRequest(msg);
break;
case POST_ATTRIBUTES_REQUEST:
handlePostAttributesRequest(context, msg);
break;
case POST_TELEMETRY_REQUEST:
handlePostTelemetryRequest(context, msg);
@ -256,14 +269,62 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
private void handleGetAttributesRequest(DeviceToDeviceActorMsg msg) {
private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) {
GetAttributesRequest request = (GetAttributesRequest) src.getPayload();
ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());
ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, request.getClientAttributeNames());
Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() {
@Override
public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
}
@Override
public void onFailure(Throwable t) {
if (t instanceof Exception) {
ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
} else {
logger.error("[{}] Failed to process attributes request", deviceId, t);
}
}
});
}
private ListenableFuture<List<AttributeKvEntry>> getAttributeKvEntries(DeviceId deviceId, String scope, Optional<Set<String>> names) {
if (names.isPresent()) {
if (!names.get().isEmpty()) {
return systemContext.getAttributesService().find(deviceId, scope, names.get());
} else {
return systemContext.getAttributesService().findAll(deviceId, scope);
}
} else {
return Futures.immediateFuture(Collections.emptyList());
}
}
private void handlePostAttributesRequest(ActorContext context, DeviceToDeviceActorMsg src) {
AttributesUpdateRequest request = (AttributesUpdateRequest) src.getPayload();
JsonObject json = new JsonObject();
for (AttributeKvEntry kv : request.getAttributes()) {
kv.getBooleanValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
kv.getLongValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
kv.getDoubleValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
}
TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json));
pushToRuleEngineWithTimeout(context, tbMsg, src, request);
}
private void handlePostTelemetryRequest(ActorContext context, DeviceToDeviceActorMsg src) {
TelemetryUploadRequest telemetry = (TelemetryUploadRequest) src.getPayload();
TelemetryUploadRequest request = (TelemetryUploadRequest) src.getPayload();
Map<Long, List<KvEntry>> tsData = telemetry.getData();
Map<Long, List<KvEntry>> tsData = request.getData();
JsonArray json = new JsonArray();
for (Map.Entry<Long, List<KvEntry>> entry : tsData.entrySet()) {
@ -281,7 +342,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json));
pushToRuleEngineWithTimeout(context, tbMsg, src, telemetry);
pushToRuleEngineWithTimeout(context, tbMsg, src, request);
}
private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg) {
@ -403,16 +464,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
private List<AttributeKvEntry> fetchAttributes(String scope) {
try {
//TODO: replace this with async operation. Happens only during actor creation, but is still criticla for performance,
return systemContext.getAttributesService().findAll(this.deviceId, scope).get();
} catch (InterruptedException | ExecutionException e) {
logger.warning("[{}] Failed to fetch attributes for scope: {}", deviceId, scope);
throw new RuntimeException(e);
}
}
void processCredentialsUpdate() {
sessions.forEach((k, v) -> {
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());

8
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

29
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -60,6 +60,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private RuleNodeId firstId;
private RuleNodeCtx firstNode;
private boolean started;
RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
, LoggingAdapter logger, ActorRef parent, ActorRef self) {
@ -73,14 +74,19 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
@Override
public void start(ActorContext context) throws Exception {
RuleChain ruleChain = service.findRuleChainById(entityId);
List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
// Creating and starting the actors;
for (RuleNode ruleNode : ruleNodeList) {
ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
if (!started) {
RuleChain ruleChain = service.findRuleChainById(entityId);
List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
// Creating and starting the actors;
for (RuleNode ruleNode : ruleNodeList) {
ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
}
initRoutes(ruleChain, ruleNodeList);
started = true;
} else {
onUpdate(context);
}
initRoutes(ruleChain, ruleNodeList);
}
@Override
@ -115,6 +121,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
nodeActors.clear();
nodeRoutes.clear();
context.stop(self);
started = false;
}
@Override

2
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

@ -81,7 +81,7 @@ public class TenantActor extends RuleChainManagerActor {
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
onToDeviceActorMsg((DeviceToDeviceActorMsg) msg);
onToDeviceActorMsg((DeviceAwareMsg) msg);
break;
default:
return false;

9
application/src/main/java/org/thingsboard/server/controller/RpcController.java

@ -114,9 +114,10 @@ public class RpcController extends BaseController {
final DeferredResult<ResponseEntity> response = new DeferredResult<>();
long timeout = System.currentTimeMillis() + (cmd.getTimeout() != null ? cmd.getTimeout() : DEFAULT_TIMEOUT);
ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(cmd.getMethodName(), cmd.getRequestData());
accessValidator.validate(currentUser, deviceId, new FutureCallback<ValidationResult>() {
accessValidator.validate(currentUser, deviceId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() {
@Override
public void onSuccess(@Nullable ValidationResult result) {
public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(),
tenantId,
deviceId,
@ -124,7 +125,7 @@ public class RpcController extends BaseController {
timeout,
body
);
deviceRpcService.process(rpcRequest, new LocalRequestMetaData(rpcRequest, currentUser, response));
deviceRpcService.process(rpcRequest, new LocalRequestMetaData(rpcRequest, currentUser, result));
}
@Override
@ -138,7 +139,7 @@ public class RpcController extends BaseController {
deviceRpcService.logRpcCall(currentUser, deviceId, body, oneWay, Optional.empty(), e);
response.setResult(entity);
}
});
}));
return response;
} catch (IOException ioe) {
throw new ThingsboardException("Invalid request body", ioe, ThingsboardErrorCode.BAD_REQUEST_PARAMS);

32
application/src/main/java/org/thingsboard/server/controller/TenantController.java

@ -15,21 +15,34 @@
*/
package org.thingsboard.server.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.service.install.InstallScripts;
@RestController
@RequestMapping("/api")
@Slf4j
public class TenantController extends BaseController {
@Autowired
private InstallScripts installScripts;
@Autowired
private TenantService tenantService;
@ -49,10 +62,15 @@ public class TenantController extends BaseController {
@PreAuthorize("hasAuthority('SYS_ADMIN')")
@RequestMapping(value = "/tenant", method = RequestMethod.POST)
@ResponseBody
@ResponseBody
public Tenant saveTenant(@RequestBody Tenant tenant) throws ThingsboardException {
try {
return checkNotNull(tenantService.saveTenant(tenant));
boolean newTenant = tenant.getId() == null;
tenant = checkNotNull(tenantService.saveTenant(tenant));
if (newTenant) {
installScripts.createDefaultRuleChains(tenant.getId());
}
return tenant;
} catch (Exception e) {
throw handleException(e);
}
@ -72,7 +90,7 @@ public class TenantController extends BaseController {
}
@PreAuthorize("hasAuthority('SYS_ADMIN')")
@RequestMapping(value = "/tenants", params = { "limit" }, method = RequestMethod.GET)
@RequestMapping(value = "/tenants", params = {"limit"}, method = RequestMethod.GET)
@ResponseBody
public TextPageData<Tenant> getTenants(@RequestParam int limit,
@RequestParam(required = false) String textSearch,
@ -85,5 +103,5 @@ public class TenantController extends BaseController {
throw handleException(e);
}
}
}

147
application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java

@ -60,21 +60,12 @@ import java.nio.file.Paths;
@Slf4j
public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
private static final String JSON_DIR = "json";
private static final String SYSTEM_DIR = "system";
private static final String DEMO_DIR = "demo";
private static final String WIDGET_BUNDLES_DIR = "widget_bundles";
private static final String PLUGINS_DIR = "plugins";
private static final String RULES_DIR = "rules";
private static final String DASHBOARDS_DIR = "dashboards";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static final String JSON_EXT = ".json";
public static final String CUSTOMER_CRED = "customer";
public static final String DEFAULT_DEVICE_TYPE = "default";
@Value("${install.data_dir}")
private String dataDir;
@Autowired
private InstallScripts installScripts;
@Autowired
private BCryptPasswordEncoder passwordEncoder;
@ -88,15 +79,6 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
@Autowired
private WidgetsBundleService widgetsBundleService;
@Autowired
private WidgetTypeService widgetTypeService;
@Autowired
private PluginService pluginService;
@Autowired
private RuleService ruleService;
@Autowired
private TenantService tenantService;
@ -109,9 +91,6 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
@Autowired
private DeviceCredentialsService deviceCredentialsService;
@Autowired
private DashboardService dashboardService;
@Bean
protected BCryptPasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder();
@ -146,56 +125,13 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
adminSettingsService.saveAdminSettings(mailSettings);
}
@Override
public void loadSystemWidgets() throws Exception {
Path widgetBundlesDir = Paths.get(dataDir, JSON_DIR, SYSTEM_DIR, WIDGET_BUNDLES_DIR);
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(widgetBundlesDir, path -> path.toString().endsWith(JSON_EXT))) {
dirStream.forEach(
path -> {
try {
JsonNode widgetsBundleDescriptorJson = objectMapper.readTree(path.toFile());
JsonNode widgetsBundleJson = widgetsBundleDescriptorJson.get("widgetsBundle");
WidgetsBundle widgetsBundle = objectMapper.treeToValue(widgetsBundleJson, WidgetsBundle.class);
WidgetsBundle savedWidgetsBundle = widgetsBundleService.saveWidgetsBundle(widgetsBundle);
JsonNode widgetTypesArrayJson = widgetsBundleDescriptorJson.get("widgetTypes");
widgetTypesArrayJson.forEach(
widgetTypeJson -> {
try {
WidgetType widgetType = objectMapper.treeToValue(widgetTypeJson, WidgetType.class);
widgetType.setBundleAlias(savedWidgetsBundle.getAlias());
widgetTypeService.saveWidgetType(widgetType);
} catch (Exception e) {
log.error("Unable to load widget type from json: [{}]", path.toString());
throw new RuntimeException("Unable to load widget type from json", e);
}
}
);
} catch (Exception e) {
log.error("Unable to load widgets bundle from json: [{}]", path.toString());
throw new RuntimeException("Unable to load widgets bundle from json", e);
}
}
);
}
}
@Override
public void loadSystemPlugins() throws Exception {
// loadPlugins(Paths.get(dataDir, JSON_DIR, SYSTEM_DIR, PLUGINS_DIR), null);
}
@Override
public void loadSystemRules() throws Exception {
// loadRules(Paths.get(dataDir, JSON_DIR, SYSTEM_DIR, RULES_DIR), null);
}
@Override
public void loadDemoData() throws Exception {
Tenant demoTenant = new Tenant();
demoTenant.setRegion("Global");
demoTenant.setTitle("Tenant");
demoTenant = tenantService.saveTenant(demoTenant);
installScripts.createDefaultRuleChains(demoTenant.getId());
createUser(Authority.TENANT_ADMIN, demoTenant.getId(), null, "tenant@thingsboard.org", "tenant");
Customer customerA = new Customer();
@ -227,9 +163,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
createDevice(demoTenant.getId(), null, DEFAULT_DEVICE_TYPE, "Raspberry Pi Demo Device", "RASPBERRY_PI_DEMO_TOKEN", "Demo device that is used in " +
"Raspberry Pi GPIO control sample application");
// loadPlugins(Paths.get(dataDir, JSON_DIR, DEMO_DIR, PLUGINS_DIR), demoTenant.getId());
// loadRules(Paths.get(dataDir, JSON_DIR, DEMO_DIR, RULES_DIR), demoTenant.getId());
loadDashboards(Paths.get(dataDir, JSON_DIR, DEMO_DIR, DASHBOARDS_DIR), demoTenant.getId(), null);
installScripts.loadDashboards(demoTenant.getId(), null);
}
@Override
@ -240,6 +174,11 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
}
}
@Override
public void loadSystemWidgets() throws Exception {
installScripts.loadSystemWidgets();
}
private User createUser(Authority authority,
TenantId tenantId,
CustomerId customerId,
@ -282,72 +221,4 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
return device;
}
private void loadPlugins(Path pluginsDir, TenantId tenantId) throws Exception{
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(pluginsDir, path -> path.toString().endsWith(JSON_EXT))) {
dirStream.forEach(
path -> {
try {
JsonNode pluginJson = objectMapper.readTree(path.toFile());
PluginMetaData plugin = objectMapper.treeToValue(pluginJson, PluginMetaData.class);
plugin.setTenantId(tenantId);
if (plugin.getState() == ComponentLifecycleState.ACTIVE) {
plugin.setState(ComponentLifecycleState.SUSPENDED);
PluginMetaData savedPlugin = pluginService.savePlugin(plugin);
pluginService.activatePluginById(savedPlugin.getId());
} else {
pluginService.savePlugin(plugin);
}
} catch (Exception e) {
log.error("Unable to load plugin from json: [{}]", path.toString());
throw new RuntimeException("Unable to load plugin from json", e);
}
}
);
}
}
private void loadRules(Path rulesDir, TenantId tenantId) throws Exception {
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(rulesDir, path -> path.toString().endsWith(JSON_EXT))) {
dirStream.forEach(
path -> {
try {
JsonNode ruleJson = objectMapper.readTree(path.toFile());
RuleMetaData rule = objectMapper.treeToValue(ruleJson, RuleMetaData.class);
rule.setTenantId(tenantId);
if (rule.getState() == ComponentLifecycleState.ACTIVE) {
rule.setState(ComponentLifecycleState.SUSPENDED);
RuleMetaData savedRule = ruleService.saveRule(rule);
ruleService.activateRuleById(savedRule.getId());
} else {
ruleService.saveRule(rule);
}
} catch (Exception e) {
log.error("Unable to load rule from json: [{}]", path.toString());
throw new RuntimeException("Unable to load rule from json", e);
}
}
);
}
}
private void loadDashboards(Path dashboardsDir, TenantId tenantId, CustomerId customerId) throws Exception {
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dashboardsDir, path -> path.toString().endsWith(JSON_EXT))) {
dirStream.forEach(
path -> {
try {
JsonNode dashboardJson = objectMapper.readTree(path.toFile());
Dashboard dashboard = objectMapper.treeToValue(dashboardJson, Dashboard.class);
dashboard.setTenantId(tenantId);
Dashboard savedDashboard = dashboardService.saveDashboard(dashboard);
if (customerId != null && !customerId.isNullUid()) {
dashboardService.assignDashboardToCustomer(savedDashboard.getId(), customerId);
}
} catch (Exception e) {
log.error("Unable to load dashboard from json: [{}]", path.toString());
throw new RuntimeException("Unable to load dashboard from json", e);
}
}
);
}
}
}

182
application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java

@ -0,0 +1,182 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.install;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.widget.WidgetType;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import static org.thingsboard.server.service.install.DatabaseHelper.objectMapper;
/**
* Created by ashvayka on 18.04.18.
*/
@Component
@Slf4j
public class InstallScripts {
public static final String APP_DIR = "application";
public static final String SRC_DIR = "src";
public static final String MAIN_DIR = "main";
public static final String DATA_DIR = "data";
public static final String JSON_DIR = "json";
public static final String SYSTEM_DIR = "system";
public static final String TENANT_DIR = "tenant";
public static final String DEMO_DIR = "demo";
public static final String RULE_CHAINS_DIR = "rule_chains";
public static final String WIDGET_BUNDLES_DIR = "widget_bundles";
public static final String DASHBOARDS_DIR = "dashboards";
public static final String JSON_EXT = ".json";
@Value("${install.data_dir:}")
private String dataDir;
@Autowired
private RuleChainService ruleChainService;
@Autowired
private DashboardService dashboardService;
@Autowired
private WidgetTypeService widgetTypeService;
@Autowired
private WidgetsBundleService widgetsBundleService;
public Path getTenantRuleChainsDir() {
return Paths.get(getDataDir(), JSON_DIR, TENANT_DIR, RULE_CHAINS_DIR);
}
public String getDataDir() {
if (!StringUtils.isEmpty(dataDir)) {
return dataDir;
} else {
String workDir = System.getProperty("user.dir");
if (workDir.endsWith("application")) {
return Paths.get(workDir, SRC_DIR, MAIN_DIR, DATA_DIR).toString();
} else {
Path dataDirPath = Paths.get(workDir, APP_DIR, SRC_DIR, MAIN_DIR, DATA_DIR);
if (Files.exists(dataDirPath)) {
return dataDirPath.toString();
} else {
throw new RuntimeException("Not valid working directory: " + workDir + ". Please use either root project directory, application module directory or specify valid \"install.data_dir\" ENV variable to avoid automatic data directory lookup!");
}
}
}
}
public void createDefaultRuleChains(TenantId tenantId) throws IOException {
Path tenantChainsDir = getTenantRuleChainsDir();
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(tenantChainsDir, path -> path.toString().endsWith(InstallScripts.JSON_EXT))) {
dirStream.forEach(
path -> {
try {
JsonNode ruleChainJson = objectMapper.readTree(path.toFile());
RuleChain ruleChain = objectMapper.treeToValue(ruleChainJson.get("ruleChain"), RuleChain.class);
RuleChainMetaData ruleChainMetaData = objectMapper.treeToValue(ruleChainJson.get("metadata"), RuleChainMetaData.class);
ruleChain.setTenantId(tenantId);
ruleChain = ruleChainService.saveRuleChain(ruleChain);
ruleChainMetaData.setRuleChainId(ruleChain.getId());
ruleChainService.saveRuleChainMetaData(ruleChainMetaData);
} catch (Exception e) {
log.error("Unable to load rule chain from json: [{}]", path.toString());
throw new RuntimeException("Unable to load rule chain from json", e);
}
}
);
}
}
public void loadSystemWidgets() throws Exception {
Path widgetBundlesDir = Paths.get(getDataDir(), JSON_DIR, SYSTEM_DIR, WIDGET_BUNDLES_DIR);
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(widgetBundlesDir, path -> path.toString().endsWith(JSON_EXT))) {
dirStream.forEach(
path -> {
try {
JsonNode widgetsBundleDescriptorJson = objectMapper.readTree(path.toFile());
JsonNode widgetsBundleJson = widgetsBundleDescriptorJson.get("widgetsBundle");
WidgetsBundle widgetsBundle = objectMapper.treeToValue(widgetsBundleJson, WidgetsBundle.class);
WidgetsBundle savedWidgetsBundle = widgetsBundleService.saveWidgetsBundle(widgetsBundle);
JsonNode widgetTypesArrayJson = widgetsBundleDescriptorJson.get("widgetTypes");
widgetTypesArrayJson.forEach(
widgetTypeJson -> {
try {
WidgetType widgetType = objectMapper.treeToValue(widgetTypeJson, WidgetType.class);
widgetType.setBundleAlias(savedWidgetsBundle.getAlias());
widgetTypeService.saveWidgetType(widgetType);
} catch (Exception e) {
log.error("Unable to load widget type from json: [{}]", path.toString());
throw new RuntimeException("Unable to load widget type from json", e);
}
}
);
} catch (Exception e) {
log.error("Unable to load widgets bundle from json: [{}]", path.toString());
throw new RuntimeException("Unable to load widgets bundle from json", e);
}
}
);
}
}
public void loadDashboards(TenantId tenantId, CustomerId customerId) throws Exception {
Path dashboardsDir = Paths.get(getDataDir(), JSON_DIR, DEMO_DIR, DASHBOARDS_DIR);
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dashboardsDir, path -> path.toString().endsWith(JSON_EXT))) {
dirStream.forEach(
path -> {
try {
JsonNode dashboardJson = objectMapper.readTree(path.toFile());
Dashboard dashboard = objectMapper.treeToValue(dashboardJson, Dashboard.class);
dashboard.setTenantId(tenantId);
Dashboard savedDashboard = dashboardService.saveDashboard(dashboard);
if (customerId != null && !customerId.isNullUid()) {
dashboardService.assignDashboardToCustomer(savedDashboard.getId(), customerId);
}
} catch (Exception e) {
log.error("Unable to load dashboard from json: [{}]", path.toString());
throw new RuntimeException("Unable to load dashboard from json", e);
}
}
);
}
}
}

4
application/src/main/java/org/thingsboard/server/service/install/SystemDataLoaderService.java

@ -23,10 +23,6 @@ public interface SystemDataLoaderService {
void loadSystemWidgets() throws Exception;
void loadSystemPlugins() throws Exception;
void loadSystemRules() throws Exception;
void loadDemoData() throws Exception;
void deleteSystemWidgetBundle(String bundleAlias) throws Exception;

5
application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java

@ -96,8 +96,10 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
sendRpcRequest(request);
UUID requestId = request.getId();
localRpcRequests.put(requestId, metaData);
long timeout = Math.max(0, System.currentTimeMillis() - request.getExpirationTime());
long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
log.error("[{}] processing the request: [{}]", this.hashCode(), requestId);
rpcCallBackExecutor.schedule(() -> {
log.error("[{}] timeout the request: [{}]", this.hashCode(), requestId);
LocalRequestMetaData localMetaData = localRpcRequests.remove(requestId);
if (localMetaData != null) {
reply(localMetaData, new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
@ -118,6 +120,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
@Override
public void process(FromDeviceRpcResponse response) {
log.error("[{}] response the request: [{}]", this.hashCode(), response.getId());
//TODO: send to another server if needed.
UUID requestId = response.getId();
LocalRequestMetaData md = localRpcRequests.remove(requestId);

27
application/src/test/java/org/thingsboard/server/actors/ActorsTestSuite.java

@ -1,27 +0,0 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors;
import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.runner.RunWith;
/**
* @author Andrew Shvayka
*/
@RunWith(ClasspathSuite.class)
@ClasspathSuite.ClassnameFilters({"org.thingsboard.server.actors.*Test"})
public class ActorsTestSuite {
}

245
application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java

@ -1,245 +0,0 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.*;
import com.google.common.util.concurrent.Futures;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.plugin.ComponentDescriptor;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.session.*;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.event.EventService;
import org.thingsboard.server.gen.discovery.ServerInstanceProtos;
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.plugin.PluginMetaData;
import org.thingsboard.server.common.data.rule.RuleMetaData;
import org.thingsboard.server.common.data.security.DeviceCredentialsFilter;
import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.plugin.PluginService;
import org.thingsboard.server.dao.rule.RuleService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.extensions.core.plugin.telemetry.TelemetryStoragePlugin;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.test.util.ReflectionTestUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public class DefaultActorServiceTest {
private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
private static final String PLUGIN_ID = "9fb2e951-e298-4acb-913a-db69af8a15f4";
private static final String FILTERS_CONFIGURATION =
"[{\"clazz\":\"org.thingsboard.server.extensions.core.filter.MsgTypeFilter\", \"name\":\"TelemetryFilter\", \"configuration\": {\"messageTypes\":[\"POST_TELEMETRY\",\"POST_ATTRIBUTES\",\"GET_ATTRIBUTES\"]}}]";
private static final String ACTION_CONFIGURATION = "{\"pluginToken\":\"telemetry\", \"clazz\":\"org.thingsboard.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{}}";
private static final String PLUGIN_CONFIGURATION = "{}";
private DefaultActorService actorService;
private ActorSystemContext actorContext;
private PluginService pluginService;
private RuleService ruleService;
private DeviceAuthService deviceAuthService;
private DeviceService deviceService;
private TimeseriesService tsService;
private TenantService tenantService;
private ClusterRpcService rpcService;
private DiscoveryService discoveryService;
private ClusterRoutingService routingService;
private AttributesService attributesService;
private ComponentDiscoveryService componentService;
private EventService eventService;
private ServerInstance serverInstance;
private RuleMetaData ruleMock;
private PluginMetaData pluginMock;
private RuleId ruleId = new RuleId(UUID.randomUUID());
private PluginId pluginId = new PluginId(UUID.fromString(PLUGIN_ID));
private TenantId tenantId = new TenantId(UUID.randomUUID());
@Before
public void before() throws Exception {
actorService = new DefaultActorService();
actorContext = new ActorSystemContext();
tenantService = mock(TenantService.class);
pluginService = mock(PluginService.class);
ruleService = mock(RuleService.class);
deviceAuthService = mock(DeviceAuthService.class);
deviceService = mock(DeviceService.class);
tsService = mock(TimeseriesService.class);
rpcService = mock(ClusterRpcService.class);
discoveryService = mock(DiscoveryService.class);
routingService = mock(ClusterRoutingService.class);
attributesService = mock(AttributesService.class);
componentService = mock(ComponentDiscoveryService.class);
eventService = mock(EventService.class);
serverInstance = new ServerInstance(ServerInstanceProtos.ServerInfo.newBuilder().setHost("localhost").setPort(8080).build());
ReflectionTestUtils.setField(actorService, "actorContext", actorContext);
ReflectionTestUtils.setField(actorService, "rpcService", rpcService);
ReflectionTestUtils.setField(actorService, "discoveryService", discoveryService);
ReflectionTestUtils.setField(actorContext, "syncSessionTimeout", 10000L);
ReflectionTestUtils.setField(actorContext, "pluginActorTerminationDelay", 10000L);
ReflectionTestUtils.setField(actorContext, "pluginErrorPersistFrequency", 10000L);
ReflectionTestUtils.setField(actorContext, "ruleActorTerminationDelay", 10000L);
ReflectionTestUtils.setField(actorContext, "ruleErrorPersistFrequency", 10000L);
ReflectionTestUtils.setField(actorContext, "pluginProcessingTimeout", 60000L);
ReflectionTestUtils.setField(actorContext, "tenantService", tenantService);
ReflectionTestUtils.setField(actorContext, "pluginService", pluginService);
ReflectionTestUtils.setField(actorContext, "ruleService", ruleService);
ReflectionTestUtils.setField(actorContext, "deviceAuthService", deviceAuthService);
ReflectionTestUtils.setField(actorContext, "deviceService", deviceService);
ReflectionTestUtils.setField(actorContext, "tsService", tsService);
ReflectionTestUtils.setField(actorContext, "rpcService", rpcService);
ReflectionTestUtils.setField(actorContext, "discoveryService", discoveryService);
ReflectionTestUtils.setField(actorContext, "tsService", tsService);
ReflectionTestUtils.setField(actorContext, "routingService", routingService);
ReflectionTestUtils.setField(actorContext, "attributesService", attributesService);
ReflectionTestUtils.setField(actorContext, "componentService", componentService);
ReflectionTestUtils.setField(actorContext, "eventService", eventService);
when(routingService.resolveById((EntityId) any())).thenReturn(Optional.empty());
when(discoveryService.getCurrentServer()).thenReturn(serverInstance);
ruleMock = mock(RuleMetaData.class);
when(ruleMock.getId()).thenReturn(ruleId);
when(ruleMock.getState()).thenReturn(ComponentLifecycleState.ACTIVE);
when(ruleMock.getPluginToken()).thenReturn("telemetry");
TextPageData<RuleMetaData> systemRules = new TextPageData<>(Collections.emptyList(), null, false);
TextPageData<RuleMetaData> tenantRules = new TextPageData<>(Collections.singletonList(ruleMock), null, false);
when(ruleService.findSystemRules(any())).thenReturn(systemRules);
when(ruleService.findTenantRules(any(), any())).thenReturn(tenantRules);
when(ruleService.findRuleById(ruleId)).thenReturn(ruleMock);
pluginMock = mock(PluginMetaData.class);
when(pluginMock.getTenantId()).thenReturn(SYSTEM_TENANT);
when(pluginMock.getId()).thenReturn(pluginId);
when(pluginMock.getState()).thenReturn(ComponentLifecycleState.ACTIVE);
TextPageData<PluginMetaData> systemPlugins = new TextPageData<>(Collections.singletonList(pluginMock), null, false);
TextPageData<PluginMetaData> tenantPlugins = new TextPageData<>(Collections.emptyList(), null, false);
when(pluginService.findSystemPlugins(any())).thenReturn(systemPlugins);
when(pluginService.findTenantPlugins(any(), any())).thenReturn(tenantPlugins);
when(pluginService.findPluginByApiToken("telemetry")).thenReturn(pluginMock);
when(pluginService.findPluginById(pluginId)).thenReturn(pluginMock);
TextPageData<Tenant> tenants = new TextPageData<>(Collections.emptyList(), null, false);
when(tenantService.findTenants(any())).thenReturn(tenants);
}
private void initActorSystem() {
actorService.initActorSystem();
}
@After
public void after() {
actorService.stopActorSystem();
}
@Test
public void testBasicPostWithSyncSession() throws Exception {
SessionContext ssnCtx = mock(SessionContext.class);
KvEntry entry1 = new StringDataEntry("key1", "value1");
KvEntry entry2 = new StringDataEntry("key2", "value2");
BasicTelemetryUploadRequest telemetry = new BasicTelemetryUploadRequest();
long ts = 42;
telemetry.add(ts, entry1);
telemetry.add(ts, entry2);
BasicAdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ssnCtx, telemetry);
DeviceId deviceId = new DeviceId(UUID.randomUUID());
DeviceCredentialsFilter filter = new DeviceTokenCredentials("token1");
Device device = mock(Device.class);
when(device.getId()).thenReturn(deviceId);
when(device.getTenantId()).thenReturn(tenantId);
when(ssnCtx.getSessionId()).thenReturn(new DummySessionID("session1"));
when(ssnCtx.getSessionType()).thenReturn(SessionType.SYNC);
when(deviceAuthService.process(filter)).thenReturn(DeviceAuthResult.of(deviceId));
when(deviceService.findDeviceById(deviceId)).thenReturn(device);
ObjectMapper ruleMapper = new ObjectMapper();
when(ruleMock.getFilters()).thenReturn(ruleMapper.readTree(FILTERS_CONFIGURATION));
when(ruleMock.getAction()).thenReturn(ruleMapper.readTree(ACTION_CONFIGURATION));
ComponentDescriptor filterComp = new ComponentDescriptor();
filterComp.setClazz("org.thingsboard.server.extensions.core.filter.MsgTypeFilter");
filterComp.setType(ComponentType.FILTER);
when(componentService.getComponent("org.thingsboard.server.extensions.core.filter.MsgTypeFilter"))
.thenReturn(Optional.of(filterComp));
ComponentDescriptor actionComp = new ComponentDescriptor();
actionComp.setClazz("org.thingsboard.server.extensions.core.action.telemetry.TelemetryPluginAction");
actionComp.setType(ComponentType.ACTION);
when(componentService.getComponent("org.thingsboard.server.extensions.core.action.telemetry.TelemetryPluginAction"))
.thenReturn(Optional.of(actionComp));
ObjectMapper pluginMapper = new ObjectMapper();
JsonNode pluginAdditionalInfo = pluginMapper.readTree(PLUGIN_CONFIGURATION);
when(pluginMock.getConfiguration()).thenReturn(pluginAdditionalInfo);
when(pluginMock.getClazz()).thenReturn(TelemetryStoragePlugin.class.getName());
when(attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList()));
when(attributesService.findAll(deviceId, DataConstants.SHARED_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList()));
when(attributesService.findAll(deviceId, DataConstants.SERVER_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList()));
initActorSystem();
Thread.sleep(1000);
actorService.process(new BasicToDeviceActorSessionMsg(device, msg));
// Check that device data was saved to DB;
List<TsKvEntry> expected = new ArrayList<>();
expected.add(new BasicTsKvEntry(ts, entry1));
expected.add(new BasicTsKvEntry(ts, entry2));
verify(tsService, Mockito.timeout(5000)).save(deviceId, expected, 0L);
}
}

63
application/src/test/java/org/thingsboard/server/actors/DummySessionID.java

@ -1,63 +0,0 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors;
import org.thingsboard.server.common.data.id.SessionId;
public class DummySessionID implements SessionId {
@Override
public String toString() {
return id;
}
private final String id;
public DummySessionID(String id) {
this.id = id;
}
@Override
public String toUidStr() {
return id;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
DummySessionID other = (DummySessionID) obj;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
return true;
}
}

17
application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java

@ -16,6 +16,8 @@
package org.thingsboard.server.controller;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.data.id.EntityId;
@ -25,12 +27,18 @@ import org.thingsboard.server.common.data.page.TimePageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.dao.rule.RuleChainService;
import java.io.IOException;
/**
* Created by ashvayka on 20.03.18.
*/
public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
@Autowired
protected RuleChainService ruleChainService;
protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception {
return doPost("/api/ruleChain", ruleChain, RuleChain.class);
}
@ -53,4 +61,13 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
new TypeReference<TimePageData<Event>>() {
}, pageLink, entityId.getEntityType(), entityId.getId(), DataConstants.DEBUG_RULE_NODE, tenantId.getId());
}
protected JsonNode getMetadata(Event outEvent) {
String metaDataStr = outEvent.getBody().get("metadata").asText();
try {
return mapper.readTree(metaDataStr);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

21
application/src/test/java/org/thingsboard/server/controller/BaseComponentDescriptorControllerTest.java

@ -20,6 +20,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.rule.engine.filter.TbJsFilterNode;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.plugin.ComponentDescriptor;
@ -35,7 +36,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
public abstract class BaseComponentDescriptorControllerTest extends AbstractControllerTest {
private static final int AMOUNT_OF_DEFAULT_PLUGINS_DESCRIPTORS = 5;
private static final int AMOUNT_OF_DEFAULT_FILTER_NODES = 3;
private Tenant savedTenant;
private User tenantAdmin;
@ -69,38 +70,28 @@ public abstract class BaseComponentDescriptorControllerTest extends AbstractCont
@Test
public void testGetByClazz() throws Exception {
ComponentDescriptor descriptor =
doGet("/api/component/" + TelemetryStoragePlugin.class.getName(), ComponentDescriptor.class);
doGet("/api/component/" + TbJsFilterNode.class.getName(), ComponentDescriptor.class);
Assert.assertNotNull(descriptor);
Assert.assertNotNull(descriptor.getId());
Assert.assertNotNull(descriptor.getName());
Assert.assertEquals(ComponentScope.TENANT, descriptor.getScope());
Assert.assertEquals(ComponentType.PLUGIN, descriptor.getType());
Assert.assertEquals(ComponentType.FILTER, descriptor.getType());
Assert.assertEquals(descriptor.getClazz(), descriptor.getClazz());
}
@Test
public void testGetByType() throws Exception {
List<ComponentDescriptor> descriptors = readResponse(
doGet("/api/components/" + ComponentType.PLUGIN).andExpect(status().isOk()), new TypeReference<List<ComponentDescriptor>>() {
doGet("/api/components/" + ComponentType.FILTER).andExpect(status().isOk()), new TypeReference<List<ComponentDescriptor>>() {
});
Assert.assertNotNull(descriptors);
Assert.assertEquals(AMOUNT_OF_DEFAULT_PLUGINS_DESCRIPTORS, descriptors.size());
Assert.assertEquals(AMOUNT_OF_DEFAULT_FILTER_NODES, descriptors.size());
for (ComponentType type : ComponentType.values()) {
doGet("/api/components/" + type).andExpect(status().isOk());
}
}
@Test
public void testGetActionsByType() throws Exception {
List<ComponentDescriptor> descriptors = readResponse(
doGet("/api/components/actions/" + TelemetryStoragePlugin.class.getName()).andExpect(status().isOk()), new TypeReference<List<ComponentDescriptor>>() {
});
Assert.assertNotNull(descriptors);
Assert.assertEquals(1, descriptors.size());
Assert.assertEquals(TelemetryPluginAction.class.getName(), descriptors.get(0).getClazz());
}
}

232
application/src/test/java/org/thingsboard/server/controller/BasePluginControllerTest.java

@ -1,232 +0,0 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.plugin.PluginMetaData;
import org.thingsboard.server.common.data.rule.RuleMetaData;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.extensions.core.plugin.telemetry.TelemetryStoragePlugin;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
public abstract class BasePluginControllerTest extends AbstractControllerTest {
private IdComparator<PluginMetaData> idComparator = new IdComparator<>();
private final ObjectMapper mapper = new ObjectMapper();
private Tenant savedTenant;
private User tenantAdmin;
@Before
public void beforeTest() throws Exception {
loginSysAdmin();
Tenant tenant = new Tenant();
tenant.setTitle("My tenant");
savedTenant = doPost("/api/tenant", tenant, Tenant.class);
Assert.assertNotNull(savedTenant);
tenantAdmin = new User();
tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
tenantAdmin.setTenantId(savedTenant.getId());
tenantAdmin.setEmail("tenant2@thingsboard.org");
tenantAdmin.setFirstName("Joe");
tenantAdmin.setLastName("Downs");
tenantAdmin = createUserAndLogin(tenantAdmin, "testPassword1");
}
@After
public void afterTest() throws Exception {
loginSysAdmin();
doDelete("/api/tenant/" + savedTenant.getId().getId().toString())
.andExpect(status().isOk());
}
@Test
public void testSavePlugin() throws Exception {
PluginMetaData plugin = new PluginMetaData();
doPost("/api/plugin", plugin).andExpect(status().isBadRequest());
plugin.setName("My plugin");
doPost("/api/plugin", plugin).andExpect(status().isBadRequest());
plugin.setApiToken("myplugin");
doPost("/api/plugin", plugin).andExpect(status().isBadRequest());
plugin.setConfiguration(mapper.readTree("{}"));
doPost("/api/plugin", plugin).andExpect(status().isBadRequest());
plugin.setClazz(TelemetryStoragePlugin.class.getName());
PluginMetaData savedPlugin = doPost("/api/plugin", plugin, PluginMetaData.class);
Assert.assertNotNull(savedPlugin);
Assert.assertNotNull(savedPlugin.getId());
Assert.assertTrue(savedPlugin.getCreatedTime() > 0);
Assert.assertEquals(savedTenant.getId(), savedPlugin.getTenantId());
}
@Test
public void testFindPluginById() throws Exception {
PluginMetaData plugin = new PluginMetaData();
plugin.setName("My plugin");
plugin.setApiToken("myplugin");
plugin.setConfiguration(mapper.readTree("{}"));
plugin.setClazz(TelemetryStoragePlugin.class.getName());
PluginMetaData savedPlugin = doPost("/api/plugin", plugin, PluginMetaData.class);
PluginMetaData foundPlugin = doGet("/api/plugin/" + savedPlugin.getId().getId().toString(), PluginMetaData.class);
Assert.assertNotNull(foundPlugin);
Assert.assertEquals(savedPlugin, foundPlugin);
}
@Test
public void testActivatePlugin() throws Exception {
PluginMetaData plugin = new PluginMetaData();
plugin.setName("My plugin");
plugin.setApiToken("myplugin");
plugin.setConfiguration(mapper.readTree("{}"));
plugin.setClazz(TelemetryStoragePlugin.class.getName());
PluginMetaData savedPlugin = doPost("/api/plugin", plugin, PluginMetaData.class);
doPost("/api/plugin/" + savedPlugin.getId().getId().toString() + "/activate").andExpect(status().isOk());
}
@Test
public void testSuspendPlugin() throws Exception {
PluginMetaData plugin = new PluginMetaData();
plugin.setName("My plugin");
plugin.setApiToken("myplugin");
plugin.setConfiguration(mapper.readTree("{}"));
plugin.setClazz(TelemetryStoragePlugin.class.getName());
PluginMetaData savedPlugin = doPost("/api/plugin", plugin, PluginMetaData.class);
doPost("/api/plugin/" + savedPlugin.getId().getId().toString() + "/activate").andExpect(status().isOk());
RuleMetaData rule = BaseRuleControllerTest.createRuleMetaData(savedPlugin);
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class);
doPost("/api/rule/" + savedRule.getId().getId().toString() + "/activate").andExpect(status().isOk());
doPost("/api/plugin/" + savedPlugin.getId().getId().toString() + "/suspend").andExpect(status().isBadRequest());
doPost("/api/rule/" + savedRule.getId().getId().toString() + "/suspend").andExpect(status().isOk());
doPost("/api/plugin/" + savedPlugin.getId().getId().toString() + "/suspend").andExpect(status().isOk());
}
@Test
public void testDeletePluginById() throws Exception {
PluginMetaData plugin = new PluginMetaData();
plugin.setName("My plugin");
plugin.setApiToken("myplugin");
plugin.setConfiguration(mapper.readTree("{}"));
plugin.setClazz(TelemetryStoragePlugin.class.getName());
PluginMetaData savedPlugin = doPost("/api/plugin", plugin, PluginMetaData.class);
RuleMetaData rule = BaseRuleControllerTest.createRuleMetaData(savedPlugin);
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class);
doDelete("/api/plugin/" + savedPlugin.getId().getId()).andExpect(status().isBadRequest());
doDelete("/api/rule/" + savedRule.getId().getId()).andExpect(status().isOk());
doDelete("/api/plugin/" + savedPlugin.getId().getId()).andExpect(status().isOk());
doGet("/api/plugin/" + savedPlugin.getId().getId().toString()).andExpect(status().isNotFound());
}
@Test
public void testFindPluginByToken() throws Exception {
PluginMetaData plugin = new PluginMetaData();
plugin.setName("My plugin");
plugin.setApiToken("myplugin");
plugin.setConfiguration(mapper.readTree("{}"));
plugin.setClazz(TelemetryStoragePlugin.class.getName());
PluginMetaData savedPlugin = doPost("/api/plugin", plugin, PluginMetaData.class);
PluginMetaData foundPlugin = doGet("/api/plugin/token/" + "myplugin", PluginMetaData.class);
Assert.assertNotNull(foundPlugin);
Assert.assertEquals(savedPlugin, foundPlugin);
}
@Test
public void testFindCurrentTenantPlugins() throws Exception {
List<PluginMetaData> plugins = testPluginsCreation("/api/plugin");
for (PluginMetaData plugin : plugins) {
doDelete("/api/plugin/" + plugin.getId().getId()).andExpect(status().isOk());
}
}
@Test
public void testFindSystemPlugins() throws Exception {
loginSysAdmin();
List<PluginMetaData> plugins = testPluginsCreation("/api/plugin/system");
for (PluginMetaData plugin : plugins) {
doDelete("/api/plugin/" + plugin.getId().getId()).andExpect(status().isOk());
}
}
private List<PluginMetaData> testPluginsCreation(String url) throws Exception {
List<PluginMetaData> plugins = new ArrayList<>();
for (int i = 0; i < 111; i++) {
PluginMetaData plugin = new PluginMetaData();
plugin.setName("My plugin");
plugin.setApiToken("myplugin" + i);
plugin.setConfiguration(mapper.readTree("{}"));
plugin.setClazz(TelemetryStoragePlugin.class.getName());
plugins.add(doPost("/api/plugin", plugin, PluginMetaData.class));
}
List<PluginMetaData> loadedPlugins = new ArrayList<>();
TextPageLink pageLink = new TextPageLink(23);
TextPageData<PluginMetaData> pageData;
do {
pageData = doGetTypedWithPageLink(url + "?",
new TypeReference<TextPageData<PluginMetaData>>() {
}, pageLink);
loadedPlugins.addAll(pageData.getData());
if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink();
}
} while (pageData.hasNext());
loadedPlugins = loadedPlugins.stream()
.filter(p -> !p.getName().equals("System Telemetry Plugin"))
.filter(p -> !p.getName().equals("Mail Sender Plugin"))
.filter(p -> !p.getName().equals("System RPC Plugin"))
.collect(Collectors.toList());
Collections.sort(plugins, idComparator);
Collections.sort(loadedPlugins, idComparator);
Assert.assertEquals(plugins, loadedPlugins);
return loadedPlugins;
}
}

247
application/src/test/java/org/thingsboard/server/controller/BaseRuleControllerTest.java

@ -1,247 +0,0 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.plugin.PluginMetaData;
import org.thingsboard.server.common.data.rule.RuleMetaData;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.extensions.core.plugin.telemetry.TelemetryStoragePlugin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
public abstract class BaseRuleControllerTest extends AbstractControllerTest {
private IdComparator<RuleMetaData> idComparator = new IdComparator<>();
private static final ObjectMapper mapper = new ObjectMapper();
private Tenant savedTenant;
private User tenantAdmin;
private PluginMetaData sysPlugin;
private PluginMetaData tenantPlugin;
@Before
public void beforeTest() throws Exception {
loginSysAdmin();
sysPlugin = new PluginMetaData();
sysPlugin.setName("Sys plugin");
sysPlugin.setApiToken("sysplugin");
sysPlugin.setConfiguration(mapper.readTree("{}"));
sysPlugin.setClazz(TelemetryStoragePlugin.class.getName());
sysPlugin = doPost("/api/plugin", sysPlugin, PluginMetaData.class);
Tenant tenant = new Tenant();
tenant.setTitle("My tenant");
savedTenant = doPost("/api/tenant", tenant, Tenant.class);
Assert.assertNotNull(savedTenant);
tenantAdmin = new User();
tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
tenantAdmin.setTenantId(savedTenant.getId());
tenantAdmin.setEmail("tenant2@thingsboard.org");
tenantAdmin.setFirstName("Joe");
tenantAdmin.setLastName("Downs");
tenantAdmin = createUserAndLogin(tenantAdmin, "testPassword1");
tenantPlugin = new PluginMetaData();
tenantPlugin.setName("My plugin");
tenantPlugin.setApiToken("myplugin");
tenantPlugin.setConfiguration(mapper.readTree("{}"));
tenantPlugin.setClazz(TelemetryStoragePlugin.class.getName());
tenantPlugin = doPost("/api/plugin", tenantPlugin, PluginMetaData.class);
}
@After
public void afterTest() throws Exception {
loginSysAdmin();
doDelete("/api/tenant/" + savedTenant.getId().getId().toString())
.andExpect(status().isOk());
doDelete("/api/plugin/" + sysPlugin.getId().getId()).andExpect(status().isOk());
}
@Test
public void testSaveRule() throws Exception {
RuleMetaData rule = new RuleMetaData();
doPost("/api/rule", rule).andExpect(status().isBadRequest());
rule.setName("My Rule");
doPost("/api/rule", rule).andExpect(status().isBadRequest());
rule.setPluginToken(tenantPlugin.getApiToken());
doPost("/api/rule", rule).andExpect(status().isBadRequest());
rule.setFilters(mapper.readTree("[{\"clazz\":\"org.thingsboard.server.extensions.core.filter.MsgTypeFilter\", " +
"\"name\":\"TelemetryFilter\", " +
"\"configuration\": {\"messageTypes\":[\"POST_TELEMETRY\",\"POST_ATTRIBUTES\",\"GET_ATTRIBUTES\"]}}]"));
doPost("/api/rule", rule).andExpect(status().isBadRequest());
rule.setAction(mapper.readTree("{\"clazz\":\"org.thingsboard.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class);
Assert.assertNotNull(savedRule);
Assert.assertNotNull(savedRule.getId());
Assert.assertTrue(savedRule.getCreatedTime() > 0);
Assert.assertEquals(savedTenant.getId(), savedRule.getTenantId());
}
@Test
public void testFindRuleById() throws Exception {
RuleMetaData rule = createRuleMetaData(tenantPlugin);
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class);
RuleMetaData foundRule = doGet("/api/rule/" + savedRule.getId().getId().toString(), RuleMetaData.class);
Assert.assertNotNull(foundRule);
Assert.assertEquals(savedRule, foundRule);
}
@Test
public void testFindRuleByPluginToken() throws Exception {
RuleMetaData rule = createRuleMetaData(tenantPlugin);
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class);
List<RuleMetaData> foundRules = doGetTyped("/api/rule/token/" + savedRule.getPluginToken(),
new TypeReference<List<RuleMetaData>>() {
});
Assert.assertNotNull(foundRules);
Assert.assertEquals(1, foundRules.size());
Assert.assertEquals(savedRule, foundRules.get(0));
}
@Test
public void testActivateRule() throws Exception {
RuleMetaData rule = createRuleMetaData(tenantPlugin);
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class);
doPost("/api/rule/" + savedRule.getId().getId().toString() + "/activate").andExpect(status().isBadRequest());
doPost("/api/plugin/" + tenantPlugin.getId().getId().toString() + "/activate").andExpect(status().isOk());
doPost("/api/rule/" + savedRule.getId().getId().toString() + "/activate").andExpect(status().isOk());
}
@Test
public void testSuspendRule() throws Exception {
RuleMetaData rule = createRuleMetaData(tenantPlugin);
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class);
doPost("/api/plugin/" + tenantPlugin.getId().getId().toString() + "/activate").andExpect(status().isOk());
doPost("/api/rule/" + savedRule.getId().getId().toString() + "/activate").andExpect(status().isOk());
doPost("/api/rule/" + savedRule.getId().getId().toString() + "/suspend").andExpect(status().isOk());
}
@Test
public void testFindSystemRules() throws Exception {
loginSysAdmin();
List<RuleMetaData> rules = testRulesCreation("/api/rule/system", sysPlugin);
for (RuleMetaData rule : rules) {
doDelete("/api/rule/" + rule.getId().getId()).andExpect(status().isOk());
}
loginTenantAdmin();
}
@Test
public void testFindCurrentTenantPlugins() throws Exception {
List<RuleMetaData> rules = testRulesCreation("/api/rule", tenantPlugin);
for (RuleMetaData rule : rules) {
doDelete("/api/rule/" + rule.getId().getId()).andExpect(status().isOk());
}
}
@Test
public void testFindTenantPlugins() throws Exception {
List<RuleMetaData> rules = testRulesCreation("/api/rule", tenantPlugin);
loginSysAdmin();
List<RuleMetaData> loadedRules = new ArrayList<>();
TextPageLink pageLink = new TextPageLink(3);
TextPageData<RuleMetaData> pageData;
do {
pageData = doGetTypedWithPageLink("/api/rule/tenant/" + savedTenant.getId().getId().toString() + "?",
new TypeReference<TextPageData<RuleMetaData>>() {
}, pageLink);
loadedRules.addAll(pageData.getData());
if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink();
}
} while (pageData.hasNext());
Collections.sort(rules, idComparator);
Collections.sort(loadedRules, idComparator);
Assert.assertEquals(rules, loadedRules);
for (RuleMetaData rule : rules) {
doDelete("/api/rule/" + rule.getId().getId()).andExpect(status().isOk());
}
}
private List<RuleMetaData> testRulesCreation(String url, PluginMetaData plugin) throws Exception {
List<RuleMetaData> rules = new ArrayList<>();
for (int i = 0; i < 6; i++) {
RuleMetaData rule = createRuleMetaData(plugin);
rule.setPluginToken(plugin.getApiToken());
rule.setName(rule.getName() + i);
rules.add(doPost("/api/rule", rule, RuleMetaData.class));
}
List<RuleMetaData> loadedRules = new ArrayList<>();
TextPageLink pageLink = new TextPageLink(3);
TextPageData<RuleMetaData> pageData;
do {
pageData = doGetTypedWithPageLink(url + "?",
new TypeReference<TextPageData<RuleMetaData>>() {
}, pageLink);
loadedRules.addAll(pageData.getData());
if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink();
}
} while (pageData.hasNext());
loadedRules = loadedRules.stream().filter(p -> !p.getName().equals("System Telemetry Rule")).collect(Collectors.toList());
Collections.sort(rules, idComparator);
Collections.sort(loadedRules, idComparator);
Assert.assertEquals(rules, loadedRules);
return loadedRules;
}
public static RuleMetaData createRuleMetaData(PluginMetaData plugin) throws IOException {
RuleMetaData rule = new RuleMetaData();
rule.setName("My Rule");
rule.setPluginToken(plugin.getApiToken());
rule.setFilters(mapper.readTree("[{\"clazz\":\"org.thingsboard.server.extensions.core.filter.MsgTypeFilter\", " +
"\"name\":\"TelemetryFilter\", " +
"\"configuration\": {\"messageTypes\":[\"POST_TELEMETRY\",\"POST_ATTRIBUTES\",\"GET_ATTRIBUTES\"]}}]"));
rule.setAction(mapper.readTree("{\"clazz\":\"org.thingsboard.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", " +
"\"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
return rule;
}
}

26
application/src/test/java/org/thingsboard/server/controller/nosql/PluginControllerNoSqlTest.java

@ -1,26 +0,0 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller.nosql;
import org.thingsboard.server.controller.BasePluginControllerTest;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
/**
* Created by Valerii Sosliuk on 6/28/2017.
*/
@DaoNoSqlTest
public class PluginControllerNoSqlTest extends BasePluginControllerTest {
}

26
application/src/test/java/org/thingsboard/server/controller/nosql/RuleControllerNoSqlTest.java

@ -1,26 +0,0 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller.nosql;
import org.thingsboard.server.controller.BaseRuleControllerTest;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
/**
* Created by Valerii Sosliuk on 6/28/2017.
*/
@DaoNoSqlTest
public class RuleControllerNoSqlTest extends BaseRuleControllerTest {
}

26
application/src/test/java/org/thingsboard/server/controller/sql/PluginControllerSqlTest.java

@ -1,26 +0,0 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller.sql;
import org.thingsboard.server.controller.BasePluginControllerTest;
import org.thingsboard.server.dao.service.DaoSqlTest;
/**
* Created by Valerii Sosliuk on 6/28/2017.
*/
@DaoSqlTest
public class PluginControllerSqlTest extends BasePluginControllerTest {
}

2
application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java

@ -80,6 +80,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
}
}
@Ignore
@Test
public void testServerMqttOneWayRpc() throws Exception {
Device device = new Device();
@ -106,6 +107,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
Assert.assertTrue(StringUtils.isEmpty(result));
}
@Ignore
@Test
public void testServerMqttOneWayRpcDeviceOffline() throws Exception {
Device device = new Device();

3
application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java

@ -24,7 +24,8 @@ import java.util.Arrays;
@RunWith(ClasspathSuite.class)
@ClasspathSuite.ClassnameFilters({
"org.thingsboard.server.rules.flow.*Test"})
"org.thingsboard.server.rules.flow.*Test",
"org.thingsboard.server.rules.lifecycle.*Test"})
public class RuleEngineSqlTestSuite {
@ClassRule

13
application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java

@ -16,6 +16,7 @@
package org.thingsboard.server.rules.flow;
import com.datastax.driver.core.utils.UUIDs;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
@ -28,6 +29,7 @@ import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.*;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.page.TimePageData;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
@ -40,6 +42,7 @@ import org.thingsboard.server.controller.AbstractRuleEngineControllerTest;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.rule.RuleChainService;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@ -60,9 +63,6 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
@Autowired
protected AttributesService attributesService;
@Autowired
protected RuleChainService ruleChainService;
@Before
public void beforeTest() throws Exception {
loginSysAdmin();
@ -71,6 +71,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
tenant.setTitle("My tenant");
savedTenant = doPost("/api/tenant", tenant, Tenant.class);
Assert.assertNotNull(savedTenant);
ruleChainService.deleteRuleChainsByTenantId(savedTenant.getId());
tenantAdmin = new User();
tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
@ -166,7 +167,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
Assert.assertEquals("serverAttributeValue1", outEvent.getBody().get("metadata").get("ss.serverAttributeKey1").asText());
Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
RuleChain finalRuleChain = ruleChain;
RuleNode lastRuleNode = metaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
@ -183,8 +184,8 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
Assert.assertEquals("serverAttributeValue1", outEvent.getBody().get("metadata").get("ss.serverAttributeKey1").asText());
Assert.assertEquals("serverAttributeValue2", outEvent.getBody().get("metadata").get("ss.serverAttributeKey2").asText());
Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
}
}

5
application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java

@ -16,6 +16,7 @@
package org.thingsboard.server.rules.lifecycle;
import com.datastax.driver.core.utils.UUIDs;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Assert;
@ -42,6 +43,7 @@ import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.controller.AbstractRuleEngineControllerTest;
import org.thingsboard.server.dao.attributes.AttributesService;
import java.io.IOException;
import java.util.Collections;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@ -69,6 +71,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
tenant.setTitle("My tenant");
savedTenant = doPost("/api/tenant", tenant, Tenant.class);
Assert.assertNotNull(savedTenant);
ruleChainService.deleteRuleChainsByTenantId(savedTenant.getId());
tenantAdmin = new User();
tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
@ -152,7 +155,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
Assert.assertEquals("serverAttributeValue", outEvent.getBody().get("metadata").get("ss.serverAttributeKey").asText());
Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText());
}
}

1
application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java

@ -35,5 +35,4 @@ public class SystemSqlTestSuite {
"sql/drop-all-tables.sql",
"sql-test.properties");
}

2
common/message/src/main/java/org/thingsboard/server/common/msg/core/UpdateAttributesRequest.java → common/message/src/main/java/org/thingsboard/server/common/msg/core/AttributesUpdateRequest.java

@ -21,7 +21,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
public interface UpdateAttributesRequest extends FromDeviceRequestMsg {
public interface AttributesUpdateRequest extends FromDeviceRequestMsg {
Set<AttributeKvEntry> getAttributes();

9
common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicUpdateAttributesRequest.java → common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicAttributesUpdateRequest.java

@ -21,19 +21,18 @@ import java.util.Set;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.SessionMsgType;
public class BasicUpdateAttributesRequest extends BasicRequest implements UpdateAttributesRequest {
public class BasicAttributesUpdateRequest extends BasicRequest implements AttributesUpdateRequest {
private static final long serialVersionUID = 1L;
private final Set<AttributeKvEntry> data;
public BasicUpdateAttributesRequest() {
public BasicAttributesUpdateRequest() {
this(DEFAULT_REQUEST_ID);
}
public BasicUpdateAttributesRequest(Integer requestId) {
public BasicAttributesUpdateRequest(Integer requestId) {
super(requestId);
this.data = new LinkedHashSet<>();
}
@ -58,7 +57,7 @@ public class BasicUpdateAttributesRequest extends BasicRequest implements Update
@Override
public String toString() {
return "BasicUpdateAttributesRequest [data=" + data + "]";
return "BasicAttributesUpdateRequest [data=" + data + "]";
}
}

2
common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java

@ -21,7 +21,7 @@ package org.thingsboard.server.common.msg.core;
public enum RuleEngineError {
NO_RULES, NO_ACTIVE_RULES, NO_FILTERS_MATCHED, NO_REQUEST_FROM_ACTIONS, NO_TWO_WAY_ACTIONS, NO_RESPONSE_FROM_ACTIONS, QUEUE_PUT_TIMEOUT(true);
QUEUE_PUT_TIMEOUT(true), SERVER_ERROR(true);
private final boolean critical;

16
common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java

@ -40,20 +40,10 @@ public class RuleEngineErrorMsg implements ToDeviceMsg {
public String getErrorMsg() {
switch (error) {
case NO_RULES:
return "No rules configured!";
case NO_ACTIVE_RULES:
return "No active rules!";
case NO_FILTERS_MATCHED:
return "No rules that match current message!";
case NO_REQUEST_FROM_ACTIONS:
return "Rule filters match, but no plugin message produced by rule action!";
case NO_TWO_WAY_ACTIONS:
return "Rule filters match, but no rule with two-way action configured!";
case NO_RESPONSE_FROM_ACTIONS:
return "Rule filters match, message processed by plugin, but no response produced by rule action!";
case QUEUE_PUT_TIMEOUT:
return "Timeout during processing of message by plugin!";
return "Timeout during persistence of the message to the queue!";
case SERVER_ERROR:
return "Error during processing of message by the server!";
default:
throw new RuntimeException("Error " + error + " is not supported!");
}

6
common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java

@ -118,13 +118,13 @@ public class JsonConverter {
}
}
public static UpdateAttributesRequest convertToAttributes(JsonElement element) {
public static AttributesUpdateRequest convertToAttributes(JsonElement element) {
return convertToAttributes(element, BasicRequest.DEFAULT_REQUEST_ID);
}
public static UpdateAttributesRequest convertToAttributes(JsonElement element, int requestId) {
public static AttributesUpdateRequest convertToAttributes(JsonElement element, int requestId) {
if (element.isJsonObject()) {
BasicUpdateAttributesRequest request = new BasicUpdateAttributesRequest(requestId);
BasicAttributesUpdateRequest request = new BasicAttributesUpdateRequest(requestId);
long ts = System.currentTimeMillis();
request.add(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
return request;

2
dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java

@ -75,8 +75,6 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
log.trace("Save system rule chain with predefined id {}", SYSTEM_TENANT);
ruleChain.setTenantId(SYSTEM_TENANT);
}
//TODO: Temporary Hack to continue tests;
ruleChain.setRoot(true);
RuleChain savedRuleChain = ruleChainDao.save(ruleChain);
if (ruleChain.isRoot() && ruleChain.getTenantId() != null && ruleChain.getId() == null) {
try {

6
extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/UpdateAttributesRequestRuleToPluginMsg.java

@ -18,13 +18,13 @@ package org.thingsboard.server.extensions.api.plugins.msg;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
public class UpdateAttributesRequestRuleToPluginMsg extends AbstractRuleToPluginMsg<UpdateAttributesRequest> {
public class UpdateAttributesRequestRuleToPluginMsg extends AbstractRuleToPluginMsg<AttributesUpdateRequest> {
private static final long serialVersionUID = 1L;
public UpdateAttributesRequestRuleToPluginMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, UpdateAttributesRequest payload) {
public UpdateAttributesRequestRuleToPluginMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, AttributesUpdateRequest payload) {
super(tenantId, customerId, deviceId, payload);
}

5
extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginAction.java

@ -18,11 +18,10 @@ package org.thingsboard.server.extensions.core.action.telemetry;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.msg.core.GetAttributesRequest;
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.extensions.api.component.Action;
import org.thingsboard.server.extensions.api.plugins.PluginAction;
@ -58,7 +57,7 @@ public class TelemetryPluginAction extends SimpleRuleLifecycleComponent implemen
return Optional.of(new TelemetryUploadRequestRuleToPluginMsg(deviceToDeviceActorMsg.getTenantId(), deviceToDeviceActorMsg.getCustomerId(),
deviceToDeviceActorMsg.getDeviceId(), payload, ttl));
} else if (msg.getMsgType() == SessionMsgType.POST_ATTRIBUTES_REQUEST) {
UpdateAttributesRequest payload = (UpdateAttributesRequest) msg;
AttributesUpdateRequest payload = (AttributesUpdateRequest) msg;
return Optional.of(new UpdateAttributesRequestRuleToPluginMsg(deviceToDeviceActorMsg.getTenantId(), deviceToDeviceActorMsg.getCustomerId(),
deviceToDeviceActorMsg.getDeviceId(), payload));
} else if (msg.getMsgType() == SessionMsgType.GET_ATTRIBUTES_REQUEST) {

4
extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/DeviceAttributesFilter.java

@ -16,7 +16,7 @@
package org.thingsboard.server.extensions.core.filter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.extensions.api.component.Filter;
@ -44,7 +44,7 @@ public class DeviceAttributesFilter extends BasicJsFilter {
if (msg != null) {
switch (msg.getMsgType()) {
case POST_ATTRIBUTES_REQUEST:
bindings = NashornJsEvaluator.updateBindings(bindings, (UpdateAttributesRequest) msg);
bindings = NashornJsEvaluator.updateBindings(bindings, (AttributesUpdateRequest) msg);
break;
default:
break;

4
extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/NashornJsEvaluator.java

@ -19,7 +19,7 @@ import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
import org.thingsboard.server.extensions.api.device.DeviceAttributes;
import javax.script.*;
@ -69,7 +69,7 @@ public class NashornJsEvaluator {
return bindings;
}
public static Bindings updateBindings(Bindings bindings, UpdateAttributesRequest msg) {
public static Bindings updateBindings(Bindings bindings, AttributesUpdateRequest msg) {
Map<String, Object> attrMap = (Map<String, Object>) bindings.get(CLIENT_SIDE);
for (AttributeKvEntry attr : msg.getAttributes()) {
if (!CLIENT_SIDE.equalsIgnoreCase(attr.getKey()) && !SERVER_SIDE.equalsIgnoreCase(attr.getKey())

4
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java

@ -28,7 +28,7 @@ import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse;
import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
import org.thingsboard.server.common.msg.core.GetAttributesRequest;
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
@ -132,7 +132,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
@Override
public void handleUpdateAttributesRequest(PluginContext ctx, TenantId tenantId, RuleId ruleId, UpdateAttributesRequestRuleToPluginMsg msg) {
UpdateAttributesRequest request = msg.getPayload();
AttributesUpdateRequest request = msg.getPayload();
ctx.saveAttributes(msg.getTenantId(), msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getAttributes().stream().collect(Collectors.toList()),
new PluginCallback<Void>() {
@Override

4
extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java

@ -26,7 +26,7 @@ import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.alarm.AlarmStatus;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.extensions.api.component.Processor;
@ -219,7 +219,7 @@ public class AlarmProcessor implements RuleProcessor<AlarmProcessorConfiguration
if (msg != null) {
switch (msg.getMsgType()) {
case POST_ATTRIBUTES_REQUEST:
bindings = NashornJsEvaluator.updateBindings(bindings, (UpdateAttributesRequest) msg);
bindings = NashornJsEvaluator.updateBindings(bindings, (AttributesUpdateRequest) msg);
break;
case POST_TELEMETRY_REQUEST:
TelemetryUploadRequest telemetryMsg = (TelemetryUploadRequest) msg;

8
extensions-core/src/main/resources/MsgTypeFilterDescriptor.json

@ -9,19 +9,19 @@
"minItems" : 1,
"items": [
{
"value": "GET_ATTRIBUTES",
"value": "GET_ATTRIBUTES_REQUEST",
"label": "Get attributes"
},
{
"value": "POST_ATTRIBUTES",
"value": "POST_ATTRIBUTES_REQUEST",
"label": "Post attributes"
},
{
"value": "POST_TELEMETRY",
"value": "POST_TELEMETRY_REQUEST",
"label": "Post telemetry"
},
{
"value": "RPC_REQUEST",
"value": "RPC_REQUEST_REQUEST",
"label": "RPC Request"
}
],

2
extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaPluginAction.java

@ -32,7 +32,7 @@ public class KafkaPluginAction extends AbstractTemplatePluginAction<KafkaPluginA
@Override
protected Optional<RuleToPluginMsg> buildRuleToPluginMsg(RuleContext ctx, DeviceToDeviceActorMsg msg, FromDeviceRequestMsg payload) {
KafkaActionPayload.KafkaActionPayloadBuilder builder = KafkaActionPayload.builder();
builder.msgType(payload.getMsgType());
builder.sessionMsgType(payload.getMsgType());
builder.requestId(payload.getRequestId());
builder.sync(configuration.isSync());
builder.topic(configuration.getTopic());

4
extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaMsgHandler.java

@ -49,10 +49,10 @@ public class KafkaMsgHandler implements RuleMsgHandler {
if (payload.isSync()) {
if (metadata != null) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId())));
BasicStatusCodeResponse.onSuccess(payload.getSessionMsgType(), payload.getRequestId())));
} else {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onError(payload.getMsgType(), payload.getRequestId(), e)));
BasicStatusCodeResponse.onError(payload.getSessionMsgType(), payload.getRequestId(), e)));
}
}
});

2
extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttPluginAction.java

@ -31,7 +31,7 @@ public class MqttPluginAction extends AbstractTemplatePluginAction<MqttPluginAct
protected Optional<RuleToPluginMsg> buildRuleToPluginMsg(RuleContext ctx, DeviceToDeviceActorMsg msg, FromDeviceRequestMsg payload) {
MqttActionPayload.MqttActionPayloadBuilder builder = MqttActionPayload.builder();
builder.sync(configuration.isSync());
builder.msgType(payload.getMsgType());
builder.sessionMsgType(payload.getMsgType());
builder.requestId(payload.getRequestId());
builder.topic(configuration.getTopic());
builder.msgBody(getMsgBody(ctx, msg));

4
extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/plugin/MqttMsgHandler.java

@ -51,7 +51,7 @@ public class MqttMsgHandler implements RuleMsgHandler {
log.debug("Message [{}] was successfully delivered to topic [{}]!", msg.toString(), payload.getTopic());
if (payload.isSync()) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId())));
BasicStatusCodeResponse.onSuccess(payload.getSessionMsgType(), payload.getRequestId())));
}
}
@Override
@ -59,7 +59,7 @@ public class MqttMsgHandler implements RuleMsgHandler {
log.warn("Failed to deliver message [{}] to topic [{}]!", msg.toString(), payload.getTopic());
if (payload.isSync()) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onError(payload.getMsgType(), payload.getRequestId(), new Exception(e))));
BasicStatusCodeResponse.onError(payload.getSessionMsgType(), payload.getRequestId(), new Exception(e))));
}
}
});

2
extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqPluginAction.java

@ -38,7 +38,7 @@ public class RabbitMqPluginAction extends AbstractTemplatePluginAction<RabbitMqP
builder.exchange(configuration.getExchange());
builder.queueName(configuration.getQueueName());
builder.messageProperties(configuration.getMessageProperties());
builder.msgType(payload.getMsgType());
builder.sessionMsgType(payload.getMsgType());
builder.requestId(payload.getRequestId());
builder.payload(getMsgBody(ctx, msg));
return Optional.of(new RabbitMqActionMsg(msg.getTenantId(),

2
extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqMsgHandler.java

@ -57,7 +57,7 @@ public class RabbitMqMsgHandler implements RuleMsgHandler {
payload.getPayload().getBytes(UTF8));
if (payload.isSync()) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId())));
BasicStatusCodeResponse.onSuccess(payload.getSessionMsgType(), payload.getRequestId())));
}
} catch (IOException e) {
throw new RuleException(e.getMessage(), e);

2
extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallPluginAction.java

@ -35,7 +35,7 @@ public class RestApiCallPluginAction extends AbstractTemplatePluginAction<RestAp
@Override
protected Optional<RuleToPluginMsg> buildRuleToPluginMsg(RuleContext ctx, DeviceToDeviceActorMsg msg, FromDeviceRequestMsg payload) {
RestApiCallActionPayload.RestApiCallActionPayloadBuilder builder = RestApiCallActionPayload.builder();
builder.msgType(payload.getMsgType());
builder.sessionMsgType(payload.getMsgType());
builder.requestId(payload.getRequestId());
builder.sync(configuration.isSync());
builder.actionPath(configuration.getActionPath());

2
extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallMsgHandler.java

@ -52,7 +52,7 @@ public class RestApiCallMsgHandler implements RuleMsgHandler {
String.class);
if (exchangeResponse.getStatusCode().equals(payload.getExpectedResultCode()) && payload.isSync()) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId())));
BasicStatusCodeResponse.onSuccess(payload.getSessionMsgType(), payload.getRequestId())));
} else if(!exchangeResponse.getStatusCode().equals(payload.getExpectedResultCode())) {
throw new RuntimeException("Response Status Code '"
+ exchangeResponse.getStatusCode()

2
extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicPluginAction.java

@ -33,7 +33,7 @@ public class SnsTopicPluginAction extends AbstractTemplatePluginAction<SnsTopicP
@Override
protected Optional<RuleToPluginMsg> buildRuleToPluginMsg(RuleContext ctx, DeviceToDeviceActorMsg msg, FromDeviceRequestMsg payload) {
SnsTopicActionPayload.SnsTopicActionPayloadBuilder builder = SnsTopicActionPayload.builder();
builder.msgType(payload.getMsgType());
builder.sessionMsgType(payload.getMsgType());
builder.requestId(payload.getRequestId());
builder.topicArn(configuration.getTopicArn());
builder.msgBody(getMsgBody(ctx, msg));

2
extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsMessageHandler.java

@ -52,7 +52,7 @@ public class SnsMessageHandler implements RuleMsgHandler {
sns.publish(publishRequest);
if (payload.isSync()) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId())));
BasicStatusCodeResponse.onSuccess(payload.getSessionMsgType(), payload.getRequestId())));
}
return;
}

2
extensions/extension-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueuePluginAction.java

@ -33,7 +33,7 @@ public class SqsFifoQueuePluginAction extends AbstractTemplatePluginAction<SqsFi
@Override
protected Optional<RuleToPluginMsg> buildRuleToPluginMsg(RuleContext ctx, DeviceToDeviceActorMsg msg, FromDeviceRequestMsg payload) {
SqsFifoQueueActionPayload.SqsFifoQueueActionPayloadBuilder builder = SqsFifoQueueActionPayload.builder();
builder.msgType(payload.getMsgType());
builder.sessionMsgType(payload.getMsgType());
builder.requestId(payload.getRequestId());
builder.queue(configuration.getQueue());
builder.deviceId(msg.getDeviceId().toString());

2
extensions/extension-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueuePluginAction.java

@ -33,7 +33,7 @@ public class SqsStandardQueuePluginAction extends AbstractTemplatePluginAction<S
@Override
protected Optional<RuleToPluginMsg> buildRuleToPluginMsg(RuleContext ctx, DeviceToDeviceActorMsg msg, FromDeviceRequestMsg payload) {
SqsStandardQueueActionPayload.SqsStandardQueueActionPayloadBuilder builder = SqsStandardQueueActionPayload.builder();
builder.msgType(payload.getMsgType());
builder.sessionMsgType(payload.getMsgType());
builder.requestId(payload.getRequestId());
builder.queue(configuration.getQueue());
builder.delaySeconds(configuration.getDelaySeconds());

2
extensions/extension-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsMessageHandler.java

@ -78,7 +78,7 @@ public class SqsMessageHandler implements RuleMsgHandler {
sqs.sendMessage(sendMsgRequest);
if (payload.isSync()) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId())));
BasicStatusCodeResponse.onSuccess(payload.getSessionMsgType(), payload.getRequestId())));
}
}
}

76
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java

@ -0,0 +1,76 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry;
import com.google.gson.JsonParser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "save attributes",
configClazz = TbMsgAttributesNodeConfiguration.class,
nodeDescription = "Saves attributes data",
nodeDetails = "Saves entity attributes based on configurable scope parameter. Expects messages with 'POST_ATTRIBUTES_REQUEST' message type"
)
public class TbMsgAttributesNode implements TbNode {
private TbMsgAttributesNodeConfiguration config;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgAttributesNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
if (!msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name())) {
ctx.tellError(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
return;
}
String src = msg.getData();
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src)).getAttributes();
ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), config.getScope(), new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg));
}
@Override
public void destroy() {
}
}

23
application/src/test/java/org/thingsboard/server/controller/sql/RuleControllerSqlTest.java → rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java

@ -13,14 +13,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller.sql;
package org.thingsboard.rule.engine.telemetry;
import org.thingsboard.server.controller.BaseRuleControllerTest;
import org.thingsboard.server.dao.service.DaoSqlTest;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import org.thingsboard.server.common.data.DataConstants;
/**
* Created by Valerii Sosliuk on 6/28/2017.
*/
@DaoSqlTest
public class RuleControllerSqlTest extends BaseRuleControllerTest {
@Data
public class TbMsgAttributesNodeConfiguration implements NodeConfiguration<TbMsgAttributesNodeConfiguration> {
private String scope;
@Override
public TbMsgAttributesNodeConfiguration defaultConfiguration() {
TbMsgAttributesNodeConfiguration configuration = new TbMsgAttributesNodeConfiguration();
configuration.setScope(DataConstants.SERVER_SCOPE);
return configuration;
}
}

15
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNode.java → rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java

@ -28,13 +28,11 @@ import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -42,21 +40,20 @@ import java.util.Map;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "save timeseries data",
configClazz = TbMsgTelemetryNodeConfiguration.class,
name = "save timeseries",
configClazz = TbMsgTimeseriesNodeConfiguration.class,
nodeDescription = "Saves timeseries data",
nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY' message type",
nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type",
uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
configDirective = "tbActionNodeTelemetryConfig"
)
public class TbMsgTimeseriesNode implements TbNode {
public class TbMsgTelemetryNode implements TbNode {
private TbMsgTelemetryNodeConfiguration config;
private TbMsgTimeseriesNodeConfiguration config;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgTelemetryNodeConfiguration.class);
this.config = TbNodeUtils.convert(configuration, TbMsgTimeseriesNodeConfiguration.class);
}
@Override

8
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNodeConfiguration.java → rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java

@ -18,16 +18,14 @@ package org.thingsboard.rule.engine.telemetry;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import java.util.Map;
@Data
public class TbMsgTelemetryNodeConfiguration implements NodeConfiguration<TbMsgTelemetryNodeConfiguration> {
public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration<TbMsgTimeseriesNodeConfiguration> {
private long defaultTTL;
@Override
public TbMsgTelemetryNodeConfiguration defaultConfiguration() {
TbMsgTelemetryNodeConfiguration configuration = new TbMsgTelemetryNodeConfiguration();
public TbMsgTimeseriesNodeConfiguration defaultConfiguration() {
TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration();
configuration.setDefaultTTL(0L);
return configuration;
}

31
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java

@ -17,9 +17,14 @@ package org.thingsboard.rule.engine.action;
import com.datastax.driver.core.utils.UUIDs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.AbstractListeningExecutorService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang3.NotImplementedException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
@ -38,6 +43,8 @@ import org.thingsboard.server.dao.alarm.AlarmService;
import javax.script.ScriptException;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
@ -66,11 +73,32 @@ public class TbAlarmNodeTest {
@Mock
private ScriptEngine detailsJs;
private ListeningExecutor dbExecutor;
private EntityId originator = new DeviceId(UUIDs.timeBased());
private TenantId tenantId = new TenantId(UUIDs.timeBased());
private TbMsgMetaData metaData = new TbMsgMetaData();
private String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
@Before
public void before() {
dbExecutor = new ListeningExecutor() {
@Override
public <T> ListenableFuture<T> executeAsync(Callable<T> task) {
try {
return Futures.immediateFuture(task.call());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void execute(Runnable command) {
command.run();
}
};
}
@Test
public void newAlarmCanBeCreated() throws ScriptException, IOException {
initWithScript();
@ -128,6 +156,7 @@ public class TbAlarmNodeTest {
verify(ctx).createJsScriptEngine("CLEAR", "isCleared");
verify(ctx).createJsScriptEngine("DETAILS", "Details");
verify(ctx).getJsExecutor();
verify(ctx).getDbCallbackExecutor();
verifyNoMoreInteractions(ctx, alarmService, clearJs, detailsJs);
}
@ -151,6 +180,7 @@ public class TbAlarmNodeTest {
verify(ctx).createJsScriptEngine("DETAILS", "Details");
verify(ctx, times(2)).getJsExecutor();
verify(ctx).getAlarmService();
verify(ctx, times(3)).getDbCallbackExecutor();
verify(ctx).getTenantId();
verify(alarmService).findLatestByOriginatorAndType(tenantId, originator, "SomeType");
@ -307,6 +337,7 @@ public class TbAlarmNodeTest {
when(ctx.getTenantId()).thenReturn(tenantId);
when(ctx.getJsExecutor()).thenReturn(executor);
when(ctx.getAlarmService()).thenReturn(alarmService);
when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor);
mockJsExecutor();

3
transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java

@ -31,7 +31,6 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
import org.thingsboard.server.common.msg.session.SessionContext;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.common.msg.session.ex.ProcessingTimeoutException;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@ -157,7 +156,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
return response;
}
private UpdateAttributesRequest convertToUpdateAttributesRequest(SessionContext ctx, Request inbound) throws AdaptorException {
private AttributesUpdateRequest convertToUpdateAttributesRequest(SessionContext ctx, Request inbound) throws AdaptorException {
String payload = validatePayload(ctx, inbound);
try {
return JsonConverter.convertToAttributes(new JsonParser().parse(payload));

2
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java

@ -219,7 +219,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
}
private UpdateAttributesRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
private AttributesUpdateRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
String payload = validatePayload(ctx.getSessionId(), inbound.payload());
try {
return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().messageId());

2
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java

@ -179,7 +179,7 @@ public class GatewaySessionCtx {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
long ts = System.currentTimeMillis();
BasicUpdateAttributesRequest request = new BasicUpdateAttributesRequest(requestId);
BasicAttributesUpdateRequest request = new BasicAttributesUpdateRequest(requestId);
JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);

Loading…
Cancel
Save