271 changed files with 5795 additions and 2660 deletions
@ -1,13 +0,0 @@ |
|||
{ |
|||
"apiToken": "messaging", |
|||
"name": "Demo Device Messaging RPC Plugin", |
|||
"clazz": "org.thingsboard.server.extensions.core.plugin.messaging.DeviceMessagingPlugin", |
|||
"publicAccess": false, |
|||
"state": "ACTIVE", |
|||
"configuration": { |
|||
"maxDeviceCountPerCustomer": 1024, |
|||
"defaultTimeout": 20000, |
|||
"maxTimeout": 60000 |
|||
}, |
|||
"additionalInfo": null |
|||
} |
|||
@ -1,28 +0,0 @@ |
|||
{ |
|||
"apiToken": "mail", |
|||
"name": "Demo Email Plugin", |
|||
"clazz": "org.thingsboard.server.extensions.core.plugin.mail.MailPlugin", |
|||
"publicAccess": true, |
|||
"state": "ACTIVE", |
|||
"configuration": { |
|||
"host": "smtp.sendgrid.net", |
|||
"port": 2525, |
|||
"username": "apikey", |
|||
"password": "your_api_key", |
|||
"otherProperties": [ |
|||
{ |
|||
"key": "mail.smtp.auth", |
|||
"value": "true" |
|||
}, |
|||
{ |
|||
"key": "mail.smtp.timeout", |
|||
"value": "10000" |
|||
}, |
|||
{ |
|||
"key": "mail.smtp.starttls.enable", |
|||
"value": "true" |
|||
} |
|||
] |
|||
}, |
|||
"additionalInfo": null |
|||
} |
|||
@ -1,11 +0,0 @@ |
|||
{ |
|||
"apiToken": "time", |
|||
"name": "Demo Time RPC Plugin", |
|||
"clazz": "org.thingsboard.server.extensions.core.plugin.time.TimePlugin", |
|||
"publicAccess": false, |
|||
"state": "ACTIVE", |
|||
"configuration": { |
|||
"timeFormat": "yyyy MM dd HH:mm:ss.SSS" |
|||
}, |
|||
"additionalInfo": null |
|||
} |
|||
@ -1,46 +0,0 @@ |
|||
{ |
|||
"name": "Demo Alarm Rule", |
|||
"state": "ACTIVE", |
|||
"weight": 0, |
|||
"pluginToken": "mail", |
|||
"filters": [ |
|||
{ |
|||
"clazz": "org.thingsboard.server.extensions.core.filter.MsgTypeFilter", |
|||
"name": "MsgTypeFilter", |
|||
"configuration": { |
|||
"messageTypes": [ |
|||
"POST_TELEMETRY", |
|||
"POST_ATTRIBUTES", |
|||
"GET_ATTRIBUTES" |
|||
] |
|||
} |
|||
}, |
|||
{ |
|||
"clazz": "org.thingsboard.server.extensions.core.filter.DeviceTelemetryFilter", |
|||
"name": "Temperature filter", |
|||
"configuration": { |
|||
"filter": "typeof temperature !== 'undefined' && temperature >= 100" |
|||
} |
|||
} |
|||
], |
|||
"processor": { |
|||
"clazz": "org.thingsboard.server.extensions.core.processor.AlarmDeduplicationProcessor", |
|||
"name": "AlarmDeduplicationProcessor", |
|||
"configuration": { |
|||
"alarmIdTemplate": "[$date.get('yyyy-MM-dd HH:mm')] Device $cs.get('serialNumber')($cs.get('model')) temperature is high!", |
|||
"alarmBodyTemplate": "[$date.get('yyyy-MM-dd HH:mm:ss')] Device $cs.get('serialNumber')($cs.get('model')) temperature is $temp.valueAsString!" |
|||
} |
|||
}, |
|||
"action": { |
|||
"clazz": "org.thingsboard.server.extensions.core.action.mail.SendMailAction", |
|||
"name": "Send Mail Action", |
|||
"configuration": { |
|||
"sendFlag": "isNewAlarm", |
|||
"fromTemplate": "thingsboard@gmail.com", |
|||
"toTemplate": "thingsboard@gmail.com", |
|||
"subjectTemplate": "$alarmId", |
|||
"bodyTemplate": "$alarmBody" |
|||
} |
|||
}, |
|||
"additionalInfo": null |
|||
} |
|||
@ -1,35 +0,0 @@ |
|||
{ |
|||
"name": "Demo getTime RPC Rule", |
|||
"state": "ACTIVE", |
|||
"weight": 0, |
|||
"pluginToken": "time", |
|||
"filters": [ |
|||
{ |
|||
"configuration": { |
|||
"messageTypes": [ |
|||
"RPC_REQUEST" |
|||
] |
|||
}, |
|||
"name": "RPC Request Filter", |
|||
"clazz": "org.thingsboard.server.extensions.core.filter.MsgTypeFilter" |
|||
}, |
|||
{ |
|||
"configuration": { |
|||
"methodNames": [ |
|||
{ |
|||
"name": "getTime" |
|||
} |
|||
] |
|||
}, |
|||
"name": "getTime method filter", |
|||
"clazz": "org.thingsboard.server.extensions.core.filter.MethodNameFilter" |
|||
} |
|||
], |
|||
"processor": null, |
|||
"action": { |
|||
"configuration": {}, |
|||
"clazz": "org.thingsboard.server.extensions.core.action.rpc.RpcPluginAction", |
|||
"name": "getTimeAction" |
|||
}, |
|||
"additionalInfo": null |
|||
} |
|||
@ -1,38 +0,0 @@ |
|||
{ |
|||
"name": "Demo Messaging RPC Rule", |
|||
"state": "ACTIVE", |
|||
"weight": 0, |
|||
"pluginToken": "messaging", |
|||
"filters": [ |
|||
{ |
|||
"configuration": { |
|||
"messageTypes": [ |
|||
"RPC_REQUEST" |
|||
] |
|||
}, |
|||
"name": "RPC Request Filter", |
|||
"clazz": "org.thingsboard.server.extensions.core.filter.MsgTypeFilter" |
|||
}, |
|||
{ |
|||
"configuration": { |
|||
"methodNames": [ |
|||
{ |
|||
"name": "getDevices" |
|||
}, |
|||
{ |
|||
"name": "sendMsg" |
|||
} |
|||
] |
|||
}, |
|||
"name": "Messaging methods filter", |
|||
"clazz": "org.thingsboard.server.extensions.core.filter.MethodNameFilter" |
|||
} |
|||
], |
|||
"processor": null, |
|||
"action": { |
|||
"configuration": {}, |
|||
"clazz": "org.thingsboard.server.extensions.core.action.rpc.RpcPluginAction", |
|||
"name": "Messaging RPC Action" |
|||
}, |
|||
"additionalInfo": null |
|||
} |
|||
@ -1,11 +0,0 @@ |
|||
{ |
|||
"apiToken": "rpc", |
|||
"name": "System RPC Plugin", |
|||
"clazz": "org.thingsboard.server.extensions.core.plugin.rpc.RpcPlugin", |
|||
"publicAccess": true, |
|||
"state": "ACTIVE", |
|||
"configuration": { |
|||
"defaultTimeout": 20000 |
|||
}, |
|||
"additionalInfo": null |
|||
} |
|||
@ -1,9 +0,0 @@ |
|||
{ |
|||
"apiToken": "telemetry", |
|||
"name": "System Telemetry Plugin", |
|||
"clazz": "org.thingsboard.server.extensions.core.plugin.telemetry.TelemetryStoragePlugin", |
|||
"publicAccess": true, |
|||
"state": "ACTIVE", |
|||
"configuration": {}, |
|||
"additionalInfo": null |
|||
} |
|||
@ -1,29 +0,0 @@ |
|||
{ |
|||
"name": "System Telemetry Rule", |
|||
"state": "ACTIVE", |
|||
"weight": 0, |
|||
"pluginToken": "telemetry", |
|||
"filters": [ |
|||
{ |
|||
"clazz": "org.thingsboard.server.extensions.core.filter.MsgTypeFilter", |
|||
"name": "TelemetryFilter", |
|||
"configuration": { |
|||
"messageTypes": [ |
|||
"POST_TELEMETRY", |
|||
"POST_ATTRIBUTES", |
|||
"GET_ATTRIBUTES" |
|||
] |
|||
} |
|||
} |
|||
], |
|||
"processor": null, |
|||
"action": { |
|||
"clazz": "org.thingsboard.server.extensions.core.action.telemetry.TelemetryPluginAction", |
|||
"name": "TelemetryMsgConverterAction", |
|||
"configuration": { |
|||
"timeUnit": "DAYS", |
|||
"ttlValue": 365 |
|||
} |
|||
}, |
|||
"additionalInfo": null |
|||
} |
|||
@ -0,0 +1,98 @@ |
|||
{ |
|||
"ruleChain": { |
|||
"additionalInfo": null, |
|||
"name": "Root Rule Chain", |
|||
"firstRuleNodeId": null, |
|||
"root": true, |
|||
"debugMode": false, |
|||
"configuration": null |
|||
}, |
|||
"metadata": { |
|||
"firstNodeIndex": 2, |
|||
"nodes": [ |
|||
{ |
|||
"additionalInfo": { |
|||
"layoutX": 824, |
|||
"layoutY": 156 |
|||
}, |
|||
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", |
|||
"name": "SaveTS", |
|||
"debugMode": true, |
|||
"configuration": { |
|||
"defaultTTL": 0 |
|||
} |
|||
}, |
|||
{ |
|||
"additionalInfo": { |
|||
"layoutX": 825, |
|||
"layoutY": 52 |
|||
}, |
|||
"type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode", |
|||
"name": "save client attributes", |
|||
"debugMode": true, |
|||
"configuration": { |
|||
"scope": "CLIENT_SCOPE" |
|||
} |
|||
}, |
|||
{ |
|||
"additionalInfo": { |
|||
"layoutX": 347, |
|||
"layoutY": 149 |
|||
}, |
|||
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", |
|||
"name": "Message Type Switch", |
|||
"debugMode": false, |
|||
"configuration": { |
|||
"version": 0 |
|||
} |
|||
}, |
|||
{ |
|||
"additionalInfo": { |
|||
"layoutX": 825, |
|||
"layoutY": 266 |
|||
}, |
|||
"type": "org.thingsboard.rule.engine.action.TbLogNode", |
|||
"name": "Log RPC", |
|||
"debugMode": false, |
|||
"configuration": { |
|||
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);" |
|||
} |
|||
}, |
|||
{ |
|||
"additionalInfo": { |
|||
"layoutX": 825, |
|||
"layoutY": 379 |
|||
}, |
|||
"type": "org.thingsboard.rule.engine.action.TbLogNode", |
|||
"name": "Log Other", |
|||
"debugMode": false, |
|||
"configuration": { |
|||
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);" |
|||
} |
|||
} |
|||
], |
|||
"connections": [ |
|||
{ |
|||
"fromIndex": 2, |
|||
"toIndex": 4, |
|||
"type": "Other" |
|||
}, |
|||
{ |
|||
"fromIndex": 2, |
|||
"toIndex": 1, |
|||
"type": "Post attributes" |
|||
}, |
|||
{ |
|||
"fromIndex": 2, |
|||
"toIndex": 0, |
|||
"type": "Post telemetry" |
|||
}, |
|||
{ |
|||
"fromIndex": 2, |
|||
"toIndex": 3, |
|||
"type": "RPC Request" |
|||
} |
|||
], |
|||
"ruleChainConnections": null |
|||
} |
|||
} |
|||
@ -0,0 +1,38 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.device; |
|||
|
|||
import akka.actor.ActorRef; |
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
|
|||
/** |
|||
* Created by ashvayka on 15.03.18. |
|||
*/ |
|||
@Data |
|||
public final class DeviceActorToRuleEngineMsg implements TbActorMsg { |
|||
|
|||
private final ActorRef callbackRef; |
|||
private final TbMsg tbMsg; |
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.DEVICE_ACTOR_TO_RULE_ENGINE_MSG; |
|||
} |
|||
} |
|||
@ -0,0 +1,40 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.device; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.SessionId; |
|||
import org.thingsboard.server.common.msg.cluster.ServerAddress; |
|||
import org.thingsboard.server.common.msg.session.SessionMsgType; |
|||
|
|||
import java.util.Optional; |
|||
|
|||
/** |
|||
* Created by ashvayka on 17.04.18. |
|||
*/ |
|||
@Data |
|||
@AllArgsConstructor |
|||
public final class PendingSessionMsgData { |
|||
|
|||
private final SessionId sessionId; |
|||
private final Optional<ServerAddress> serverAddress; |
|||
private final SessionMsgType sessionMsgType; |
|||
private final int requestId; |
|||
private final boolean replyOnQueueAck; |
|||
private int ackMsgCount; |
|||
|
|||
} |
|||
@ -0,0 +1,36 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.device; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
/** |
|||
* Created by ashvayka on 15.03.18. |
|||
*/ |
|||
@Data |
|||
public final class RuleEngineQueuePutAckMsg implements TbActorMsg { |
|||
|
|||
private final UUID id; |
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.RULE_ENGINE_QUEUE_PUT_ACK_MSG; |
|||
} |
|||
} |
|||
@ -0,0 +1,33 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.device; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.SessionId; |
|||
import org.thingsboard.server.common.msg.cluster.ServerAddress; |
|||
import org.thingsboard.server.common.msg.session.SessionType; |
|||
|
|||
import java.util.Optional; |
|||
|
|||
/** |
|||
* @author Andrew Shvayka |
|||
*/ |
|||
@Data |
|||
public class ToServerRpcRequestMetadata { |
|||
private final SessionId sessionId; |
|||
private final SessionType type; |
|||
private final Optional<ServerAddress> server; |
|||
} |
|||
@ -0,0 +1,40 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.ruleChain; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
|
|||
/** |
|||
* Created by ashvayka on 19.03.18. |
|||
*/ |
|||
@Data |
|||
public final class RuleChainToRuleChainMsg implements TbActorMsg { |
|||
|
|||
private final RuleChainId target; |
|||
private final RuleChainId source; |
|||
private final TbMsg msg; |
|||
private final boolean enqueue; |
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG; |
|||
} |
|||
} |
|||
@ -0,0 +1,225 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import com.google.common.util.concurrent.FutureCallback; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.http.HttpStatus; |
|||
import org.springframework.http.ResponseEntity; |
|||
import org.springframework.security.access.prepost.PreAuthorize; |
|||
import org.springframework.util.StringUtils; |
|||
import org.springframework.web.bind.annotation.PathVariable; |
|||
import org.springframework.web.bind.annotation.RequestBody; |
|||
import org.springframework.web.bind.annotation.RequestMapping; |
|||
import org.springframework.web.bind.annotation.RequestMethod; |
|||
import org.springframework.web.bind.annotation.ResponseBody; |
|||
import org.springframework.web.bind.annotation.RestController; |
|||
import org.springframework.web.context.request.async.DeferredResult; |
|||
import org.thingsboard.server.common.data.audit.ActionType; |
|||
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; |
|||
import org.thingsboard.server.common.data.exception.ThingsboardException; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.id.UUIDBased; |
|||
import org.thingsboard.server.common.data.rpc.RpcRequest; |
|||
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; |
|||
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; |
|||
import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity; |
|||
import org.thingsboard.server.extensions.api.plugins.PluginConstants; |
|||
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; |
|||
import org.thingsboard.server.extensions.api.plugins.msg.RpcError; |
|||
import org.thingsboard.server.service.rpc.DeviceRpcService; |
|||
import org.thingsboard.server.service.rpc.LocalRequestMetaData; |
|||
import org.thingsboard.server.service.security.AccessValidator; |
|||
import org.thingsboard.server.service.security.model.SecurityUser; |
|||
|
|||
import javax.annotation.Nullable; |
|||
import javax.annotation.PostConstruct; |
|||
import javax.annotation.PreDestroy; |
|||
import java.io.IOException; |
|||
import java.util.Optional; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.function.Consumer; |
|||
|
|||
/** |
|||
* Created by ashvayka on 22.03.18. |
|||
*/ |
|||
@RestController |
|||
@RequestMapping(PluginConstants.RPC_URL_PREFIX) |
|||
@Slf4j |
|||
public class RpcController extends BaseController { |
|||
|
|||
public static final int DEFAULT_TIMEOUT = 10000; |
|||
protected final ObjectMapper jsonMapper = new ObjectMapper(); |
|||
|
|||
@Autowired |
|||
private DeviceRpcService deviceRpcService; |
|||
|
|||
@Autowired |
|||
private AccessValidator accessValidator; |
|||
|
|||
private ExecutorService executor; |
|||
|
|||
@PostConstruct |
|||
public void initExecutor() { |
|||
executor = Executors.newSingleThreadExecutor(); |
|||
} |
|||
|
|||
@PreDestroy |
|||
public void shutdownExecutor() { |
|||
if (executor != null) { |
|||
executor.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
|||
@RequestMapping(value = "/oneway/{deviceId}", method = RequestMethod.POST) |
|||
@ResponseBody |
|||
public DeferredResult<ResponseEntity> handleOneWayDeviceRPCRequest(@PathVariable("deviceId") String deviceIdStr, @RequestBody String requestBody) throws ThingsboardException { |
|||
return handleDeviceRPCRequest(true, new DeviceId(UUID.fromString(deviceIdStr)), requestBody); |
|||
} |
|||
|
|||
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
|||
@RequestMapping(value = "/twoway/{deviceId}", method = RequestMethod.POST) |
|||
@ResponseBody |
|||
public DeferredResult<ResponseEntity> handleTwoWayDeviceRPCRequest(@PathVariable("deviceId") String deviceIdStr, @RequestBody String requestBody) throws ThingsboardException { |
|||
return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody); |
|||
} |
|||
|
|||
|
|||
private DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException { |
|||
try { |
|||
JsonNode rpcRequestBody = jsonMapper.readTree(requestBody); |
|||
RpcRequest cmd = new RpcRequest(rpcRequestBody.get("method").asText(), |
|||
jsonMapper.writeValueAsString(rpcRequestBody.get("params"))); |
|||
|
|||
if (rpcRequestBody.has("timeout")) { |
|||
cmd.setTimeout(rpcRequestBody.get("timeout").asLong()); |
|||
} |
|||
SecurityUser currentUser = getCurrentUser(); |
|||
TenantId tenantId = currentUser.getTenantId(); |
|||
final DeferredResult<ResponseEntity> response = new DeferredResult<>(); |
|||
long timeout = System.currentTimeMillis() + (cmd.getTimeout() != null ? cmd.getTimeout() : DEFAULT_TIMEOUT); |
|||
ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(cmd.getMethodName(), cmd.getRequestData()); |
|||
accessValidator.validate(currentUser, deviceId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() { |
|||
@Override |
|||
public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) { |
|||
ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(), |
|||
tenantId, |
|||
deviceId, |
|||
oneWay, |
|||
timeout, |
|||
body |
|||
); |
|||
deviceRpcService.process(rpcRequest, new Consumer<FromDeviceRpcResponse>(){ |
|||
|
|||
@Override |
|||
public void accept(FromDeviceRpcResponse fromDeviceRpcResponse) { |
|||
reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable e) { |
|||
ResponseEntity entity; |
|||
if (e instanceof ToErrorResponseEntity) { |
|||
entity = ((ToErrorResponseEntity) e).toErrorResponseEntity(); |
|||
} else { |
|||
entity = new ResponseEntity(HttpStatus.UNAUTHORIZED); |
|||
} |
|||
logRpcCall(currentUser, deviceId, body, oneWay, Optional.empty(), e); |
|||
response.setResult(entity); |
|||
} |
|||
})); |
|||
return response; |
|||
} catch (IOException ioe) { |
|||
throw new ThingsboardException("Invalid request body", ioe, ThingsboardErrorCode.BAD_REQUEST_PARAMS); |
|||
} |
|||
} |
|||
|
|||
public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response) { |
|||
Optional<RpcError> rpcError = response.getError(); |
|||
DeferredResult<ResponseEntity> responseWriter = rpcRequest.getResponseWriter(); |
|||
if (rpcError.isPresent()) { |
|||
logRpcCall(rpcRequest, rpcError, null); |
|||
RpcError error = rpcError.get(); |
|||
switch (error) { |
|||
case TIMEOUT: |
|||
responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT)); |
|||
break; |
|||
case NO_ACTIVE_CONNECTION: |
|||
responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT)); |
|||
break; |
|||
default: |
|||
responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT)); |
|||
break; |
|||
} |
|||
} else { |
|||
Optional<String> responseData = response.getResponse(); |
|||
if (responseData.isPresent() && !StringUtils.isEmpty(responseData.get())) { |
|||
String data = responseData.get(); |
|||
try { |
|||
logRpcCall(rpcRequest, rpcError, null); |
|||
responseWriter.setResult(new ResponseEntity<>(jsonMapper.readTree(data), HttpStatus.OK)); |
|||
} catch (IOException e) { |
|||
log.debug("Failed to decode device response: {}", data, e); |
|||
logRpcCall(rpcRequest, rpcError, e); |
|||
responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE)); |
|||
} |
|||
} else { |
|||
logRpcCall(rpcRequest, rpcError, null); |
|||
responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK)); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void logRpcCall(LocalRequestMetaData rpcRequest, Optional<RpcError> rpcError, Throwable e) { |
|||
logRpcCall(rpcRequest.getUser(), rpcRequest.getRequest().getDeviceId(), rpcRequest.getRequest().getBody(), rpcRequest.getRequest().isOneway(), rpcError, null); |
|||
} |
|||
|
|||
|
|||
private void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e) { |
|||
String rpcErrorStr = ""; |
|||
if (rpcError.isPresent()) { |
|||
rpcErrorStr = "RPC Error: " + rpcError.get().name(); |
|||
} |
|||
String method = body.getMethod(); |
|||
String params = body.getParams(); |
|||
|
|||
auditLogService.logEntityAction( |
|||
user.getTenantId(), |
|||
user.getCustomerId(), |
|||
user.getId(), |
|||
user.getName(), |
|||
(UUIDBased & EntityId) entityId, |
|||
null, |
|||
ActionType.RPC_CALL, |
|||
BaseController.toException(e), |
|||
rpcErrorStr, |
|||
oneWay, |
|||
method, |
|||
params); |
|||
} |
|||
|
|||
|
|||
} |
|||
@ -0,0 +1,33 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.executors; |
|||
|
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Component |
|||
public class ExternalCallExecutorService extends AbstractListeningExecutor { |
|||
|
|||
@Value("${actors.rule.external_call_thread_pool_size}") |
|||
private int externalCallExecutorThreadPoolSize; |
|||
|
|||
@Override |
|||
protected int getThreadPollSize() { |
|||
return externalCallExecutorThreadPoolSize; |
|||
} |
|||
|
|||
} |
|||
|
|||
@ -0,0 +1,22 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install; |
|||
|
|||
public interface DataUpdateService { |
|||
|
|||
void updateData(String fromVersion) throws Exception; |
|||
|
|||
} |
|||
@ -0,0 +1,106 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Profile; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.Tenant; |
|||
import org.thingsboard.server.common.data.id.IdBased; |
|||
import org.thingsboard.server.common.data.page.TextPageLink; |
|||
import org.thingsboard.server.common.data.rule.RuleChain; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
import org.thingsboard.server.dao.tenant.TenantService; |
|||
|
|||
import java.util.List; |
|||
import java.util.UUID; |
|||
|
|||
@Service |
|||
@Profile("install") |
|||
@Slf4j |
|||
public class DefaultDataUpdateService implements DataUpdateService { |
|||
|
|||
@Autowired |
|||
private TenantService tenantService; |
|||
|
|||
@Autowired |
|||
private RuleChainService ruleChainService; |
|||
|
|||
@Autowired |
|||
private InstallScripts installScripts; |
|||
|
|||
@Override |
|||
public void updateData(String fromVersion) throws Exception { |
|||
switch (fromVersion) { |
|||
case "1.4.0": |
|||
log.info("Updating data from version 1.4.0 to 1.5.0 ..."); |
|||
tenantsDefaultRuleChainUpdater.updateEntities(null); |
|||
break; |
|||
default: |
|||
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); |
|||
} |
|||
} |
|||
|
|||
private PaginatedUpdater<String, Tenant> tenantsDefaultRuleChainUpdater = |
|||
new PaginatedUpdater<String, Tenant>() { |
|||
|
|||
@Override |
|||
protected List<Tenant> findEntities(String region, TextPageLink pageLink) { |
|||
return tenantService.findTenants(pageLink).getData(); |
|||
} |
|||
|
|||
@Override |
|||
protected void updateEntity(Tenant tenant) { |
|||
try { |
|||
RuleChain ruleChain = ruleChainService.getRootTenantRuleChain(tenant.getId()); |
|||
if (ruleChain == null) { |
|||
installScripts.createDefaultRuleChains(tenant.getId()); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("Unable to update Tenant", e); |
|||
} |
|||
} |
|||
}; |
|||
|
|||
public abstract class PaginatedUpdater<I, D extends IdBased<?>> { |
|||
|
|||
private static final int DEFAULT_LIMIT = 100; |
|||
|
|||
public void updateEntities(I id) { |
|||
TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT); |
|||
boolean hasNext = true; |
|||
while (hasNext) { |
|||
List<D> entities = findEntities(id, pageLink); |
|||
for (D entity : entities) { |
|||
updateEntity(entity); |
|||
} |
|||
hasNext = entities.size() == pageLink.getLimit(); |
|||
if (hasNext) { |
|||
int index = entities.size() - 1; |
|||
UUID idOffset = entities.get(index).getUuidId(); |
|||
pageLink.setIdOffset(idOffset); |
|||
} |
|||
} |
|||
} |
|||
|
|||
protected abstract List<D> findEntities(I id, TextPageLink pageLink); |
|||
|
|||
protected abstract void updateEntity(D entity); |
|||
|
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,185 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.stereotype.Component; |
|||
import org.springframework.util.StringUtils; |
|||
import org.thingsboard.server.common.data.Dashboard; |
|||
import org.thingsboard.server.common.data.id.CustomerId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.rule.RuleChain; |
|||
import org.thingsboard.server.common.data.rule.RuleChainMetaData; |
|||
import org.thingsboard.server.common.data.widget.WidgetType; |
|||
import org.thingsboard.server.common.data.widget.WidgetsBundle; |
|||
import org.thingsboard.server.dao.dashboard.DashboardService; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
import org.thingsboard.server.dao.widget.WidgetTypeService; |
|||
import org.thingsboard.server.dao.widget.WidgetsBundleService; |
|||
|
|||
import java.io.IOException; |
|||
import java.nio.file.DirectoryStream; |
|||
import java.nio.file.Files; |
|||
import java.nio.file.Path; |
|||
import java.nio.file.Paths; |
|||
|
|||
import static org.thingsboard.server.service.install.DatabaseHelper.objectMapper; |
|||
|
|||
/** |
|||
* Created by ashvayka on 18.04.18. |
|||
*/ |
|||
@Component |
|||
@Slf4j |
|||
public class InstallScripts { |
|||
|
|||
public static final String APP_DIR = "application"; |
|||
public static final String SRC_DIR = "src"; |
|||
public static final String MAIN_DIR = "main"; |
|||
public static final String DATA_DIR = "data"; |
|||
public static final String JSON_DIR = "json"; |
|||
public static final String SYSTEM_DIR = "system"; |
|||
public static final String TENANT_DIR = "tenant"; |
|||
public static final String DEMO_DIR = "demo"; |
|||
public static final String RULE_CHAINS_DIR = "rule_chains"; |
|||
public static final String WIDGET_BUNDLES_DIR = "widget_bundles"; |
|||
public static final String DASHBOARDS_DIR = "dashboards"; |
|||
|
|||
public static final String JSON_EXT = ".json"; |
|||
|
|||
@Value("${install.data_dir:}") |
|||
private String dataDir; |
|||
|
|||
@Autowired |
|||
private RuleChainService ruleChainService; |
|||
|
|||
@Autowired |
|||
private DashboardService dashboardService; |
|||
|
|||
@Autowired |
|||
private WidgetTypeService widgetTypeService; |
|||
|
|||
@Autowired |
|||
private WidgetsBundleService widgetsBundleService; |
|||
|
|||
public Path getTenantRuleChainsDir() { |
|||
return Paths.get(getDataDir(), JSON_DIR, TENANT_DIR, RULE_CHAINS_DIR); |
|||
} |
|||
|
|||
public String getDataDir() { |
|||
if (!StringUtils.isEmpty(dataDir)) { |
|||
if (!Paths.get(this.dataDir).toFile().isDirectory()) { |
|||
throw new RuntimeException("'install.data_dir' property value is not a valid directory!"); |
|||
} |
|||
return dataDir; |
|||
} else { |
|||
String workDir = System.getProperty("user.dir"); |
|||
if (workDir.endsWith("application")) { |
|||
return Paths.get(workDir, SRC_DIR, MAIN_DIR, DATA_DIR).toString(); |
|||
} else { |
|||
Path dataDirPath = Paths.get(workDir, APP_DIR, SRC_DIR, MAIN_DIR, DATA_DIR); |
|||
if (Files.exists(dataDirPath)) { |
|||
return dataDirPath.toString(); |
|||
} else { |
|||
throw new RuntimeException("Not valid working directory: " + workDir + ". Please use either root project directory, application module directory or specify valid \"install.data_dir\" ENV variable to avoid automatic data directory lookup!"); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void createDefaultRuleChains(TenantId tenantId) throws IOException { |
|||
Path tenantChainsDir = getTenantRuleChainsDir(); |
|||
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(tenantChainsDir, path -> path.toString().endsWith(InstallScripts.JSON_EXT))) { |
|||
dirStream.forEach( |
|||
path -> { |
|||
try { |
|||
JsonNode ruleChainJson = objectMapper.readTree(path.toFile()); |
|||
RuleChain ruleChain = objectMapper.treeToValue(ruleChainJson.get("ruleChain"), RuleChain.class); |
|||
RuleChainMetaData ruleChainMetaData = objectMapper.treeToValue(ruleChainJson.get("metadata"), RuleChainMetaData.class); |
|||
|
|||
ruleChain.setTenantId(tenantId); |
|||
ruleChain = ruleChainService.saveRuleChain(ruleChain); |
|||
|
|||
ruleChainMetaData.setRuleChainId(ruleChain.getId()); |
|||
ruleChainService.saveRuleChainMetaData(ruleChainMetaData); |
|||
} catch (Exception e) { |
|||
log.error("Unable to load rule chain from json: [{}]", path.toString()); |
|||
throw new RuntimeException("Unable to load rule chain from json", e); |
|||
} |
|||
} |
|||
); |
|||
} |
|||
} |
|||
|
|||
public void loadSystemWidgets() throws Exception { |
|||
Path widgetBundlesDir = Paths.get(getDataDir(), JSON_DIR, SYSTEM_DIR, WIDGET_BUNDLES_DIR); |
|||
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(widgetBundlesDir, path -> path.toString().endsWith(JSON_EXT))) { |
|||
dirStream.forEach( |
|||
path -> { |
|||
try { |
|||
JsonNode widgetsBundleDescriptorJson = objectMapper.readTree(path.toFile()); |
|||
JsonNode widgetsBundleJson = widgetsBundleDescriptorJson.get("widgetsBundle"); |
|||
WidgetsBundle widgetsBundle = objectMapper.treeToValue(widgetsBundleJson, WidgetsBundle.class); |
|||
WidgetsBundle savedWidgetsBundle = widgetsBundleService.saveWidgetsBundle(widgetsBundle); |
|||
JsonNode widgetTypesArrayJson = widgetsBundleDescriptorJson.get("widgetTypes"); |
|||
widgetTypesArrayJson.forEach( |
|||
widgetTypeJson -> { |
|||
try { |
|||
WidgetType widgetType = objectMapper.treeToValue(widgetTypeJson, WidgetType.class); |
|||
widgetType.setBundleAlias(savedWidgetsBundle.getAlias()); |
|||
widgetTypeService.saveWidgetType(widgetType); |
|||
} catch (Exception e) { |
|||
log.error("Unable to load widget type from json: [{}]", path.toString()); |
|||
throw new RuntimeException("Unable to load widget type from json", e); |
|||
} |
|||
} |
|||
); |
|||
} catch (Exception e) { |
|||
log.error("Unable to load widgets bundle from json: [{}]", path.toString()); |
|||
throw new RuntimeException("Unable to load widgets bundle from json", e); |
|||
} |
|||
} |
|||
); |
|||
} |
|||
} |
|||
|
|||
public void loadDashboards(TenantId tenantId, CustomerId customerId) throws Exception { |
|||
Path dashboardsDir = Paths.get(getDataDir(), JSON_DIR, DEMO_DIR, DASHBOARDS_DIR); |
|||
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dashboardsDir, path -> path.toString().endsWith(JSON_EXT))) { |
|||
dirStream.forEach( |
|||
path -> { |
|||
try { |
|||
JsonNode dashboardJson = objectMapper.readTree(path.toFile()); |
|||
Dashboard dashboard = objectMapper.treeToValue(dashboardJson, Dashboard.class); |
|||
dashboard.setTenantId(tenantId); |
|||
Dashboard savedDashboard = dashboardService.saveDashboard(dashboard); |
|||
if (customerId != null && !customerId.isNullUid()) { |
|||
dashboardService.assignDashboardToCustomer(savedDashboard.getId(), customerId); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("Unable to load dashboard from json: [{}]", path.toString()); |
|||
throw new RuntimeException("Unable to load dashboard from json", e); |
|||
} |
|||
} |
|||
); |
|||
} |
|||
} |
|||
|
|||
|
|||
} |
|||
@ -0,0 +1,157 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.rpc; |
|||
|
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.http.HttpStatus; |
|||
import org.springframework.http.ResponseEntity; |
|||
import org.springframework.stereotype.Service; |
|||
import org.springframework.util.StringUtils; |
|||
import org.springframework.web.context.request.async.DeferredResult; |
|||
import org.thingsboard.server.actors.service.ActorService; |
|||
import org.thingsboard.server.common.data.audit.ActionType; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.id.UUIDBased; |
|||
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; |
|||
import org.thingsboard.server.common.msg.cluster.ServerAddress; |
|||
import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg; |
|||
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; |
|||
import org.thingsboard.server.controller.BaseController; |
|||
import org.thingsboard.server.dao.audit.AuditLogService; |
|||
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg; |
|||
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; |
|||
import org.thingsboard.server.extensions.api.plugins.msg.RpcError; |
|||
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; |
|||
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; |
|||
import org.thingsboard.server.service.security.model.SecurityUser; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
import javax.annotation.PreDestroy; |
|||
import java.io.IOException; |
|||
import java.util.Optional; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.concurrent.ConcurrentMap; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.function.BiConsumer; |
|||
import java.util.function.Consumer; |
|||
|
|||
/** |
|||
* Created by ashvayka on 27.03.18. |
|||
*/ |
|||
@Service |
|||
@Slf4j |
|||
public class DefaultDeviceRpcService implements DeviceRpcService { |
|||
|
|||
@Autowired |
|||
private ClusterRoutingService routingService; |
|||
|
|||
@Autowired |
|||
private ClusterRpcService rpcService; |
|||
|
|||
@Autowired |
|||
private ActorService actorService; |
|||
|
|||
@Autowired |
|||
private AuditLogService auditLogService; |
|||
|
|||
private ScheduledExecutorService rpcCallBackExecutor; |
|||
|
|||
private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> localRpcRequests = new ConcurrentHashMap<>(); |
|||
|
|||
|
|||
@PostConstruct |
|||
public void initExecutor() { |
|||
rpcCallBackExecutor = Executors.newSingleThreadScheduledExecutor(); |
|||
} |
|||
|
|||
@PreDestroy |
|||
public void shutdownExecutor() { |
|||
if (rpcCallBackExecutor != null) { |
|||
rpcCallBackExecutor.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) { |
|||
log.trace("[{}] Processing local rpc call for device [{}]", request.getTenantId(), request.getDeviceId()); |
|||
sendRpcRequest(request); |
|||
UUID requestId = request.getId(); |
|||
localRpcRequests.put(requestId, responseConsumer); |
|||
long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); |
|||
log.error("[{}] processing the request: [{}]", this.hashCode(), requestId); |
|||
rpcCallBackExecutor.schedule(() -> { |
|||
log.error("[{}] timeout the request: [{}]", this.hashCode(), requestId); |
|||
Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId); |
|||
if (consumer != null) { |
|||
consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT)); |
|||
} |
|||
}, timeout, TimeUnit.MILLISECONDS); |
|||
} |
|||
|
|||
@Override |
|||
public void process(ToDeviceRpcRequest request, ServerAddress originator) { |
|||
// if (pluginServerAddress.isPresent()) {
|
|||
// systemContext.getRpcService().tell(pluginServerAddress.get(), responsePluginMsg);
|
|||
// logger.debug("[{}] Rpc command response sent to remote plugin actor [{}]!", deviceId, requestMd.getMsg().getMsg().getId());
|
|||
// } else {
|
|||
// context.parent().tell(responsePluginMsg, ActorRef.noSender());
|
|||
// logger.debug("[{}] Rpc command response sent to local plugin actor [{}]!", deviceId, requestMd.getMsg().getMsg().getId());
|
|||
// }
|
|||
} |
|||
|
|||
@Override |
|||
public void process(FromDeviceRpcResponse response) { |
|||
log.error("[{}] response the request: [{}]", this.hashCode(), response.getId()); |
|||
//TODO: send to another server if needed.
|
|||
UUID requestId = response.getId(); |
|||
Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId); |
|||
if (consumer != null) { |
|||
consumer.accept(response); |
|||
} else { |
|||
log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) { |
|||
ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body)); |
|||
forward(deviceId, rpcMsg, rpcService::tell); |
|||
} |
|||
|
|||
private void sendRpcRequest(ToDeviceRpcRequest msg) { |
|||
log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg); |
|||
ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(msg); |
|||
forward(msg.getDeviceId(), rpcMsg, rpcService::tell); |
|||
} |
|||
|
|||
private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) { |
|||
Optional<ServerAddress> instance = routingService.resolveById(deviceId); |
|||
if (instance.isPresent()) { |
|||
log.trace("[{}] Forwarding msg {} to remote device actor!", msg.getTenantId(), msg); |
|||
rpcFunction.accept(instance.get(), msg); |
|||
} else { |
|||
log.trace("[{}] Forwarding msg {} to local device actor!", msg.getTenantId(), msg); |
|||
actorService.onMsg(msg); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,39 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.rpc; |
|||
|
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.cluster.ServerAddress; |
|||
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; |
|||
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; |
|||
|
|||
import java.util.function.Consumer; |
|||
|
|||
/** |
|||
* Created by ashvayka on 16.04.18. |
|||
*/ |
|||
public interface DeviceRpcService { |
|||
|
|||
void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer); |
|||
|
|||
void process(ToDeviceRpcRequest request, ServerAddress originator); |
|||
|
|||
void process(FromDeviceRpcResponse response); |
|||
|
|||
void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body); |
|||
|
|||
} |
|||
@ -0,0 +1,60 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.rpc; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.ToString; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.cluster.ServerAddress; |
|||
import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg; |
|||
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg; |
|||
|
|||
import java.util.Optional; |
|||
|
|||
/** |
|||
* Created by ashvayka on 16.04.18. |
|||
*/ |
|||
@ToString |
|||
@RequiredArgsConstructor |
|||
public class ToServerRpcResponseActorMsg implements ToDeviceActorNotificationMsg { |
|||
|
|||
private final ServerAddress serverAddress; |
|||
|
|||
@Getter |
|||
private final TenantId tenantId; |
|||
|
|||
@Getter |
|||
private final DeviceId deviceId; |
|||
|
|||
@Getter |
|||
private final ToServerRpcResponseMsg msg; |
|||
|
|||
public ToServerRpcResponseActorMsg(TenantId tenantId, DeviceId deviceId, ToServerRpcResponseMsg msg) { |
|||
this(null, tenantId, deviceId, msg); |
|||
} |
|||
|
|||
public Optional<ServerAddress> getServerAddress() { |
|||
return Optional.ofNullable(serverAddress); |
|||
} |
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG; |
|||
} |
|||
} |
|||
@ -0,0 +1,390 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.state; |
|||
|
|||
import com.datastax.driver.core.utils.UUIDs; |
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import com.google.common.base.Function; |
|||
import com.google.common.util.concurrent.FutureCallback; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import com.google.common.util.concurrent.ListeningScheduledExecutorService; |
|||
import com.google.common.util.concurrent.MoreExecutors; |
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.actors.service.ActorService; |
|||
import org.thingsboard.server.common.data.DataConstants; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.Tenant; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|||
import org.thingsboard.server.common.data.page.TextPageLink; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.TbMsgDataType; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; |
|||
import org.thingsboard.server.dao.attributes.AttributesService; |
|||
import org.thingsboard.server.dao.device.DeviceService; |
|||
import org.thingsboard.server.dao.tenant.TenantService; |
|||
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; |
|||
|
|||
import javax.annotation.Nullable; |
|||
import javax.annotation.PostConstruct; |
|||
import javax.annotation.PreDestroy; |
|||
import java.util.ArrayList; |
|||
import java.util.Arrays; |
|||
import java.util.HashSet; |
|||
import java.util.List; |
|||
import java.util.Optional; |
|||
import java.util.Set; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.concurrent.ConcurrentMap; |
|||
import java.util.concurrent.ExecutionException; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
import static org.thingsboard.server.common.data.DataConstants.ACTIVITY_EVENT; |
|||
import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT; |
|||
import static org.thingsboard.server.common.data.DataConstants.DISCONNECT_EVENT; |
|||
import static org.thingsboard.server.common.data.DataConstants.INACTIVITY_EVENT; |
|||
|
|||
/** |
|||
* Created by ashvayka on 01.05.18. |
|||
*/ |
|||
@Service |
|||
@Slf4j |
|||
//TODO: refactor to use page links as cursor and not fetch all
|
|||
public class DefaultDeviceStateService implements DeviceStateService { |
|||
|
|||
private static final ObjectMapper json = new ObjectMapper(); |
|||
public static final String ACTIVITY_STATE = "active"; |
|||
public static final String LAST_CONNECT_TIME = "lastConnectTime"; |
|||
public static final String LAST_DISCONNECT_TIME = "lastDisconnectTime"; |
|||
public static final String LAST_ACTIVITY_TIME = "lastActivityTime"; |
|||
public static final String INACTIVITY_ALARM_TIME = "inactivityAlarmTime"; |
|||
public static final String INACTIVITY_TIMEOUT = "inactivityTimeout"; |
|||
|
|||
public static final List<String> PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT); |
|||
|
|||
@Autowired |
|||
private TenantService tenantService; |
|||
|
|||
@Autowired |
|||
private DeviceService deviceService; |
|||
|
|||
@Autowired |
|||
private AttributesService attributesService; |
|||
|
|||
@Autowired |
|||
private ActorService actorService; |
|||
|
|||
@Autowired |
|||
private TelemetrySubscriptionService tsSubService; |
|||
|
|||
@Value("${state.defaultInactivityTimeoutInSec}") |
|||
@Getter |
|||
private long defaultInactivityTimeoutInSec; |
|||
|
|||
@Value("${state.defaultStateCheckIntervalInSec}") |
|||
@Getter |
|||
private long defaultStateCheckIntervalInSec; |
|||
|
|||
// TODO in v2.1
|
|||
// @Value("${state.defaultStatePersistenceIntervalInSec}")
|
|||
// @Getter
|
|||
// private long defaultStatePersistenceIntervalInSec;
|
|||
//
|
|||
// @Value("${state.defaultStatePersistencePack}")
|
|||
// @Getter
|
|||
// private long defaultStatePersistencePack;
|
|||
|
|||
private ListeningScheduledExecutorService queueExecutor; |
|||
|
|||
private ConcurrentMap<TenantId, Set<DeviceId>> tenantDevices = new ConcurrentHashMap<>(); |
|||
private ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>(); |
|||
|
|||
@PostConstruct |
|||
public void init() { |
|||
// Should be always single threaded due to absence of locks.
|
|||
queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor()); |
|||
queueExecutor.submit(this::initStateFromDB); |
|||
queueExecutor.scheduleAtFixedRate(this::updateState, defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS); |
|||
//TODO: schedule persistence in v2.1;
|
|||
} |
|||
|
|||
@PreDestroy |
|||
public void stop() { |
|||
if (queueExecutor != null) { |
|||
queueExecutor.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void onDeviceAdded(Device device) { |
|||
queueExecutor.submit(() -> onDeviceAddedSync(device)); |
|||
} |
|||
|
|||
@Override |
|||
public void onDeviceUpdated(Device device) { |
|||
queueExecutor.submit(() -> onDeviceUpdatedSync(device)); |
|||
} |
|||
|
|||
@Override |
|||
public void onDeviceConnect(DeviceId deviceId) { |
|||
queueExecutor.submit(() -> onDeviceConnectSync(deviceId)); |
|||
} |
|||
|
|||
@Override |
|||
public void onDeviceActivity(DeviceId deviceId) { |
|||
queueExecutor.submit(() -> onDeviceActivitySync(deviceId)); |
|||
} |
|||
|
|||
@Override |
|||
public void onDeviceDisconnect(DeviceId deviceId) { |
|||
queueExecutor.submit(() -> onDeviceDisconnectSync(deviceId)); |
|||
} |
|||
|
|||
@Override |
|||
public void onDeviceDeleted(Device device) { |
|||
queueExecutor.submit(() -> onDeviceDeleted(device.getTenantId(), device.getId())); |
|||
} |
|||
|
|||
@Override |
|||
public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) { |
|||
queueExecutor.submit(() -> onInactivityTimeoutUpdate(deviceId, inactivityTimeout)); |
|||
} |
|||
|
|||
@Override |
|||
public Optional<DeviceState> getDeviceState(DeviceId deviceId) { |
|||
DeviceStateData state = deviceStates.get(deviceId); |
|||
if (state != null) { |
|||
return Optional.of(state.getState()); |
|||
} else { |
|||
return Optional.empty(); |
|||
} |
|||
} |
|||
|
|||
private void initStateFromDB() { |
|||
List<Tenant> tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData(); |
|||
for (Tenant tenant : tenants) { |
|||
List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>(); |
|||
List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData(); |
|||
for (Device device : devices) { |
|||
fetchFutures.add(fetchDeviceState(device)); |
|||
} |
|||
try { |
|||
Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState); |
|||
} catch (InterruptedException | ExecutionException e) { |
|||
log.warn("Failed to init device state service from DB", e); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void addDeviceUsingState(DeviceStateData state) { |
|||
tenantDevices.computeIfAbsent(state.getTenantId(), id -> ConcurrentHashMap.newKeySet()).add(state.getDeviceId()); |
|||
deviceStates.put(state.getDeviceId(), state); |
|||
} |
|||
|
|||
private void updateState() { |
|||
long ts = System.currentTimeMillis(); |
|||
Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet()); |
|||
for (DeviceId deviceId : deviceIds) { |
|||
DeviceStateData stateData = deviceStates.get(deviceId); |
|||
DeviceState state = stateData.getState(); |
|||
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout()); |
|||
if (!state.isActive() && state.getLastInactivityAlarmTime() < state.getLastActivityTime()) { |
|||
state.setLastInactivityAlarmTime(ts); |
|||
pushRuleEngineMessage(stateData, INACTIVITY_EVENT); |
|||
saveAttribute(deviceId, INACTIVITY_ALARM_TIME, ts); |
|||
saveAttribute(deviceId, ACTIVITY_STATE, state.isActive()); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void onDeviceConnectSync(DeviceId deviceId) { |
|||
DeviceStateData stateData = deviceStates.get(deviceId); |
|||
if (stateData != null) { |
|||
long ts = System.currentTimeMillis(); |
|||
stateData.getState().setLastConnectTime(ts); |
|||
pushRuleEngineMessage(stateData, CONNECT_EVENT); |
|||
saveAttribute(deviceId, LAST_CONNECT_TIME, ts); |
|||
} |
|||
} |
|||
|
|||
private void onDeviceDisconnectSync(DeviceId deviceId) { |
|||
DeviceStateData stateData = deviceStates.get(deviceId); |
|||
if (stateData != null) { |
|||
long ts = System.currentTimeMillis(); |
|||
stateData.getState().setLastDisconnectTime(ts); |
|||
pushRuleEngineMessage(stateData, DISCONNECT_EVENT); |
|||
saveAttribute(deviceId, LAST_DISCONNECT_TIME, ts); |
|||
} |
|||
} |
|||
|
|||
private void onDeviceActivitySync(DeviceId deviceId) { |
|||
DeviceStateData stateData = deviceStates.get(deviceId); |
|||
if (stateData != null) { |
|||
DeviceState state = stateData.getState(); |
|||
long ts = System.currentTimeMillis(); |
|||
state.setActive(true); |
|||
stateData.getState().setLastActivityTime(ts); |
|||
pushRuleEngineMessage(stateData, ACTIVITY_EVENT); |
|||
saveAttribute(deviceId, LAST_ACTIVITY_TIME, ts); |
|||
saveAttribute(deviceId, ACTIVITY_STATE, state.isActive()); |
|||
} |
|||
} |
|||
|
|||
private void onInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) { |
|||
if (inactivityTimeout == 0L) { |
|||
return; |
|||
} |
|||
DeviceStateData stateData = deviceStates.get(deviceId); |
|||
if (stateData != null) { |
|||
long ts = System.currentTimeMillis(); |
|||
DeviceState state = stateData.getState(); |
|||
state.setInactivityTimeout(inactivityTimeout); |
|||
boolean oldActive = state.isActive(); |
|||
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout()); |
|||
if (!oldActive && state.isActive()) { |
|||
saveAttribute(deviceId, ACTIVITY_STATE, state.isActive()); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void onDeviceAddedSync(Device device) { |
|||
Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() { |
|||
@Override |
|||
public void onSuccess(@Nullable DeviceStateData state) { |
|||
addDeviceUsingState(state); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
log.warn("Failed to register device to the state service", t); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
private void onDeviceUpdatedSync(Device device) { |
|||
DeviceStateData stateData = deviceStates.get(device.getId()); |
|||
if (stateData != null) { |
|||
TbMsgMetaData md = new TbMsgMetaData(); |
|||
md.putValue("deviceName", device.getName()); |
|||
md.putValue("deviceType", device.getType()); |
|||
stateData.setMetaData(md); |
|||
} |
|||
} |
|||
|
|||
private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) { |
|||
deviceStates.remove(deviceId); |
|||
Set<DeviceId> deviceIds = tenantDevices.get(tenantId); |
|||
if (deviceIds != null) { |
|||
deviceIds.remove(deviceId); |
|||
if (deviceIds.isEmpty()) { |
|||
tenantDevices.remove(tenantId); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) { |
|||
ListenableFuture<List<AttributeKvEntry>> attributes = attributesService.find(device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES); |
|||
return Futures.transform(attributes, new Function<List<AttributeKvEntry>, DeviceStateData>() { |
|||
@Nullable |
|||
@Override |
|||
public DeviceStateData apply(@Nullable List<AttributeKvEntry> attributes) { |
|||
long lastActivityTime = getAttributeValue(attributes, LAST_ACTIVITY_TIME, 0L); |
|||
long inactivityAlarmTime = getAttributeValue(attributes, INACTIVITY_ALARM_TIME, 0L); |
|||
long inactivityTimeout = getAttributeValue(attributes, INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec)); |
|||
boolean active = System.currentTimeMillis() < lastActivityTime + inactivityTimeout; |
|||
DeviceState deviceState = DeviceState.builder() |
|||
.active(active) |
|||
.lastConnectTime(getAttributeValue(attributes, LAST_CONNECT_TIME, 0L)) |
|||
.lastDisconnectTime(getAttributeValue(attributes, LAST_DISCONNECT_TIME, 0L)) |
|||
.lastActivityTime(lastActivityTime) |
|||
.lastInactivityAlarmTime(inactivityAlarmTime) |
|||
.inactivityTimeout(inactivityTimeout) |
|||
.build(); |
|||
TbMsgMetaData md = new TbMsgMetaData(); |
|||
md.putValue("deviceName", device.getName()); |
|||
md.putValue("deviceType", device.getType()); |
|||
return DeviceStateData.builder() |
|||
.tenantId(device.getTenantId()) |
|||
.deviceId(device.getId()) |
|||
.metaData(md) |
|||
.state(deviceState).build(); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
private long getLastPersistTime(List<AttributeKvEntry> attributes) { |
|||
return attributes.stream().map(AttributeKvEntry::getLastUpdateTs).max(Long::compare).orElse(0L); |
|||
} |
|||
|
|||
private long getAttributeValue(List<AttributeKvEntry> attributes, String attributeName, long defaultValue) { |
|||
for (AttributeKvEntry attribute : attributes) { |
|||
if (attribute.getKey().equals(attributeName)) { |
|||
return attribute.getLongValue().orElse(defaultValue); |
|||
} |
|||
} |
|||
return defaultValue; |
|||
} |
|||
|
|||
private void pushRuleEngineMessage(DeviceStateData stateData, String msgType) { |
|||
DeviceState state = stateData.getState(); |
|||
try { |
|||
TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData(), TbMsgDataType.JSON |
|||
, json.writeValueAsString(state) |
|||
, null, null, 0L); |
|||
actorService.onMsg(new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg)); |
|||
} catch (Exception e) { |
|||
log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e); |
|||
} |
|||
} |
|||
|
|||
private void saveAttribute(DeviceId deviceId, String key, long value) { |
|||
tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value)); |
|||
} |
|||
|
|||
private void saveAttribute(DeviceId deviceId, String key, boolean value) { |
|||
tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value)); |
|||
} |
|||
|
|||
private class AttributeSaveCallback implements FutureCallback<Void> { |
|||
private final DeviceId deviceId; |
|||
private final String key; |
|||
private final Object value; |
|||
|
|||
AttributeSaveCallback(DeviceId deviceId, String key, Object value) { |
|||
this.deviceId = deviceId; |
|||
this.key = key; |
|||
this.value = value; |
|||
} |
|||
|
|||
@Override |
|||
public void onSuccess(@Nullable Void result) { |
|||
log.trace("[{}] Successfully updated attribute [{}] with value [{}]", deviceId, key, value); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
log.warn("[{}] Failed to update attribute [{}] with value [{}]", deviceId, key, value, t); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,35 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.state; |
|||
|
|||
import lombok.Builder; |
|||
import lombok.Data; |
|||
|
|||
/** |
|||
* Created by ashvayka on 01.05.18. |
|||
*/ |
|||
@Data |
|||
@Builder |
|||
public class DeviceState { |
|||
|
|||
private boolean active; |
|||
private long lastConnectTime; |
|||
private long lastActivityTime; |
|||
private long lastDisconnectTime; |
|||
private long lastInactivityAlarmTime; |
|||
private long inactivityTimeout; |
|||
|
|||
} |
|||
@ -0,0 +1,37 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.state; |
|||
|
|||
import lombok.Builder; |
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
|
|||
/** |
|||
* Created by ashvayka on 01.05.18. |
|||
*/ |
|||
@Data |
|||
@Builder |
|||
class DeviceStateData { |
|||
|
|||
private final TenantId tenantId; |
|||
private final DeviceId deviceId; |
|||
|
|||
private TbMsgMetaData metaData; |
|||
private final DeviceState state; |
|||
|
|||
} |
|||
@ -0,0 +1,44 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.state; |
|||
|
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
|
|||
import java.util.Optional; |
|||
|
|||
/** |
|||
* Created by ashvayka on 01.05.18. |
|||
*/ |
|||
public interface DeviceStateService { |
|||
|
|||
void onDeviceAdded(Device device); |
|||
|
|||
void onDeviceUpdated(Device device); |
|||
|
|||
void onDeviceDeleted(Device device); |
|||
|
|||
void onDeviceConnect(DeviceId deviceId); |
|||
|
|||
void onDeviceActivity(DeviceId deviceId); |
|||
|
|||
void onDeviceDisconnect(DeviceId deviceId); |
|||
|
|||
void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout); |
|||
|
|||
Optional<DeviceState> getDeviceState(DeviceId deviceId); |
|||
|
|||
} |
|||
@ -1,27 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import org.junit.extensions.cpsuite.ClasspathSuite; |
|||
import org.junit.runner.RunWith; |
|||
|
|||
/** |
|||
* @author Andrew Shvayka |
|||
*/ |
|||
@RunWith(ClasspathSuite.class) |
|||
@ClasspathSuite.ClassnameFilters({"org.thingsboard.server.actors.*Test"}) |
|||
public class ActorsTestSuite { |
|||
} |
|||
@ -1,245 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import static org.mockito.Matchers.any; |
|||
import static org.mockito.Mockito.mock; |
|||
import static org.mockito.Mockito.verify; |
|||
import static org.mockito.Mockito.when; |
|||
|
|||
import java.util.*; |
|||
|
|||
import com.google.common.util.concurrent.Futures; |
|||
import org.thingsboard.server.actors.service.DefaultActorService; |
|||
import org.thingsboard.server.common.data.id.*; |
|||
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|||
import org.thingsboard.server.common.data.page.TextPageData; |
|||
import org.thingsboard.server.common.data.plugin.ComponentDescriptor; |
|||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; |
|||
import org.thingsboard.server.common.data.plugin.ComponentType; |
|||
import org.thingsboard.server.common.msg.session.*; |
|||
import org.thingsboard.server.dao.attributes.AttributesService; |
|||
import org.thingsboard.server.dao.event.EventService; |
|||
import org.thingsboard.server.gen.discovery.ServerInstanceProtos; |
|||
import org.thingsboard.server.service.cluster.discovery.DiscoveryService; |
|||
import org.thingsboard.server.service.cluster.discovery.ServerInstance; |
|||
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; |
|||
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; |
|||
import org.thingsboard.server.service.component.ComponentDiscoveryService; |
|||
import org.thingsboard.server.common.transport.auth.DeviceAuthResult; |
|||
import org.thingsboard.server.common.transport.auth.DeviceAuthService; |
|||
import org.thingsboard.server.common.data.DataConstants; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.Tenant; |
|||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
|||
import org.thingsboard.server.common.data.kv.KvEntry; |
|||
import org.thingsboard.server.common.data.kv.StringDataEntry; |
|||
import org.thingsboard.server.common.data.plugin.PluginMetaData; |
|||
import org.thingsboard.server.common.data.rule.RuleMetaData; |
|||
import org.thingsboard.server.common.data.security.DeviceCredentialsFilter; |
|||
import org.thingsboard.server.common.data.security.DeviceTokenCredentials; |
|||
import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest; |
|||
import org.thingsboard.server.dao.device.DeviceService; |
|||
import org.thingsboard.server.dao.model.ModelConstants; |
|||
import org.thingsboard.server.dao.plugin.PluginService; |
|||
import org.thingsboard.server.dao.rule.RuleService; |
|||
import org.thingsboard.server.dao.tenant.TenantService; |
|||
import org.thingsboard.server.dao.timeseries.TimeseriesService; |
|||
import org.thingsboard.server.extensions.core.plugin.telemetry.TelemetryStoragePlugin; |
|||
import org.junit.After; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.mockito.Mockito; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
|
|||
public class DefaultActorServiceTest { |
|||
|
|||
private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID); |
|||
|
|||
private static final String PLUGIN_ID = "9fb2e951-e298-4acb-913a-db69af8a15f4"; |
|||
private static final String FILTERS_CONFIGURATION = |
|||
"[{\"clazz\":\"org.thingsboard.server.extensions.core.filter.MsgTypeFilter\", \"name\":\"TelemetryFilter\", \"configuration\": {\"messageTypes\":[\"POST_TELEMETRY\",\"POST_ATTRIBUTES\",\"GET_ATTRIBUTES\"]}}]"; |
|||
private static final String ACTION_CONFIGURATION = "{\"pluginToken\":\"telemetry\", \"clazz\":\"org.thingsboard.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{}}"; |
|||
private static final String PLUGIN_CONFIGURATION = "{}"; |
|||
private DefaultActorService actorService; |
|||
private ActorSystemContext actorContext; |
|||
|
|||
private PluginService pluginService; |
|||
private RuleService ruleService; |
|||
private DeviceAuthService deviceAuthService; |
|||
private DeviceService deviceService; |
|||
private TimeseriesService tsService; |
|||
private TenantService tenantService; |
|||
private ClusterRpcService rpcService; |
|||
private DiscoveryService discoveryService; |
|||
private ClusterRoutingService routingService; |
|||
private AttributesService attributesService; |
|||
private ComponentDiscoveryService componentService; |
|||
private EventService eventService; |
|||
private ServerInstance serverInstance; |
|||
|
|||
private RuleMetaData ruleMock; |
|||
private PluginMetaData pluginMock; |
|||
private RuleId ruleId = new RuleId(UUID.randomUUID()); |
|||
private PluginId pluginId = new PluginId(UUID.fromString(PLUGIN_ID)); |
|||
private TenantId tenantId = new TenantId(UUID.randomUUID()); |
|||
|
|||
|
|||
@Before |
|||
public void before() throws Exception { |
|||
actorService = new DefaultActorService(); |
|||
actorContext = new ActorSystemContext(); |
|||
|
|||
tenantService = mock(TenantService.class); |
|||
pluginService = mock(PluginService.class); |
|||
ruleService = mock(RuleService.class); |
|||
deviceAuthService = mock(DeviceAuthService.class); |
|||
deviceService = mock(DeviceService.class); |
|||
tsService = mock(TimeseriesService.class); |
|||
rpcService = mock(ClusterRpcService.class); |
|||
discoveryService = mock(DiscoveryService.class); |
|||
routingService = mock(ClusterRoutingService.class); |
|||
attributesService = mock(AttributesService.class); |
|||
componentService = mock(ComponentDiscoveryService.class); |
|||
eventService = mock(EventService.class); |
|||
serverInstance = new ServerInstance(ServerInstanceProtos.ServerInfo.newBuilder().setHost("localhost").setPort(8080).build()); |
|||
|
|||
ReflectionTestUtils.setField(actorService, "actorContext", actorContext); |
|||
ReflectionTestUtils.setField(actorService, "rpcService", rpcService); |
|||
ReflectionTestUtils.setField(actorService, "discoveryService", discoveryService); |
|||
|
|||
ReflectionTestUtils.setField(actorContext, "syncSessionTimeout", 10000L); |
|||
ReflectionTestUtils.setField(actorContext, "pluginActorTerminationDelay", 10000L); |
|||
ReflectionTestUtils.setField(actorContext, "pluginErrorPersistFrequency", 10000L); |
|||
ReflectionTestUtils.setField(actorContext, "ruleActorTerminationDelay", 10000L); |
|||
ReflectionTestUtils.setField(actorContext, "ruleErrorPersistFrequency", 10000L); |
|||
ReflectionTestUtils.setField(actorContext, "pluginProcessingTimeout", 60000L); |
|||
ReflectionTestUtils.setField(actorContext, "tenantService", tenantService); |
|||
ReflectionTestUtils.setField(actorContext, "pluginService", pluginService); |
|||
ReflectionTestUtils.setField(actorContext, "ruleService", ruleService); |
|||
ReflectionTestUtils.setField(actorContext, "deviceAuthService", deviceAuthService); |
|||
ReflectionTestUtils.setField(actorContext, "deviceService", deviceService); |
|||
ReflectionTestUtils.setField(actorContext, "tsService", tsService); |
|||
ReflectionTestUtils.setField(actorContext, "rpcService", rpcService); |
|||
ReflectionTestUtils.setField(actorContext, "discoveryService", discoveryService); |
|||
ReflectionTestUtils.setField(actorContext, "tsService", tsService); |
|||
ReflectionTestUtils.setField(actorContext, "routingService", routingService); |
|||
ReflectionTestUtils.setField(actorContext, "attributesService", attributesService); |
|||
ReflectionTestUtils.setField(actorContext, "componentService", componentService); |
|||
ReflectionTestUtils.setField(actorContext, "eventService", eventService); |
|||
|
|||
|
|||
when(routingService.resolveById((EntityId) any())).thenReturn(Optional.empty()); |
|||
|
|||
when(discoveryService.getCurrentServer()).thenReturn(serverInstance); |
|||
|
|||
ruleMock = mock(RuleMetaData.class); |
|||
when(ruleMock.getId()).thenReturn(ruleId); |
|||
when(ruleMock.getState()).thenReturn(ComponentLifecycleState.ACTIVE); |
|||
when(ruleMock.getPluginToken()).thenReturn("telemetry"); |
|||
TextPageData<RuleMetaData> systemRules = new TextPageData<>(Collections.emptyList(), null, false); |
|||
TextPageData<RuleMetaData> tenantRules = new TextPageData<>(Collections.singletonList(ruleMock), null, false); |
|||
when(ruleService.findSystemRules(any())).thenReturn(systemRules); |
|||
when(ruleService.findTenantRules(any(), any())).thenReturn(tenantRules); |
|||
when(ruleService.findRuleById(ruleId)).thenReturn(ruleMock); |
|||
|
|||
pluginMock = mock(PluginMetaData.class); |
|||
when(pluginMock.getTenantId()).thenReturn(SYSTEM_TENANT); |
|||
when(pluginMock.getId()).thenReturn(pluginId); |
|||
when(pluginMock.getState()).thenReturn(ComponentLifecycleState.ACTIVE); |
|||
TextPageData<PluginMetaData> systemPlugins = new TextPageData<>(Collections.singletonList(pluginMock), null, false); |
|||
TextPageData<PluginMetaData> tenantPlugins = new TextPageData<>(Collections.emptyList(), null, false); |
|||
when(pluginService.findSystemPlugins(any())).thenReturn(systemPlugins); |
|||
when(pluginService.findTenantPlugins(any(), any())).thenReturn(tenantPlugins); |
|||
when(pluginService.findPluginByApiToken("telemetry")).thenReturn(pluginMock); |
|||
when(pluginService.findPluginById(pluginId)).thenReturn(pluginMock); |
|||
|
|||
TextPageData<Tenant> tenants = new TextPageData<>(Collections.emptyList(), null, false); |
|||
when(tenantService.findTenants(any())).thenReturn(tenants); |
|||
} |
|||
|
|||
private void initActorSystem() { |
|||
actorService.initActorSystem(); |
|||
} |
|||
|
|||
@After |
|||
public void after() { |
|||
actorService.stopActorSystem(); |
|||
} |
|||
|
|||
@Test |
|||
public void testBasicPostWithSyncSession() throws Exception { |
|||
SessionContext ssnCtx = mock(SessionContext.class); |
|||
KvEntry entry1 = new StringDataEntry("key1", "value1"); |
|||
KvEntry entry2 = new StringDataEntry("key2", "value2"); |
|||
BasicTelemetryUploadRequest telemetry = new BasicTelemetryUploadRequest(); |
|||
long ts = 42; |
|||
telemetry.add(ts, entry1); |
|||
telemetry.add(ts, entry2); |
|||
BasicAdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ssnCtx, telemetry); |
|||
|
|||
DeviceId deviceId = new DeviceId(UUID.randomUUID()); |
|||
|
|||
DeviceCredentialsFilter filter = new DeviceTokenCredentials("token1"); |
|||
Device device = mock(Device.class); |
|||
|
|||
when(device.getId()).thenReturn(deviceId); |
|||
when(device.getTenantId()).thenReturn(tenantId); |
|||
when(ssnCtx.getSessionId()).thenReturn(new DummySessionID("session1")); |
|||
when(ssnCtx.getSessionType()).thenReturn(SessionType.SYNC); |
|||
when(deviceAuthService.process(filter)).thenReturn(DeviceAuthResult.of(deviceId)); |
|||
when(deviceService.findDeviceById(deviceId)).thenReturn(device); |
|||
|
|||
ObjectMapper ruleMapper = new ObjectMapper(); |
|||
when(ruleMock.getFilters()).thenReturn(ruleMapper.readTree(FILTERS_CONFIGURATION)); |
|||
when(ruleMock.getAction()).thenReturn(ruleMapper.readTree(ACTION_CONFIGURATION)); |
|||
|
|||
ComponentDescriptor filterComp = new ComponentDescriptor(); |
|||
filterComp.setClazz("org.thingsboard.server.extensions.core.filter.MsgTypeFilter"); |
|||
filterComp.setType(ComponentType.FILTER); |
|||
when(componentService.getComponent("org.thingsboard.server.extensions.core.filter.MsgTypeFilter")) |
|||
.thenReturn(Optional.of(filterComp)); |
|||
|
|||
ComponentDescriptor actionComp = new ComponentDescriptor(); |
|||
actionComp.setClazz("org.thingsboard.server.extensions.core.action.telemetry.TelemetryPluginAction"); |
|||
actionComp.setType(ComponentType.ACTION); |
|||
when(componentService.getComponent("org.thingsboard.server.extensions.core.action.telemetry.TelemetryPluginAction")) |
|||
.thenReturn(Optional.of(actionComp)); |
|||
|
|||
ObjectMapper pluginMapper = new ObjectMapper(); |
|||
JsonNode pluginAdditionalInfo = pluginMapper.readTree(PLUGIN_CONFIGURATION); |
|||
when(pluginMock.getConfiguration()).thenReturn(pluginAdditionalInfo); |
|||
when(pluginMock.getClazz()).thenReturn(TelemetryStoragePlugin.class.getName()); |
|||
|
|||
when(attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList())); |
|||
when(attributesService.findAll(deviceId, DataConstants.SHARED_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList())); |
|||
when(attributesService.findAll(deviceId, DataConstants.SERVER_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList())); |
|||
|
|||
initActorSystem(); |
|||
Thread.sleep(1000); |
|||
actorService.process(new BasicToDeviceActorSessionMsg(device, msg)); |
|||
|
|||
// Check that device data was saved to DB;
|
|||
List<TsKvEntry> expected = new ArrayList<>(); |
|||
expected.add(new BasicTsKvEntry(ts, entry1)); |
|||
expected.add(new BasicTsKvEntry(ts, entry2)); |
|||
verify(tsService, Mockito.timeout(5000)).save(deviceId, expected, 0L); |
|||
} |
|||
|
|||
} |
|||
@ -1,63 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import org.thingsboard.server.common.data.id.SessionId; |
|||
|
|||
public class DummySessionID implements SessionId { |
|||
|
|||
@Override |
|||
public String toString() { |
|||
return id; |
|||
} |
|||
|
|||
private final String id; |
|||
|
|||
public DummySessionID(String id) { |
|||
this.id = id; |
|||
} |
|||
|
|||
@Override |
|||
public String toUidStr() { |
|||
return id; |
|||
} |
|||
|
|||
@Override |
|||
public int hashCode() { |
|||
final int prime = 31; |
|||
int result = 1; |
|||
result = prime * result + ((id == null) ? 0 : id.hashCode()); |
|||
return result; |
|||
} |
|||
|
|||
@Override |
|||
public boolean equals(Object obj) { |
|||
if (this == obj) |
|||
return true; |
|||
if (obj == null) |
|||
return false; |
|||
if (getClass() != obj.getClass()) |
|||
return false; |
|||
DummySessionID other = (DummySessionID) obj; |
|||
if (id == null) { |
|||
if (other.id != null) |
|||
return false; |
|||
} else if (!id.equals(other.id)) |
|||
return false; |
|||
return true; |
|||
} |
|||
|
|||
} |
|||
@ -1,232 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller; |
|||
|
|||
import com.fasterxml.jackson.core.type.TypeReference; |
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import org.junit.After; |
|||
import org.junit.Assert; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.thingsboard.server.common.data.Tenant; |
|||
import org.thingsboard.server.common.data.User; |
|||
import org.thingsboard.server.common.data.page.TextPageData; |
|||
import org.thingsboard.server.common.data.page.TextPageLink; |
|||
import org.thingsboard.server.common.data.plugin.PluginMetaData; |
|||
import org.thingsboard.server.common.data.rule.RuleMetaData; |
|||
import org.thingsboard.server.common.data.security.Authority; |
|||
import org.thingsboard.server.extensions.core.plugin.telemetry.TelemetryStoragePlugin; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
import java.util.stream.Collectors; |
|||
|
|||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; |
|||
|
|||
public abstract class BasePluginControllerTest extends AbstractControllerTest { |
|||
|
|||
private IdComparator<PluginMetaData> idComparator = new IdComparator<>(); |
|||
|
|||
private final ObjectMapper mapper = new ObjectMapper(); |
|||
private Tenant savedTenant; |
|||
private User tenantAdmin; |
|||
|
|||
@Before |
|||
public void beforeTest() throws Exception { |
|||
loginSysAdmin(); |
|||
|
|||
Tenant tenant = new Tenant(); |
|||
tenant.setTitle("My tenant"); |
|||
savedTenant = doPost("/api/tenant", tenant, Tenant.class); |
|||
Assert.assertNotNull(savedTenant); |
|||
|
|||
tenantAdmin = new User(); |
|||
tenantAdmin.setAuthority(Authority.TENANT_ADMIN); |
|||
tenantAdmin.setTenantId(savedTenant.getId()); |
|||
tenantAdmin.setEmail("tenant2@thingsboard.org"); |
|||
tenantAdmin.setFirstName("Joe"); |
|||
tenantAdmin.setLastName("Downs"); |
|||
|
|||
tenantAdmin = createUserAndLogin(tenantAdmin, "testPassword1"); |
|||
} |
|||
|
|||
@After |
|||
public void afterTest() throws Exception { |
|||
loginSysAdmin(); |
|||
|
|||
doDelete("/api/tenant/" + savedTenant.getId().getId().toString()) |
|||
.andExpect(status().isOk()); |
|||
} |
|||
|
|||
@Test |
|||
public void testSavePlugin() throws Exception { |
|||
PluginMetaData plugin = new PluginMetaData(); |
|||
doPost("/api/plugin", plugin).andExpect(status().isBadRequest()); |
|||
plugin.setName("My plugin"); |
|||
doPost("/api/plugin", plugin).andExpect(status().isBadRequest()); |
|||
plugin.setApiToken("myplugin"); |
|||
doPost("/api/plugin", plugin).andExpect(status().isBadRequest()); |
|||
plugin.setConfiguration(mapper.readTree("{}")); |
|||
doPost("/api/plugin", plugin).andExpect(status().isBadRequest()); |
|||
plugin.setClazz(TelemetryStoragePlugin.class.getName()); |
|||
PluginMetaData savedPlugin = doPost("/api/plugin", plugin, PluginMetaData.class); |
|||
|
|||
Assert.assertNotNull(savedPlugin); |
|||
Assert.assertNotNull(savedPlugin.getId()); |
|||
Assert.assertTrue(savedPlugin.getCreatedTime() > 0); |
|||
Assert.assertEquals(savedTenant.getId(), savedPlugin.getTenantId()); |
|||
} |
|||
|
|||
@Test |
|||
public void testFindPluginById() throws Exception { |
|||
PluginMetaData plugin = new PluginMetaData(); |
|||
plugin.setName("My plugin"); |
|||
plugin.setApiToken("myplugin"); |
|||
plugin.setConfiguration(mapper.readTree("{}")); |
|||
plugin.setClazz(TelemetryStoragePlugin.class.getName()); |
|||
|
|||
PluginMetaData savedPlugin = doPost("/api/plugin", plugin, PluginMetaData.class); |
|||
PluginMetaData foundPlugin = doGet("/api/plugin/" + savedPlugin.getId().getId().toString(), PluginMetaData.class); |
|||
Assert.assertNotNull(foundPlugin); |
|||
Assert.assertEquals(savedPlugin, foundPlugin); |
|||
} |
|||
|
|||
@Test |
|||
public void testActivatePlugin() throws Exception { |
|||
PluginMetaData plugin = new PluginMetaData(); |
|||
plugin.setName("My plugin"); |
|||
plugin.setApiToken("myplugin"); |
|||
plugin.setConfiguration(mapper.readTree("{}")); |
|||
plugin.setClazz(TelemetryStoragePlugin.class.getName()); |
|||
|
|||
PluginMetaData savedPlugin = doPost("/api/plugin", plugin, PluginMetaData.class); |
|||
|
|||
doPost("/api/plugin/" + savedPlugin.getId().getId().toString() + "/activate").andExpect(status().isOk()); |
|||
} |
|||
|
|||
@Test |
|||
public void testSuspendPlugin() throws Exception { |
|||
PluginMetaData plugin = new PluginMetaData(); |
|||
plugin.setName("My plugin"); |
|||
plugin.setApiToken("myplugin"); |
|||
plugin.setConfiguration(mapper.readTree("{}")); |
|||
plugin.setClazz(TelemetryStoragePlugin.class.getName()); |
|||
|
|||
PluginMetaData savedPlugin = doPost("/api/plugin", plugin, PluginMetaData.class); |
|||
|
|||
doPost("/api/plugin/" + savedPlugin.getId().getId().toString() + "/activate").andExpect(status().isOk()); |
|||
|
|||
RuleMetaData rule = BaseRuleControllerTest.createRuleMetaData(savedPlugin); |
|||
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class); |
|||
doPost("/api/rule/" + savedRule.getId().getId().toString() + "/activate").andExpect(status().isOk()); |
|||
|
|||
doPost("/api/plugin/" + savedPlugin.getId().getId().toString() + "/suspend").andExpect(status().isBadRequest()); |
|||
|
|||
doPost("/api/rule/" + savedRule.getId().getId().toString() + "/suspend").andExpect(status().isOk()); |
|||
|
|||
doPost("/api/plugin/" + savedPlugin.getId().getId().toString() + "/suspend").andExpect(status().isOk()); |
|||
} |
|||
|
|||
@Test |
|||
public void testDeletePluginById() throws Exception { |
|||
PluginMetaData plugin = new PluginMetaData(); |
|||
plugin.setName("My plugin"); |
|||
plugin.setApiToken("myplugin"); |
|||
plugin.setConfiguration(mapper.readTree("{}")); |
|||
plugin.setClazz(TelemetryStoragePlugin.class.getName()); |
|||
|
|||
PluginMetaData savedPlugin = doPost("/api/plugin", plugin, PluginMetaData.class); |
|||
|
|||
RuleMetaData rule = BaseRuleControllerTest.createRuleMetaData(savedPlugin); |
|||
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class); |
|||
|
|||
doDelete("/api/plugin/" + savedPlugin.getId().getId()).andExpect(status().isBadRequest()); |
|||
|
|||
doDelete("/api/rule/" + savedRule.getId().getId()).andExpect(status().isOk()); |
|||
|
|||
doDelete("/api/plugin/" + savedPlugin.getId().getId()).andExpect(status().isOk()); |
|||
doGet("/api/plugin/" + savedPlugin.getId().getId().toString()).andExpect(status().isNotFound()); |
|||
} |
|||
|
|||
@Test |
|||
public void testFindPluginByToken() throws Exception { |
|||
PluginMetaData plugin = new PluginMetaData(); |
|||
plugin.setName("My plugin"); |
|||
plugin.setApiToken("myplugin"); |
|||
plugin.setConfiguration(mapper.readTree("{}")); |
|||
plugin.setClazz(TelemetryStoragePlugin.class.getName()); |
|||
|
|||
PluginMetaData savedPlugin = doPost("/api/plugin", plugin, PluginMetaData.class); |
|||
PluginMetaData foundPlugin = doGet("/api/plugin/token/" + "myplugin", PluginMetaData.class); |
|||
Assert.assertNotNull(foundPlugin); |
|||
Assert.assertEquals(savedPlugin, foundPlugin); |
|||
} |
|||
|
|||
@Test |
|||
public void testFindCurrentTenantPlugins() throws Exception { |
|||
List<PluginMetaData> plugins = testPluginsCreation("/api/plugin"); |
|||
for (PluginMetaData plugin : plugins) { |
|||
doDelete("/api/plugin/" + plugin.getId().getId()).andExpect(status().isOk()); |
|||
} |
|||
} |
|||
|
|||
@Test |
|||
public void testFindSystemPlugins() throws Exception { |
|||
loginSysAdmin(); |
|||
List<PluginMetaData> plugins = testPluginsCreation("/api/plugin/system"); |
|||
for (PluginMetaData plugin : plugins) { |
|||
doDelete("/api/plugin/" + plugin.getId().getId()).andExpect(status().isOk()); |
|||
} |
|||
} |
|||
|
|||
private List<PluginMetaData> testPluginsCreation(String url) throws Exception { |
|||
List<PluginMetaData> plugins = new ArrayList<>(); |
|||
for (int i = 0; i < 111; i++) { |
|||
PluginMetaData plugin = new PluginMetaData(); |
|||
plugin.setName("My plugin"); |
|||
plugin.setApiToken("myplugin" + i); |
|||
plugin.setConfiguration(mapper.readTree("{}")); |
|||
plugin.setClazz(TelemetryStoragePlugin.class.getName()); |
|||
plugins.add(doPost("/api/plugin", plugin, PluginMetaData.class)); |
|||
} |
|||
|
|||
List<PluginMetaData> loadedPlugins = new ArrayList<>(); |
|||
TextPageLink pageLink = new TextPageLink(23); |
|||
TextPageData<PluginMetaData> pageData; |
|||
do { |
|||
pageData = doGetTypedWithPageLink(url + "?", |
|||
new TypeReference<TextPageData<PluginMetaData>>() { |
|||
}, pageLink); |
|||
loadedPlugins.addAll(pageData.getData()); |
|||
if (pageData.hasNext()) { |
|||
pageLink = pageData.getNextPageLink(); |
|||
} |
|||
} while (pageData.hasNext()); |
|||
|
|||
loadedPlugins = loadedPlugins.stream() |
|||
.filter(p -> !p.getName().equals("System Telemetry Plugin")) |
|||
.filter(p -> !p.getName().equals("Mail Sender Plugin")) |
|||
.filter(p -> !p.getName().equals("System RPC Plugin")) |
|||
.collect(Collectors.toList()); |
|||
|
|||
Collections.sort(plugins, idComparator); |
|||
Collections.sort(loadedPlugins, idComparator); |
|||
|
|||
Assert.assertEquals(plugins, loadedPlugins); |
|||
return loadedPlugins; |
|||
} |
|||
} |
|||
@ -1,247 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller; |
|||
|
|||
import com.fasterxml.jackson.core.type.TypeReference; |
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import org.junit.After; |
|||
import org.junit.Assert; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.thingsboard.server.common.data.Tenant; |
|||
import org.thingsboard.server.common.data.User; |
|||
import org.thingsboard.server.common.data.page.TextPageData; |
|||
import org.thingsboard.server.common.data.page.TextPageLink; |
|||
import org.thingsboard.server.common.data.plugin.PluginMetaData; |
|||
import org.thingsboard.server.common.data.rule.RuleMetaData; |
|||
import org.thingsboard.server.common.data.security.Authority; |
|||
import org.thingsboard.server.extensions.core.plugin.telemetry.TelemetryStoragePlugin; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.ArrayList; |
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
import java.util.stream.Collectors; |
|||
|
|||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; |
|||
|
|||
public abstract class BaseRuleControllerTest extends AbstractControllerTest { |
|||
|
|||
private IdComparator<RuleMetaData> idComparator = new IdComparator<>(); |
|||
|
|||
private static final ObjectMapper mapper = new ObjectMapper(); |
|||
private Tenant savedTenant; |
|||
private User tenantAdmin; |
|||
private PluginMetaData sysPlugin; |
|||
private PluginMetaData tenantPlugin; |
|||
|
|||
@Before |
|||
public void beforeTest() throws Exception { |
|||
loginSysAdmin(); |
|||
|
|||
sysPlugin = new PluginMetaData(); |
|||
sysPlugin.setName("Sys plugin"); |
|||
sysPlugin.setApiToken("sysplugin"); |
|||
sysPlugin.setConfiguration(mapper.readTree("{}")); |
|||
sysPlugin.setClazz(TelemetryStoragePlugin.class.getName()); |
|||
sysPlugin = doPost("/api/plugin", sysPlugin, PluginMetaData.class); |
|||
|
|||
Tenant tenant = new Tenant(); |
|||
tenant.setTitle("My tenant"); |
|||
savedTenant = doPost("/api/tenant", tenant, Tenant.class); |
|||
Assert.assertNotNull(savedTenant); |
|||
|
|||
tenantAdmin = new User(); |
|||
tenantAdmin.setAuthority(Authority.TENANT_ADMIN); |
|||
tenantAdmin.setTenantId(savedTenant.getId()); |
|||
tenantAdmin.setEmail("tenant2@thingsboard.org"); |
|||
tenantAdmin.setFirstName("Joe"); |
|||
tenantAdmin.setLastName("Downs"); |
|||
|
|||
tenantAdmin = createUserAndLogin(tenantAdmin, "testPassword1"); |
|||
|
|||
tenantPlugin = new PluginMetaData(); |
|||
tenantPlugin.setName("My plugin"); |
|||
tenantPlugin.setApiToken("myplugin"); |
|||
tenantPlugin.setConfiguration(mapper.readTree("{}")); |
|||
tenantPlugin.setClazz(TelemetryStoragePlugin.class.getName()); |
|||
tenantPlugin = doPost("/api/plugin", tenantPlugin, PluginMetaData.class); |
|||
} |
|||
|
|||
@After |
|||
public void afterTest() throws Exception { |
|||
loginSysAdmin(); |
|||
|
|||
doDelete("/api/tenant/" + savedTenant.getId().getId().toString()) |
|||
.andExpect(status().isOk()); |
|||
|
|||
doDelete("/api/plugin/" + sysPlugin.getId().getId()).andExpect(status().isOk()); |
|||
} |
|||
|
|||
@Test |
|||
public void testSaveRule() throws Exception { |
|||
RuleMetaData rule = new RuleMetaData(); |
|||
doPost("/api/rule", rule).andExpect(status().isBadRequest()); |
|||
rule.setName("My Rule"); |
|||
doPost("/api/rule", rule).andExpect(status().isBadRequest()); |
|||
rule.setPluginToken(tenantPlugin.getApiToken()); |
|||
doPost("/api/rule", rule).andExpect(status().isBadRequest()); |
|||
rule.setFilters(mapper.readTree("[{\"clazz\":\"org.thingsboard.server.extensions.core.filter.MsgTypeFilter\", " + |
|||
"\"name\":\"TelemetryFilter\", " + |
|||
"\"configuration\": {\"messageTypes\":[\"POST_TELEMETRY\",\"POST_ATTRIBUTES\",\"GET_ATTRIBUTES\"]}}]")); |
|||
doPost("/api/rule", rule).andExpect(status().isBadRequest()); |
|||
rule.setAction(mapper.readTree("{\"clazz\":\"org.thingsboard.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}")); |
|||
|
|||
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class); |
|||
Assert.assertNotNull(savedRule); |
|||
Assert.assertNotNull(savedRule.getId()); |
|||
Assert.assertTrue(savedRule.getCreatedTime() > 0); |
|||
Assert.assertEquals(savedTenant.getId(), savedRule.getTenantId()); |
|||
} |
|||
|
|||
@Test |
|||
public void testFindRuleById() throws Exception { |
|||
RuleMetaData rule = createRuleMetaData(tenantPlugin); |
|||
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class); |
|||
|
|||
RuleMetaData foundRule = doGet("/api/rule/" + savedRule.getId().getId().toString(), RuleMetaData.class); |
|||
Assert.assertNotNull(foundRule); |
|||
Assert.assertEquals(savedRule, foundRule); |
|||
} |
|||
|
|||
@Test |
|||
public void testFindRuleByPluginToken() throws Exception { |
|||
RuleMetaData rule = createRuleMetaData(tenantPlugin); |
|||
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class); |
|||
|
|||
List<RuleMetaData> foundRules = doGetTyped("/api/rule/token/" + savedRule.getPluginToken(), |
|||
new TypeReference<List<RuleMetaData>>() { |
|||
}); |
|||
Assert.assertNotNull(foundRules); |
|||
Assert.assertEquals(1, foundRules.size()); |
|||
Assert.assertEquals(savedRule, foundRules.get(0)); |
|||
} |
|||
|
|||
@Test |
|||
public void testActivateRule() throws Exception { |
|||
RuleMetaData rule = createRuleMetaData(tenantPlugin); |
|||
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class); |
|||
|
|||
doPost("/api/rule/" + savedRule.getId().getId().toString() + "/activate").andExpect(status().isBadRequest()); |
|||
|
|||
doPost("/api/plugin/" + tenantPlugin.getId().getId().toString() + "/activate").andExpect(status().isOk()); |
|||
|
|||
doPost("/api/rule/" + savedRule.getId().getId().toString() + "/activate").andExpect(status().isOk()); |
|||
} |
|||
|
|||
@Test |
|||
public void testSuspendRule() throws Exception { |
|||
RuleMetaData rule = createRuleMetaData(tenantPlugin); |
|||
RuleMetaData savedRule = doPost("/api/rule", rule, RuleMetaData.class); |
|||
|
|||
doPost("/api/plugin/" + tenantPlugin.getId().getId().toString() + "/activate").andExpect(status().isOk()); |
|||
doPost("/api/rule/" + savedRule.getId().getId().toString() + "/activate").andExpect(status().isOk()); |
|||
doPost("/api/rule/" + savedRule.getId().getId().toString() + "/suspend").andExpect(status().isOk()); |
|||
} |
|||
|
|||
@Test |
|||
public void testFindSystemRules() throws Exception { |
|||
loginSysAdmin(); |
|||
List<RuleMetaData> rules = testRulesCreation("/api/rule/system", sysPlugin); |
|||
for (RuleMetaData rule : rules) { |
|||
doDelete("/api/rule/" + rule.getId().getId()).andExpect(status().isOk()); |
|||
} |
|||
loginTenantAdmin(); |
|||
} |
|||
|
|||
@Test |
|||
public void testFindCurrentTenantPlugins() throws Exception { |
|||
List<RuleMetaData> rules = testRulesCreation("/api/rule", tenantPlugin); |
|||
for (RuleMetaData rule : rules) { |
|||
doDelete("/api/rule/" + rule.getId().getId()).andExpect(status().isOk()); |
|||
} |
|||
} |
|||
|
|||
@Test |
|||
public void testFindTenantPlugins() throws Exception { |
|||
List<RuleMetaData> rules = testRulesCreation("/api/rule", tenantPlugin); |
|||
loginSysAdmin(); |
|||
List<RuleMetaData> loadedRules = new ArrayList<>(); |
|||
TextPageLink pageLink = new TextPageLink(3); |
|||
TextPageData<RuleMetaData> pageData; |
|||
do { |
|||
pageData = doGetTypedWithPageLink("/api/rule/tenant/" + savedTenant.getId().getId().toString() + "?", |
|||
new TypeReference<TextPageData<RuleMetaData>>() { |
|||
}, pageLink); |
|||
loadedRules.addAll(pageData.getData()); |
|||
if (pageData.hasNext()) { |
|||
pageLink = pageData.getNextPageLink(); |
|||
} |
|||
} while (pageData.hasNext()); |
|||
|
|||
Collections.sort(rules, idComparator); |
|||
Collections.sort(loadedRules, idComparator); |
|||
|
|||
Assert.assertEquals(rules, loadedRules); |
|||
|
|||
for (RuleMetaData rule : rules) { |
|||
doDelete("/api/rule/" + rule.getId().getId()).andExpect(status().isOk()); |
|||
} |
|||
} |
|||
|
|||
private List<RuleMetaData> testRulesCreation(String url, PluginMetaData plugin) throws Exception { |
|||
List<RuleMetaData> rules = new ArrayList<>(); |
|||
for (int i = 0; i < 6; i++) { |
|||
RuleMetaData rule = createRuleMetaData(plugin); |
|||
rule.setPluginToken(plugin.getApiToken()); |
|||
rule.setName(rule.getName() + i); |
|||
rules.add(doPost("/api/rule", rule, RuleMetaData.class)); |
|||
} |
|||
|
|||
List<RuleMetaData> loadedRules = new ArrayList<>(); |
|||
TextPageLink pageLink = new TextPageLink(3); |
|||
TextPageData<RuleMetaData> pageData; |
|||
do { |
|||
pageData = doGetTypedWithPageLink(url + "?", |
|||
new TypeReference<TextPageData<RuleMetaData>>() { |
|||
}, pageLink); |
|||
loadedRules.addAll(pageData.getData()); |
|||
if (pageData.hasNext()) { |
|||
pageLink = pageData.getNextPageLink(); |
|||
} |
|||
} while (pageData.hasNext()); |
|||
|
|||
loadedRules = loadedRules.stream().filter(p -> !p.getName().equals("System Telemetry Rule")).collect(Collectors.toList()); |
|||
|
|||
Collections.sort(rules, idComparator); |
|||
Collections.sort(loadedRules, idComparator); |
|||
|
|||
Assert.assertEquals(rules, loadedRules); |
|||
return loadedRules; |
|||
} |
|||
|
|||
public static RuleMetaData createRuleMetaData(PluginMetaData plugin) throws IOException { |
|||
RuleMetaData rule = new RuleMetaData(); |
|||
rule.setName("My Rule"); |
|||
rule.setPluginToken(plugin.getApiToken()); |
|||
rule.setFilters(mapper.readTree("[{\"clazz\":\"org.thingsboard.server.extensions.core.filter.MsgTypeFilter\", " + |
|||
"\"name\":\"TelemetryFilter\", " + |
|||
"\"configuration\": {\"messageTypes\":[\"POST_TELEMETRY\",\"POST_ATTRIBUTES\",\"GET_ATTRIBUTES\"]}}]")); |
|||
rule.setAction(mapper.readTree("{\"clazz\":\"org.thingsboard.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", " + |
|||
"\"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}")); |
|||
return rule; |
|||
} |
|||
} |
|||
@ -1,26 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.sql; |
|||
|
|||
import org.thingsboard.server.controller.BasePluginControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoSqlTest; |
|||
|
|||
/** |
|||
* Created by Valerii Sosliuk on 6/28/2017. |
|||
*/ |
|||
@DaoSqlTest |
|||
public class PluginControllerSqlTest extends BasePluginControllerTest { |
|||
} |
|||
@ -0,0 +1,42 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.rules; |
|||
|
|||
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; |
|||
import org.junit.ClassRule; |
|||
import org.junit.extensions.cpsuite.ClasspathSuite; |
|||
import org.junit.runner.RunWith; |
|||
import org.thingsboard.server.dao.CustomCassandraCQLUnit; |
|||
import org.thingsboard.server.dao.CustomSqlUnit; |
|||
|
|||
import java.util.Arrays; |
|||
|
|||
@RunWith(ClasspathSuite.class) |
|||
@ClasspathSuite.ClassnameFilters({ |
|||
"org.thingsboard.server.rules.flow.nosql.*Test", |
|||
"org.thingsboard.server.rules.lifecycle.nosql.*Test" |
|||
}) |
|||
public class RuleEngineNoSqlTestSuite { |
|||
|
|||
@ClassRule |
|||
public static CustomCassandraCQLUnit cassandraUnit = |
|||
new CustomCassandraCQLUnit( |
|||
Arrays.asList( |
|||
new ClassPathCQLDataSet("cassandra/schema.cql", false, false), |
|||
new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)), |
|||
"cassandra-test.yaml", 30000l); |
|||
|
|||
} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue