Browse Source

Merge remote-tracking branch 'upstream/master' into optimize-imports

pull/11334/head
Andrii Landiak 2 years ago
parent
commit
0c0c2fc5fc
  1. 4
      application/src/main/java/org/thingsboard/server/controller/BaseController.java
  2. 2
      application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java
  3. 237
      application/src/main/java/org/thingsboard/server/controller/RuleEngineController.java
  4. 1
      application/src/main/java/org/thingsboard/server/controller/TbUrlConstants.java
  5. 20
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java
  6. 13
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
  7. 11
      application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java
  8. 100
      application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java
  9. 31
      application/src/main/java/org/thingsboard/server/service/ruleengine/RuleEngineCallService.java
  10. 20
      application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java
  11. 12
      application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java
  12. 249
      application/src/test/java/org/thingsboard/server/controller/RuleEngineControllerTest.java
  13. 102
      application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java
  14. 16
      application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java
  15. 61
      application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java
  16. 141
      application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java
  17. 5
      common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java
  18. 1
      common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java
  19. 1
      common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java
  20. 4
      common/data/src/test/java/org/thingsboard/server/common/data/audit/ActionTypeTest.java
  21. 7
      common/proto/src/main/proto/queue.proto
  22. 5
      pom.xml
  23. 45
      rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java
  24. 3
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java
  25. 4
      rule-engine/rule-engine-components/pom.xml
  26. 151
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNode.java
  27. 46
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNodeConfiguration.java
  28. 66
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNode.java
  29. 45
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeConfiguration.java
  30. 308
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNodeTest.java
  31. 133
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeTest.java
  32. 83
      ui-ngx/src/assets/help/en_US/rulenode/node-templatization-doc.md

4
application/src/main/java/org/thingsboard/server/controller/BaseController.java

@ -141,6 +141,7 @@ import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.exception.ThingsboardErrorResponseHandler;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.action.EntityActionService;
@ -314,6 +315,9 @@ public abstract class BaseController {
@Autowired
protected ExportableEntitiesService entitiesService;
@Autowired
protected TbServiceInfoProvider serviceInfoProvider;
@Value("${server.log_controller_error_stack_trace}")
@Getter
private boolean logControllerErrorStackTrace;

2
application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java

@ -1712,4 +1712,6 @@ public class ControllerConstants {
MARKDOWN_CODE_BLOCK_START +
"[{\"ts\":1634712287000,\"values\":{\"temperature\":26, \"humidity\":87}}, {\"ts\":1634712588000,\"values\":{\"temperature\":25, \"humidity\":88}}]" +
MARKDOWN_CODE_BLOCK_END ;
protected static final String SECURITY_WRITE_CHECK = " Security check is performed to verify that the user has 'WRITE' permission for the entity (entities).";
}

237
application/src/main/java/org/thingsboard/server/controller/RuleEngineController.java

@ -0,0 +1,237 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller;
import com.google.common.util.concurrent.FutureCallback;
import io.swagger.v3.oas.annotations.Parameter;
import jakarta.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.config.annotations.ApiOperation;
import org.thingsboard.server.exception.ToErrorResponseEntity;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.ruleengine.RuleEngineCallService;
import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.permission.Operation;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import static org.thingsboard.server.controller.ControllerConstants.ENTITY_ID_PARAM_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.ENTITY_TYPE_PARAM_DESCRIPTION;
@RestController
@TbCoreComponent
@RequestMapping(TbUrlConstants.RULE_ENGINE_URL_PREFIX)
@Slf4j
public class RuleEngineController extends BaseController {
public static final int DEFAULT_TIMEOUT = 10000;
private static final String MSG_DESCRIPTION_PREFIX = "Creates the Message with type 'REST_API_REQUEST' and payload taken from the request body. ";
private static final String MSG_DESCRIPTION = "This method allows you to extend the regular platform API with the power of Rule Engine. You may use default and custom rule nodes to handle the message. " +
"The generated message contains two important metadata fields:\n\n" +
" * **'serviceId'** to identify the platform server that received the request;\n" +
" * **'requestUUID'** to identify the request and route possible response from the Rule Engine;\n\n" +
"Use **'rest call reply'** rule node to push the reply from rule engine back as a REST API call response. ";
@Autowired
private RuleEngineCallService ruleEngineCallService;
@Autowired
private AccessValidator accessValidator;
@ApiOperation(value = "Push user message to the rule engine (handleRuleEngineRequest)",
notes = MSG_DESCRIPTION_PREFIX +
"Uses current User Id ( the one which credentials is used to perform the request) as the Rule Engine message originator. " +
MSG_DESCRIPTION +
"The default timeout of the request processing is 10 seconds."
+ "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK)
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> handleRuleEngineRequest(
@Parameter(description = "A JSON value representing the message.", required = true)
@RequestBody String requestBody) throws ThingsboardException {
return handleRuleEngineRequest(null, null, null, DEFAULT_TIMEOUT, requestBody);
}
@ApiOperation(value = "Push entity message to the rule engine (handleRuleEngineRequest)",
notes = MSG_DESCRIPTION_PREFIX +
"Uses specified Entity Id as the Rule Engine message originator. " +
MSG_DESCRIPTION +
"The default timeout of the request processing is 10 seconds."
+ "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK)
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> handleRuleEngineRequest(
@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true)
@PathVariable("entityType") String entityType,
@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true)
@PathVariable("entityId") String entityIdStr,
@Parameter(description = "A JSON value representing the message.", required = true)
@RequestBody String requestBody) throws ThingsboardException {
return handleRuleEngineRequest(entityType, entityIdStr, null, DEFAULT_TIMEOUT, requestBody);
}
@ApiOperation(value = "Push entity message with timeout to the rule engine (handleRuleEngineRequest)",
notes = MSG_DESCRIPTION_PREFIX +
"Uses specified Entity Id as the Rule Engine message originator. " +
MSG_DESCRIPTION +
"The platform expects the timeout value in milliseconds."
+ "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK)
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/{timeout}", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> handleRuleEngineRequest(
@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true)
@PathVariable("entityType") String entityType,
@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true)
@PathVariable("entityId") String entityIdStr,
@Parameter(description = "Timeout to process the request in milliseconds", required = true)
@PathVariable("timeout") int timeout,
@Parameter(description = "A JSON value representing the message.", required = true)
@RequestBody String requestBody) throws ThingsboardException {
return handleRuleEngineRequest(entityType, entityIdStr, null, timeout, requestBody);
}
@ApiOperation(value = "Push entity message with timeout and specified queue to the rule engine (handleRuleEngineRequest)",
notes = MSG_DESCRIPTION_PREFIX +
"Uses specified Entity Id as the Rule Engine message originator. " +
MSG_DESCRIPTION +
"If request sent for Device/Device Profile or Asset/Asset Profile entity, specified queue will be used instead of the queue selected in the device or asset profile. " +
"The platform expects the timeout value in milliseconds."
+ "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK)
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/{queueName}/{timeout}", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> handleRuleEngineRequest(
@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true)
@PathVariable("entityType") String entityType,
@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true)
@PathVariable("entityId") String entityIdStr,
@Parameter(description = "Queue name to process the request in the rule engine", required = true)
@PathVariable("queueName") String queueName,
@Parameter(description = "Timeout to process the request in milliseconds", required = true)
@PathVariable("timeout") int timeout,
@Parameter(description = "A JSON value representing the message.", required = true)
@RequestBody String requestBody) throws ThingsboardException {
try {
SecurityUser currentUser = getCurrentUser();
EntityId entityId;
if (StringUtils.isEmpty(entityType) || StringUtils.isEmpty(entityIdStr)) {
entityId = currentUser.getId();
} else {
entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
}
//Check that this is a valid JSON
JacksonUtil.toJsonNode(requestBody);
final DeferredResult<ResponseEntity> response = new DeferredResult<>();
accessValidator.validate(currentUser, Operation.WRITE, entityId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() {
@Override
public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
long expTime = System.currentTimeMillis() + timeout;
HashMap<String, String> metaData = new HashMap<>();
UUID requestId = UUID.randomUUID();
metaData.put("serviceId", serviceInfoProvider.getServiceId());
metaData.put("requestUUID", requestId.toString());
metaData.put("expirationTime", Long.toString(expTime));
TbMsg msg = TbMsg.newMsg(queueName, TbMsgType.REST_API_REQUEST, entityId, currentUser.getCustomerId(), new TbMsgMetaData(metaData), requestBody);
ruleEngineCallService.processRestApiCallToRuleEngine(currentUser.getTenantId(), requestId, msg, queueName != null,
reply -> reply(new LocalRequestMetaData(msg, currentUser, result), reply));
}
@Override
public void onFailure(Throwable e) {
ResponseEntity entity;
if (e instanceof ToErrorResponseEntity) {
entity = ((ToErrorResponseEntity) e).toErrorResponseEntity();
} else {
entity = new ResponseEntity(HttpStatus.UNAUTHORIZED);
}
logRuleEngineCall(currentUser, entityId, requestBody, null, e);
response.setResult(entity);
}
}));
return response;
} catch (IllegalArgumentException iae) {
throw new ThingsboardException("Invalid request body", iae, ThingsboardErrorCode.BAD_REQUEST_PARAMS);
}
}
private void reply(LocalRequestMetaData rpcRequest, TbMsg response) {
DeferredResult<ResponseEntity> responseWriter = rpcRequest.responseWriter();
if (response == null) {
logRuleEngineCall(rpcRequest, null, new TimeoutException("Processing timeout detected!"));
responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
} else {
String responseData = response.getData();
if (!StringUtils.isEmpty(responseData)) {
try {
logRuleEngineCall(rpcRequest, response, null);
responseWriter.setResult(new ResponseEntity<>(JacksonUtil.toJsonNode(responseData), HttpStatus.OK));
} catch (IllegalArgumentException e) {
log.debug("Failed to decode device response: {}", responseData, e);
logRuleEngineCall(rpcRequest, response, e);
responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE));
}
} else {
logRuleEngineCall(rpcRequest, response, null);
responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
}
}
}
private void logRuleEngineCall(LocalRequestMetaData rpcRequest, TbMsg response, Throwable e) {
logRuleEngineCall(rpcRequest.user(), rpcRequest.request().getOriginator(), rpcRequest.request().getData(), response, e);
}
private void logRuleEngineCall(SecurityUser user, EntityId entityId, String request, TbMsg response, Throwable e) {
auditLogService.logEntityAction(
user.getTenantId(),
user.getCustomerId(),
user.getId(),
user.getName(),
entityId,
null,
ActionType.REST_API_RULE_ENGINE_CALL,
BaseController.toException(e),
request,
response != null ? response.getData() : "");
}
private record LocalRequestMetaData(TbMsg request, SecurityUser user, DeferredResult<ResponseEntity> responseWriter) {}
}

1
application/src/main/java/org/thingsboard/server/controller/TbUrlConstants.java

@ -22,4 +22,5 @@ public class TbUrlConstants {
public static final String TELEMETRY_URL_PREFIX = "/api/plugins/telemetry";
public static final String RPC_V1_URL_PREFIX = "/api/plugins/rpc";
public static final String RPC_V2_URL_PREFIX = "/api/rpc";
public static final String RULE_ENGINE_URL_PREFIX = "/api/rule-engine/";
}

20
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java

@ -191,6 +191,14 @@ public class DefaultTbClusterService implements TbClusterService {
toCoreNfs.incrementAndGet();
}
@Override
public void pushNotificationToCore(String targetServiceId, TransportProtos.RestApiCallResponseMsgProto responseMsgProto, TbQueueCallback callback) {
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_CORE, targetServiceId);
ToCoreNotificationMsg msg = ToCoreNotificationMsg.newBuilder().setRestApiCallResponseMsg(responseMsgProto).build();
producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback);
toCoreNfs.incrementAndGet();
}
@Override
public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) {
log.trace("PUSHING msg: {} to:{}", msg, tpi);
@ -200,6 +208,11 @@ public class DefaultTbClusterService implements TbClusterService {
@Override
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
pushMsgToRuleEngine(tenantId, entityId, tbMsg, false, callback);
}
@Override
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, boolean useQueueFromTbMsg, TbQueueCallback callback) {
if (tenantId == null || tenantId.isNullUid()) {
if (entityId.getEntityType().equals(EntityType.TENANT)) {
tenantId = TenantId.fromUUID(entityId.getId());
@ -209,9 +222,8 @@ public class DefaultTbClusterService implements TbClusterService {
}
} else {
HasRuleEngineProfile ruleEngineProfile = getRuleEngineProfileForEntityOrElseNull(tenantId, entityId, tbMsg);
tbMsg = transformMsg(tbMsg, ruleEngineProfile);
tbMsg = transformMsg(tbMsg, ruleEngineProfile, useQueueFromTbMsg);
}
ruleEngineProducerService.sendToRuleEngine(producerProvider.getRuleEngineMsgProducer(), tenantId, tbMsg, callback);
toRuleEngineMsgs.incrementAndGet();
}
@ -255,10 +267,10 @@ public class DefaultTbClusterService implements TbClusterService {
return null;
}
private TbMsg transformMsg(TbMsg tbMsg, HasRuleEngineProfile ruleEngineProfile) {
private TbMsg transformMsg(TbMsg tbMsg, HasRuleEngineProfile ruleEngineProfile, boolean useQueueFromTbMsg) {
if (ruleEngineProfile != null) {
RuleChainId targetRuleChainId = ruleEngineProfile.getDefaultRuleChainId();
String targetQueueName = ruleEngineProfile.getDefaultQueueName();
String targetQueueName = useQueueFromTbMsg ? tbMsg.getQueueName() : ruleEngineProfile.getDefaultQueueName();
boolean isRuleChainTransform = targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId());
boolean isQueueTransform = targetQueueName != null && !targetQueueName.equals(tbMsg.getQueueName());

13
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java

@ -96,6 +96,7 @@ import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.resource.TbImageService;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.ruleengine.RuleEngineCallService;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
@ -149,6 +150,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
private final NotificationRuleProcessor notificationRuleProcessor;
private final TbCoreQueueFactory queueFactory;
private final TbImageService imageService;
private final RuleEngineCallService ruleEngineCallService;
private final TbCoreConsumerStats stats;
private MainQueueConsumerManager<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig> mainConsumer;
@ -177,7 +179,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
JwtSettingsService jwtSettingsService,
NotificationSchedulerService notificationSchedulerService,
NotificationRuleProcessor notificationRuleProcessor,
TbImageService imageService) {
TbImageService imageService,
RuleEngineCallService ruleEngineCallService) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService,
eventPublisher, jwtSettingsService);
this.stateService = stateService;
@ -192,6 +195,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
this.notificationSchedulerService = notificationSchedulerService;
this.notificationRuleProcessor = notificationRuleProcessor;
this.imageService = imageService;
this.ruleEngineCallService = ruleEngineCallService;
this.queueFactory = tbCoreQueueFactory;
}
@ -377,6 +381,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} else if (toCoreNotification.hasFromDeviceRpcResponse()) {
log.trace("[{}] Forwarding message to RPC service {}", id, toCoreNotification.getFromDeviceRpcResponse());
forwardToCoreRpcService(toCoreNotification.getFromDeviceRpcResponse(), callback);
} else if (toCoreNotification.hasRestApiCallResponseMsg()) {
log.trace("[{}] Forwarding message to RuleEngineCallService service {}", id, toCoreNotification.getRestApiCallResponseMsg());
forwardToRuleEngineCallService(toCoreNotification.getRestApiCallResponseMsg(), callback);
} else if (toCoreNotification.hasComponentLifecycle()) {
handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCoreNotification.getComponentLifecycle()));
callback.onSuccess();
@ -738,6 +745,10 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
actorContext.getDbCallbackExecutor());
}
void forwardToRuleEngineCallService(TransportProtos.RestApiCallResponseMsgProto restApiCallResponseMsg, TbCallback callback) {
ruleEngineCallService.onQueueMsg(restApiCallResponseMsg, callback);
}
private void throwNotHandled(Object msg, TbCallback callback) {
log.warn("Message not handled: {}", msg);
callback.onFailure(new RuntimeException("Message not handled!"));

11
application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java

@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
@ -127,6 +128,16 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
});
}
@Override
public void sendRestApiCallReply(String serviceId, UUID requestId, TbMsg tbMsg) {
TransportProtos.RestApiCallResponseMsgProto msg = TransportProtos.RestApiCallResponseMsgProto.newBuilder()
.setRequestIdMSB(requestId.getMostSignificantBits())
.setRequestIdLSB(requestId.getLeastSignificantBits())
.setResponse(TbMsg.toByteString(tbMsg))
.build();
clusterService.pushNotificationToCore(serviceId, msg, null);
}
@Override
public Rpc findRpcById(TenantId tenantId, RpcId id) {
return rpcService.findById(tenantId, id);

100
application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java

@ -0,0 +1,100 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ruleengine;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@Service
@Slf4j
public class DefaultRuleEngineCallService implements RuleEngineCallService {
private final TbClusterService clusterService;
private ScheduledExecutorService executor;
private final ConcurrentMap<UUID, Consumer<TbMsg>> requests = new ConcurrentHashMap<>();
public DefaultRuleEngineCallService(TbClusterService clusterService) {
this.clusterService = clusterService;
}
@PostConstruct
public void initExecutor() {
executor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("re-rest-callback"));
}
@PreDestroy
public void shutdownExecutor() {
if (executor != null) {
executor.shutdownNow();
}
}
@Override
public void processRestApiCallToRuleEngine(TenantId tenantId, UUID requestId, TbMsg request, boolean useQueueFromTbMsg, Consumer<TbMsg> responseConsumer) {
log.trace("[{}] Processing REST API call to rule engine: [{}] for entity: [{}]", tenantId, requestId, request.getOriginator());
requests.put(requestId, responseConsumer);
sendRequestToRuleEngine(tenantId, request, useQueueFromTbMsg);
scheduleTimeout(request, requestId, requests);
}
@Override
public void onQueueMsg(TransportProtos.RestApiCallResponseMsgProto restApiCallResponseMsg, TbCallback callback) {
UUID requestId = new UUID(restApiCallResponseMsg.getRequestIdMSB(), restApiCallResponseMsg.getRequestIdLSB());
Consumer<TbMsg> consumer = requests.remove(requestId);
if (consumer != null) {
consumer.accept(TbMsg.fromBytes(null, restApiCallResponseMsg.getResponse().toByteArray(), TbMsgCallback.EMPTY));
} else {
log.trace("[{}] Unknown or stale rest api call response received", requestId);
}
callback.onSuccess();
}
private void sendRequestToRuleEngine(TenantId tenantId, TbMsg msg, boolean useQueueFromTbMsg) {
clusterService.pushMsgToRuleEngine(tenantId, msg.getOriginator(), msg, useQueueFromTbMsg, null);
}
private void scheduleTimeout(TbMsg request, UUID requestId, ConcurrentMap<UUID, Consumer<TbMsg>> requestsMap) {
long expirationTime = Long.parseLong(request.getMetaData().getValue("expirationTime"));
long timeout = Math.max(0, expirationTime - System.currentTimeMillis());
log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId);
executor.schedule(() -> {
Consumer<TbMsg> consumer = requestsMap.remove(requestId);
if (consumer != null) {
log.trace("[{}] request timeout detected: [{}]", this.hashCode(), requestId);
consumer.accept(null);
}
}, timeout, TimeUnit.MILLISECONDS);
}
}

31
application/src/main/java/org/thingsboard/server/service/ruleengine/RuleEngineCallService.java

@ -0,0 +1,31 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ruleengine;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.UUID;
import java.util.function.Consumer;
public interface RuleEngineCallService {
void processRestApiCallToRuleEngine(TenantId tenantId, UUID requestId, TbMsg request, boolean useQueueFromTbMsg, Consumer<TbMsg> responseConsumer);
void onQueueMsg(TransportProtos.RestApiCallResponseMsgProto restApiCallResponseMsg, TbCallback callback);
}

20
application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java

@ -386,6 +386,15 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
actionType, cntTime, extractMatcherAdditionalInfo(additionalInfo));
}
protected void testLogEntityActionError(EntityId originatorId, TenantId tenantId,
CustomerId customerId, UserId userId, String userName,
ActionType actionType, Exception exception, Object... additionalInfo) {
ArgumentMatcher<Exception> matcherError = argument -> argument.getMessage().contains(exception.getMessage())
& argument.getClass().equals(exception.getClass());
testLogEntityActionErrorAdditionalInfo(Objects::isNull, originatorId, tenantId, customerId, userId, userName,
actionType, 1, matcherError, extractMatcherAdditionalInfo(additionalInfo));
}
private void testLogEntityActionAdditionalInfo(ArgumentMatcher<HasName> matcherEntity, ArgumentMatcher<EntityId> matcherOriginatorId,
TenantId tenantId, ArgumentMatcher<CustomerId> matcherCustomerId,
ArgumentMatcher<UserId> matcherUserId, String userName, ActionType actionType,
@ -529,8 +538,9 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
Mockito.argThat(matcherEntity),
Mockito.eq(actionType),
Mockito.argThat(matcherError),
Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(0))),
Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(1))));
Mockito.argThat(matcherAdditionalInfos.get(0)),
Mockito.argThat(matcherAdditionalInfos.get(1)));
break;
case 3:
Mockito.verify(auditLogService, times(cntTime))
.logEntityAction(Mockito.eq(tenantId),
@ -541,9 +551,9 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
Mockito.argThat(matcherEntity),
Mockito.eq(actionType),
Mockito.argThat(matcherError),
Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(0))),
Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(1))),
Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(2))));
Mockito.argThat(matcherAdditionalInfos.get(0)),
Mockito.argThat(matcherAdditionalInfos.get(1)),
Mockito.argThat(matcherAdditionalInfos.get(2)));
break;
default:
Mockito.verify(auditLogService, times(cntTime))

12
application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java

@ -212,6 +212,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
protected UserId differentCustomerUserId;
protected UserId differentTenantCustomerUserId;
protected UserId currentUserId;
@SuppressWarnings("rawtypes")
private HttpMessageConverter mappingJackson2HttpMessageConverter;
@ -567,6 +568,8 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
Claims claims = jwsClaims.getPayload();
String subject = claims.getSubject();
Assert.assertEquals(username, subject);
String userId = claims.get("userId", String.class);
this.currentUserId = UserId.fromString(userId);
}
protected void resetTokens() throws Exception {
@ -633,6 +636,11 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
return doPost("/api/device?accessToken=" + accessToken, device, Device.class);
}
protected Device assignDeviceToCustomer(DeviceId deviceId, CustomerId customerId) {
String deviceIdStr = String.valueOf(deviceId.getId());
return doPost("/api/customer/" + customerId.getId() + "/device/" + deviceIdStr, Device.class);
}
protected MqttDeviceProfileTransportConfiguration createMqttDeviceProfileTransportConfiguration(TransportPayloadTypeConfiguration transportPayloadTypeConfiguration, boolean sendAckOnValidationException) {
MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration = new MqttDeviceProfileTransportConfiguration();
mqttDeviceProfileTransportConfiguration.setDeviceTelemetryTopic(MqttTopics.DEVICE_TELEMETRY_TOPIC);
@ -803,6 +811,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
return readResponse(doPost(urlTemplate, content, params).andExpect(resultMatcher), responseType);
}
protected <T, R> R doPostAsyncWithTypedResponse(String urlTemplate, T content, TypeReference<R> responseType, ResultMatcher resultMatcher, String... params) throws Exception {
return readResponse(doPostAsync(urlTemplate, content, DEFAULT_TIMEOUT, params).andExpect(resultMatcher), responseType);
}
protected <T, R> R doPostAsync(String urlTemplate, T content, Class<R> responseClass, ResultMatcher resultMatcher, String... params) throws Exception {
return readResponse(doPostAsync(urlTemplate, content, DEFAULT_TIMEOUT, params).andExpect(resultMatcher), responseClass);
}

249
application/src/test/java/org/thingsboard/server/controller/RuleEngineControllerTest.java

@ -0,0 +1,249 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.service.ruleengine.RuleEngineCallService;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@DaoSqlTest
public class RuleEngineControllerTest extends AbstractControllerTest {
private final String REQUEST_BODY = "{\"request\":\"download\"}";
private final String RESPONSE_BODY = "{\"response\":\"downloadOk\"}";
@SpyBean
private RuleEngineCallService ruleEngineCallService;
@Test
public void testHandleRuleEngineRequestWithMsgOriginatorUser() throws Exception {
loginSysAdmin();
TbMsg responseMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, currentUserId, TbMsgMetaData.EMPTY, RESPONSE_BODY);
mockRestApiCallToRuleEngine(responseMsg);
JsonNode apiResponse = doPostAsyncWithTypedResponse("/api/rule-engine/", REQUEST_BODY, new TypeReference<>() {
}, status().isOk());
assertThat(JacksonUtil.toString(apiResponse)).isEqualTo(RESPONSE_BODY);
ArgumentCaptor<TbMsg> requestMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(TenantId.SYS_TENANT_ID), any(UUID.class), requestMsgCaptor.capture(), eq(false), any(Consumer.class));
TbMsg requestMsgCaptorValue = requestMsgCaptor.getValue();
assertThat(requestMsgCaptorValue.getData()).isEqualTo(REQUEST_BODY);
assertThat(requestMsgCaptorValue.getType()).isEqualTo(TbMsgType.REST_API_REQUEST.name());
assertThat(requestMsgCaptorValue.getOriginator()).isEqualTo(currentUserId);
assertThat(requestMsgCaptorValue.getCustomerId()).isNull();
checkMetadataProperties(requestMsgCaptorValue.getMetaData());
testLogEntityAction(null, currentUserId, TenantId.SYS_TENANT_ID, new CustomerId(EntityId.NULL_UUID), currentUserId,
SYS_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, RESPONSE_BODY);
}
@Test
public void testHandleRuleEngineRequestWithMsgOriginatorDevice() throws Exception {
loginTenantAdmin();
Device device = createDevice("Test", "123");
DeviceId deviceId = device.getId();
TbMsg responseMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, RESPONSE_BODY);
mockRestApiCallToRuleEngine(responseMsg);
JsonNode apiResponse = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId(), REQUEST_BODY, new TypeReference<>() {
}, status().isOk());
assertThat(JacksonUtil.toString(apiResponse)).isEqualTo(RESPONSE_BODY);
ArgumentCaptor<TbMsg> requestMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(UUID.class), requestMsgCaptor.capture(), eq(false), any(Consumer.class));
TbMsg requestMsgCaptorValue = requestMsgCaptor.getValue();
assertThat(requestMsgCaptorValue.getData()).isEqualTo(REQUEST_BODY);
assertThat(requestMsgCaptorValue.getType()).isEqualTo(TbMsgType.REST_API_REQUEST.name());
assertThat(requestMsgCaptorValue.getOriginator()).isEqualTo(deviceId);
assertThat(requestMsgCaptorValue.getCustomerId()).isNull();
checkMetadataProperties(requestMsgCaptorValue.getMetaData());
testLogEntityAction(null, deviceId, tenantId, new CustomerId(EntityId.NULL_UUID), tenantAdminUserId,
TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, RESPONSE_BODY);
}
@Test
public void testHandleRuleEngineRequestWithMsgOriginatorDeviceAndSpecifiedTimeout() throws Exception {
loginTenantAdmin();
Device device = createDevice("Test", "123");
DeviceId deviceId = device.getId();
TbMsg responseMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, RESPONSE_BODY);
mockRestApiCallToRuleEngine(responseMsg);
JsonNode apiResponse = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId() + "/15000", REQUEST_BODY, new TypeReference<>() {
}, status().isOk());
assertThat(JacksonUtil.toString(apiResponse)).isEqualTo(RESPONSE_BODY);
ArgumentCaptor<TbMsg> requestMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(UUID.class), requestMsgCaptor.capture(), eq(false), any(Consumer.class));
TbMsg requestMsgCaptorValue = requestMsgCaptor.getValue();
assertThat(requestMsgCaptorValue.getData()).isEqualTo(REQUEST_BODY);
assertThat(requestMsgCaptorValue.getType()).isEqualTo(TbMsgType.REST_API_REQUEST.name());
assertThat(requestMsgCaptorValue.getOriginator()).isEqualTo(deviceId);
assertThat(requestMsgCaptorValue.getCustomerId()).isNull();
checkMetadataProperties(requestMsgCaptorValue.getMetaData());
testLogEntityAction(null, deviceId, tenantId, new CustomerId(EntityId.NULL_UUID), tenantAdminUserId,
TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, RESPONSE_BODY);
}
@Test
public void testHandleRuleEngineRequestWithMsgOriginatorDeviceAndResponseIsNull() throws Exception {
loginTenantAdmin();
Device device = createDevice("Test", "123");
DeviceId deviceId = device.getId();
mockRestApiCallToRuleEngine(null);
doPostAsync("/api/rule-engine/DEVICE/" + deviceId.getId() + "/15000", REQUEST_BODY, String.class, status().isRequestTimeout());
ArgumentCaptor<TbMsg> requestMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(UUID.class), requestMsgCaptor.capture(), eq(false), any(Consumer.class));
TbMsg requestMsgCaptorValue = requestMsgCaptor.getValue();
assertThat(requestMsgCaptorValue.getData()).isEqualTo(REQUEST_BODY);
assertThat(requestMsgCaptorValue.getType()).isEqualTo(TbMsgType.REST_API_REQUEST.name());
assertThat(requestMsgCaptorValue.getOriginator()).isEqualTo(deviceId);
assertThat(requestMsgCaptorValue.getCustomerId()).isNull();
checkMetadataProperties(requestMsgCaptorValue.getMetaData());
Exception exception = new TimeoutException("Processing timeout detected!");
testLogEntityActionError(deviceId, tenantId, new CustomerId(EntityId.NULL_UUID), tenantAdminUserId,
TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, exception, REQUEST_BODY, "");
}
@Test
public void testHandleRuleEngineRequestWithMsgOriginatorDeviceAndSpecifiedQueue() throws Exception {
loginTenantAdmin();
Device device = createDevice("Test", "123");
DeviceId deviceId = device.getId();
TbMsg responseMsg = TbMsg.newMsg(DataConstants.HP_QUEUE_NAME, TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, RESPONSE_BODY);
mockRestApiCallToRuleEngine(responseMsg);
JsonNode apiResponse = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId() + "/HighPriority/1000", REQUEST_BODY, new TypeReference<>() {
}, status().isOk());
assertThat(JacksonUtil.toString(apiResponse)).isEqualTo(RESPONSE_BODY);
ArgumentCaptor<TbMsg> requestMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(UUID.class), requestMsgCaptor.capture(), eq(true), any(Consumer.class));
TbMsg requestMsgCaptorValue = requestMsgCaptor.getValue();
assertThat(requestMsgCaptorValue.getData()).isEqualTo(REQUEST_BODY);
assertThat(requestMsgCaptorValue.getType()).isEqualTo(TbMsgType.REST_API_REQUEST.name());
assertThat(requestMsgCaptorValue.getQueueName()).isEqualTo(DataConstants.HP_QUEUE_NAME);
assertThat(requestMsgCaptorValue.getOriginator()).isEqualTo(deviceId);
assertThat(requestMsgCaptorValue.getCustomerId()).isNull();
checkMetadataProperties(requestMsgCaptorValue.getMetaData());
testLogEntityAction(null, deviceId, tenantId, new CustomerId(EntityId.NULL_UUID), tenantAdminUserId,
TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, RESPONSE_BODY);
}
@Test
public void testHandleRuleEngineRequestWithInvalidRequestBody() throws Exception {
loginSysAdmin();
doPost("/api/rule-engine/", (Object) "@")
.andExpect(status().isBadRequest())
.andExpect(statusReason(containsString("Invalid request body")));
verifyNoInteractions(ruleEngineCallService);
}
@Test
public void testHandleRuleEngineRequestWithAuthorityCustomerUser() throws Exception {
loginTenantAdmin();
Device device = createDevice("Test", "123");
DeviceId deviceId = device.getId();
assignDeviceToCustomer(deviceId, customerId);
loginCustomerUser();
TbMsg responseMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, customerId, TbMsgMetaData.EMPTY, RESPONSE_BODY);
mockRestApiCallToRuleEngine(responseMsg);
JsonNode apiResponse = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId(), REQUEST_BODY, new TypeReference<>() {
}, status().isOk());
assertThat(JacksonUtil.toString(apiResponse)).isEqualTo(RESPONSE_BODY);
ArgumentCaptor<TbMsg> requestMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(UUID.class), requestMsgCaptor.capture(), eq(false), any(Consumer.class));
TbMsg requestMsgCaptorValue = requestMsgCaptor.getValue();
assertThat(requestMsgCaptorValue.getData()).isEqualTo(REQUEST_BODY);
assertThat(requestMsgCaptorValue.getType()).isEqualTo(TbMsgType.REST_API_REQUEST.name());
assertThat(requestMsgCaptorValue.getOriginator()).isEqualTo(deviceId);
assertThat(requestMsgCaptorValue.getCustomerId()).isEqualTo(customerId);
checkMetadataProperties(requestMsgCaptorValue.getMetaData());
testLogEntityAction(null, deviceId, tenantId, customerId, customerUserId, CUSTOMER_USER_EMAIL,
ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, RESPONSE_BODY);
}
@Test
public void testHandleRuleEngineRequestWithoutPermission() throws Exception {
loginTenantAdmin();
Device device = createDevice("test", "123");
loginCustomerUser();
doPostAsync("/api/rule-engine/DEVICE/" + device.getId().getId(), (Object) REQUEST_BODY, -1L)
.andExpect(status().isForbidden())
.andExpect(content().string("You don't have permission to perform this operation!"));
verifyNoInteractions(ruleEngineCallService);
}
@Test
public void testHandleRuleEngineRequestUnauthorized() throws Exception {
doPost("/api/rule-engine/", (Object) REQUEST_BODY)
.andExpect(status().isUnauthorized())
.andExpect(statusReason(containsString("Authentication failed")));
}
private void mockRestApiCallToRuleEngine(TbMsg responseMsg) {
doAnswer(invocation -> {
Consumer<TbMsg> consumer = invocation.getArgument(4);
consumer.accept(responseMsg);
return null;
}).when(ruleEngineCallService).processRestApiCallToRuleEngine(any(TenantId.class), any(UUID.class), any(TbMsg.class), anyBoolean(), any(Consumer.class));
}
public void checkMetadataProperties(TbMsgMetaData metaData) {
Map<String, String> data = metaData.getData();
assertThat(data).containsKeys("serviceId", "requestUUID", "expirationTime");
}
}

102
application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java

@ -19,6 +19,7 @@ import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.ContextConfiguration;
@ -27,6 +28,7 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.AssetProfileId;
@ -37,10 +39,14 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.TbRuleEngineProducerService;
import org.thingsboard.server.queue.discovery.PartitionService;
@ -53,13 +59,16 @@ import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import java.util.List;
import java.util.UUID;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@Slf4j
@ -249,6 +258,99 @@ public class DefaultTbClusterServiceTest {
.send(eq(topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, monolith2)), any(TbProtoQueueMsg.class), isNull());
}
@Test
public void testPushNotificationToCoreWithRestApiCallResponseMsgProto() {
TopicPartitionInfo tpi = mock(TopicPartitionInfo.class);
TbQueueCallback callbackMock = mock(TbQueueCallback.class);
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> tbCoreQueueProducer = mock(TbQueueProducer.class);
doReturn(tpi).when(topicService).getNotificationsTopic(any(ServiceType.class), any(String.class));
when(producerProvider.getTbCoreNotificationsMsgProducer()).thenReturn(tbCoreQueueProducer);
TransportProtos.RestApiCallResponseMsgProto responseMsgProto = TransportProtos.RestApiCallResponseMsgProto.getDefaultInstance();
TransportProtos.ToCoreNotificationMsg toCoreNotificationMsg = TransportProtos.ToCoreNotificationMsg.newBuilder().setRestApiCallResponseMsg(responseMsgProto).build();
clusterService.pushNotificationToCore(CORE, responseMsgProto, callbackMock);
verify(topicService).getNotificationsTopic(ServiceType.TB_CORE, CORE);
verify(producerProvider).getTbCoreNotificationsMsgProducer();
ArgumentCaptor<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> protoQueueMsgArgumentCaptor = ArgumentCaptor.forClass(TbProtoQueueMsg.class);
verify(tbCoreQueueProducer).send(eq(tpi), protoQueueMsgArgumentCaptor.capture(), eq(callbackMock));
TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg> protoQueueMsgArgumentCaptorValue = protoQueueMsgArgumentCaptor.getValue();
assertThat(protoQueueMsgArgumentCaptorValue.getKey()).isNotNull();
assertThat(protoQueueMsgArgumentCaptorValue.getValue()).isEqualTo(toCoreNotificationMsg);
assertThat(protoQueueMsgArgumentCaptorValue.getHeaders().getData()).isEqualTo(new DefaultTbQueueMsgHeaders().getData());
}
@Test
public void testPushMsgToRuleEngineWithTenantIdIsNullUuidAndEntityIsTenantUseQueueFromMsgIsTrue() {
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> tbREQueueProducer = mock(TbQueueProducer.class);
TbQueueCallback callback = mock(TbQueueCallback.class);
TenantId tenantId = TenantId.fromUUID(UUID.fromString("3c8bd350-1239-4a3b-b9c3-4dd76f8e20f1"));
TbMsg requestMsg = TbMsg.newMsg(DataConstants.HP_QUEUE_NAME, TbMsgType.REST_API_REQUEST, tenantId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer);
clusterService.pushMsgToRuleEngine(TenantId.SYS_TENANT_ID, tenantId, requestMsg, true, callback);
verify(producerProvider).getRuleEngineMsgProducer();
verify(ruleEngineProducerService).sendToRuleEngine(tbREQueueProducer, tenantId, requestMsg, callback);
}
@Test
public void testPushMsgToRuleEngineWithTenantIdIsNullUuidAndEntityIsDevice() {
TenantId tenantId = TenantId.SYS_TENANT_ID;
DeviceId deviceId = new DeviceId(UUID.fromString("aa6d112d-2914-4a22-a9e3-bee33edbdb14"));
TbMsg requestMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
TbQueueCallback callback = mock(TbQueueCallback.class);
clusterService.pushMsgToRuleEngine(tenantId, deviceId, requestMsg, false, callback);
verifyNoMoreInteractions(partitionService, producerProvider);
}
@Test
public void testPushMsgToRuleEngineWithTenantIdIsNotNullUuidUseQueueFromMsgIsTrue() {
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> tbREQueueProducer = mock(TbQueueProducer.class);
TbQueueCallback callback = mock(TbQueueCallback.class);
TenantId tenantId = TenantId.fromUUID(UUID.fromString("3c8bd350-1239-4a3b-b9c3-4dd76f8e20f1"));
DeviceId deviceId = new DeviceId(UUID.fromString("adbb9d41-3367-40fd-9e74-7dd7cc5d30cf"));
DeviceProfile deviceProfile = new DeviceProfile(new DeviceProfileId(UUID.fromString("552f5d6d-0b2b-43e1-a7d2-a51cb2a96927")));
TbMsg requestMsg = TbMsg.newMsg(DataConstants.HP_QUEUE_NAME, TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
when(deviceProfileCache.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile);
when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer);
clusterService.pushMsgToRuleEngine(tenantId, deviceId, requestMsg, true, callback);
verify(producerProvider).getRuleEngineMsgProducer();
verify(ruleEngineProducerService).sendToRuleEngine(tbREQueueProducer, tenantId, requestMsg, callback);
}
@Test
public void testPushMsgToRuleEngineUseQueueFromMsgIsFalse() {
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> tbREQueueProducer = mock(TbQueueProducer.class);
TbQueueCallback callback = mock(TbQueueCallback.class);
TenantId tenantId = TenantId.fromUUID(UUID.fromString("5377c8d0-26e5-4d81-84c6-4344043973c8"));
DeviceId deviceId = new DeviceId(UUID.fromString("016c2abb-f46f-49f9-a83d-4d28b803cfe6"));
DeviceProfile deviceProfile = new DeviceProfile(new DeviceProfileId(UUID.fromString("dc5766e2-1a32-4022-859b-743050097ab7")));
deviceProfile.setDefaultQueueName(DataConstants.MAIN_QUEUE_NAME);
TbMsg requestMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
when(deviceProfileCache.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile);
when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer);
clusterService.pushMsgToRuleEngine(tenantId, deviceId, requestMsg, false, callback);
verify(producerProvider).getRuleEngineMsgProducer();
TbMsg expectedMsg = TbMsg.transformMsgQueueName(requestMsg, DataConstants.MAIN_QUEUE_NAME);
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineProducerService).sendToRuleEngine(eq(tbREQueueProducer), eq(tenantId), actualMsg.capture(), eq(callback));
assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg);
}
protected Queue createTestQueue() {
TenantId tenantId = TenantId.SYS_TENANT_ID;
Queue queue = new Queue(new QueueId(UUID.randomUUID()));

16
application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java

@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.service.ruleengine.RuleEngineCallService;
import org.thingsboard.server.service.state.DeviceStateService;
import java.util.UUID;
@ -48,6 +49,8 @@ public class DefaultTbCoreConsumerServiceTest {
private DeviceStateService stateServiceMock;
@Mock
private TbCoreConsumerStats statsMock;
@Mock
private RuleEngineCallService ruleEngineCallServiceMock;
@Mock
private TbCallback tbCallbackMock;
@ -529,4 +532,17 @@ public class DefaultTbCoreConsumerServiceTest {
then(statsMock).should(never()).log(inactivityMsg);
}
@Test
public void givenRestApiCallResponseMsgProto_whenForwardToRuleEngineCallService_thenCallOnQueueMsg() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "ruleEngineCallService", ruleEngineCallServiceMock);
var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.getDefaultInstance();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToRuleEngineCallService(restApiCallResponseMsgProto, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToRuleEngineCallService(restApiCallResponseMsgProto, tbCallbackMock);
// THEN
then(ruleEngineCallServiceMock).should().onQueueMsg(restApiCallResponseMsgProto, tbCallbackMock);
}
}

61
application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java

@ -0,0 +1,61 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.rpc;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.UUID;
import static org.mockito.BDDMockito.then;
@ExtendWith(MockitoExtension.class)
class DefaultTbRuleEngineRpcServiceTest {
@Mock
private TbClusterService tbClusterServiceMock;
@InjectMocks
private DefaultTbRuleEngineRpcService tbRuleEngineRpcService;
@Test
public void givenTbMsg_whenSendRestApiCallReply_thenPushNotificationToCore() {
// GIVEN
String serviceId = "tb-core-0";
UUID requestId = UUID.fromString("f64a20df-eb1e-46a3-ba6f-0b3ae053ee0a");
DeviceId deviceId = new DeviceId(UUID.fromString("1d9f771a-7cdc-4ac7-838c-ba193d05a012"));
TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.newBuilder()
.setRequestIdMSB(requestId.getMostSignificantBits())
.setRequestIdLSB(requestId.getLeastSignificantBits())
.setResponse(TbMsg.toByteString(msg))
.build();
// WHEN
tbRuleEngineRpcService.sendRestApiCallReply(serviceId, requestId, msg);
// THEN
then(tbClusterServiceMock).should().pushNotificationToCore(serviceId, restApiCallResponseMsgProto, null);
}
}

141
application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java

@ -0,0 +1,141 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ruleengine;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
public class DefaultRuleEngineCallServiceTest {
private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("d7210c7f-a152-4e91-8186-19ae85499a6b"));
private final ConcurrentMap<UUID, Consumer<TbMsg>> requests = new ConcurrentHashMap<>();
@Mock
private TbClusterService tbClusterServiceMock;
private DefaultRuleEngineCallService ruleEngineCallService;
private ScheduledExecutorService executor;
@BeforeEach
void setUp() {
executor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("re-rest-callback"));
ruleEngineCallService = new DefaultRuleEngineCallService(tbClusterServiceMock);
ReflectionTestUtils.setField(ruleEngineCallService, "executor", executor);
ReflectionTestUtils.setField(ruleEngineCallService, "requests", requests);
}
@AfterEach
void tearDown() {
requests.clear();
if (executor != null) {
executor.shutdownNow();
}
}
@Test
void givenRequest_whenProcessRestApiCallToRuleEngine_thenPushMsgToRuleEngineAndCheckRemovedDueTimeout() {
long timeout = 1L;
long expTime = System.currentTimeMillis() + timeout;
HashMap<String, String> metaData = new HashMap<>();
UUID requestId = UUID.randomUUID();
metaData.put("serviceId", "core");
metaData.put("requestUUID", requestId.toString());
metaData.put("expirationTime", Long.toString(expTime));
TbMsg msg = TbMsg.newMsg(DataConstants.MAIN_QUEUE_NAME, TbMsgType.REST_API_REQUEST, TENANT_ID, new TbMsgMetaData(metaData), "{\"key\":\"value\"}");
Consumer<TbMsg> anyConsumer = TbMsg::getData;
doAnswer(invocation -> {
//check the presence of request in the map after pushMsgToRuleEngine()
assertThat(requests.size()).isEqualTo(1);
assertThat(requests.get(requestId)).isEqualTo(anyConsumer);
return null;
}).when(tbClusterServiceMock).pushMsgToRuleEngine(any(), any(), any(), anyBoolean(), any());
ruleEngineCallService.processRestApiCallToRuleEngine(TENANT_ID, requestId, msg, true, anyConsumer);
verify(tbClusterServiceMock).pushMsgToRuleEngine(TENANT_ID, TENANT_ID, msg, true, null);
//check map is empty after scheduleTimeout()
Awaitility.await("Await until request was deleted from map due to timeout")
.pollDelay(25, TimeUnit.MILLISECONDS)
.atMost(10, TimeUnit.SECONDS)
.until(requests::isEmpty);
}
@Test
void givenResponse_whenOnQueue_thenAcceptTbMsgResponse() {
long timeout = 10000L;
long expTime = System.currentTimeMillis() + timeout;
HashMap<String, String> metaData = new HashMap<>();
UUID requestId = UUID.randomUUID();
metaData.put("serviceId", "core");
metaData.put("requestUUID", requestId.toString());
metaData.put("expirationTime", Long.toString(expTime));
TbMsg msg = TbMsg.newMsg(DataConstants.MAIN_QUEUE_NAME, TbMsgType.REST_API_REQUEST, TENANT_ID, new TbMsgMetaData(metaData), "{\"key\":\"value\"}");
Consumer<TbMsg> anyConsumer = TbMsg::getData;
doAnswer(invocation -> {
//check the presence of request in the map after pushMsgToRuleEngine()
assertThat(requests.size()).isEqualTo(1);
assertThat(requests.get(requestId)).isEqualTo(anyConsumer);
ruleEngineCallService.onQueueMsg(getResponse(requestId, msg), TbCallback.EMPTY);
//check map is empty after onQueueMsg()
assertThat(requests.size()).isEqualTo(0);
return null;
}).when(tbClusterServiceMock).pushMsgToRuleEngine(any(), any(), any(), anyBoolean(), any());
ruleEngineCallService.processRestApiCallToRuleEngine(TENANT_ID, requestId, msg, true, anyConsumer);
verify(tbClusterServiceMock).pushMsgToRuleEngine(TENANT_ID, TENANT_ID, msg, true, null);
}
private TransportProtos.RestApiCallResponseMsgProto getResponse(UUID requestId, TbMsg msg) {
return TransportProtos.RestApiCallResponseMsgProto.newBuilder()
.setResponse(TbMsg.toByteString(msg))
.setRequestIdMSB(requestId.getMostSignificantBits())
.setRequestIdLSB(requestId.getLeastSignificantBits())
.build();
}
}

5
common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java

@ -38,6 +38,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.RestApiCallResponseMsgProto;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueClusterService;
@ -57,10 +58,14 @@ public interface TbClusterService extends TbQueueClusterService {
void pushNotificationToCore(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback);
void pushNotificationToCore(String targetServiceId, RestApiCallResponseMsgProto msg, TbQueueCallback callback);
void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback);
void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, TbQueueCallback callback);
void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, boolean useQueueFromTbMsg, TbQueueCallback callback);
void pushNotificationToRuleEngine(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback);
void pushNotificationToTransport(String targetServiceId, ToTransportMsg response, TbQueueCallback callback);

1
common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java

@ -40,6 +40,7 @@ public enum ActionType {
RELATION_ADD_OR_UPDATE(false, TbMsgType.RELATION_ADD_OR_UPDATE),
RELATION_DELETED(false, TbMsgType.RELATION_DELETED),
RELATIONS_DELETED(false, TbMsgType.RELATIONS_DELETED),
REST_API_RULE_ENGINE_CALL(false, null), // log call to rule engine from REST API
ALARM_ACK(false, TbMsgType.ALARM_ACK),
ALARM_CLEAR(false, TbMsgType.ALARM_CLEAR),
ALARM_DELETE(false, TbMsgType.ALARM_DELETE),

1
common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java

@ -67,6 +67,7 @@ public enum TbMsgType {
PROVISION_SUCCESS,
PROVISION_FAILURE,
SEND_EMAIL,
REST_API_REQUEST("REST API request"),
// tellSelfOnly types
GENERATOR_NODE_SELF_MSG(null, true),

4
common/data/src/test/java/org/thingsboard/server/common/data/audit/ActionTypeTest.java

@ -28,6 +28,7 @@ import static org.thingsboard.server.common.data.audit.ActionType.DELETED_COMMEN
import static org.thingsboard.server.common.data.audit.ActionType.LOCKOUT;
import static org.thingsboard.server.common.data.audit.ActionType.LOGIN;
import static org.thingsboard.server.common.data.audit.ActionType.LOGOUT;
import static org.thingsboard.server.common.data.audit.ActionType.REST_API_RULE_ENGINE_CALL;
import static org.thingsboard.server.common.data.audit.ActionType.RPC_CALL;
import static org.thingsboard.server.common.data.audit.ActionType.SMS_SENT;
import static org.thingsboard.server.common.data.audit.ActionType.SUSPENDED;
@ -45,7 +46,8 @@ class ActionTypeTest {
LOGOUT,
LOCKOUT,
DELETED_COMMENT,
SMS_SENT
SMS_SENT,
REST_API_RULE_ENGINE_CALL
);
// backward-compatibility tests

7
common/proto/src/main/proto/queue.proto

@ -101,6 +101,12 @@ message SessionInfoProto {
bool isGateway = 18;
}
message RestApiCallResponseMsgProto {
int64 requestIdMSB = 1;
int64 requestIdLSB = 2;
bytes response = 5;
}
enum SessionEvent {
OPEN = 0;
CLOSED = 1;
@ -1485,6 +1491,7 @@ message ToCoreNotificationMsg {
ToEdgeSyncRequestMsgProto toEdgeSyncRequest = 11;
FromEdgeSyncResponseMsgProto fromEdgeSyncResponse = 12;
ResourceCacheInvalidateMsg resourceCacheInvalidateMsg = 13;
RestApiCallResponseMsgProto restApiCallResponseMsg = 50;
}
/* Messages that are handled by ThingsBoard RuleEngine Service */

5
pom.xml

@ -1999,6 +1999,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-lambda</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>

45
rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java

@ -3840,6 +3840,51 @@ public class RestClient implements Closeable {
}
}
public JsonNode handleRuleEngineRequest(JsonNode requestBody) {
return restTemplate.exchange(
baseURL + "/api/rule-engine",
HttpMethod.POST,
new HttpEntity<>(requestBody),
new ParameterizedTypeReference<JsonNode>() {
}).getBody();
}
public JsonNode handleRuleEngineRequest(EntityId entityId, JsonNode requestBody) {
return restTemplate.exchange(
baseURL + "/api/rule-engine/{entityType}/{entityId}",
HttpMethod.POST,
new HttpEntity<>(requestBody),
new ParameterizedTypeReference<JsonNode>() {
},
entityId.getEntityType(),
entityId.getId()).getBody();
}
public JsonNode handleRuleEngineRequest(EntityId entityId, int timeout, JsonNode requestBody) {
return restTemplate.exchange(
baseURL + "/api/rule-engine/{entityType}/{entityId}/{timeout}",
HttpMethod.POST,
new HttpEntity<>(requestBody),
new ParameterizedTypeReference<JsonNode>() {
},
entityId.getEntityType(),
entityId.getId(),
timeout).getBody();
}
public JsonNode handleRuleEngineRequest(EntityId entityId, String queueName, int timeout, JsonNode requestBody) {
return restTemplate.exchange(
baseURL + "/api/rule-engine/{entityType}/{entityId}/{queueName}/{timeout}",
HttpMethod.POST,
new HttpEntity<>(requestBody),
new ParameterizedTypeReference<JsonNode>() {
},
entityId.getEntityType(),
entityId.getId(),
queueName,
timeout).getBody();
}
private String getTimeUrlParams(TimePageLink pageLink) {
String urlParams = getUrlParams(pageLink);
if (pageLink.getStartTime() != null) {

3
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java

@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.UUID;
import java.util.function.Consumer;
@ -31,5 +32,7 @@ public interface RuleEngineRpcService {
void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest request, Consumer<RuleEngineDeviceRpcResponse> consumer);
void sendRestApiCallReply(String serviceId, UUID requestId, TbMsg msg);
Rpc findRpcById(TenantId tenantId, RpcId id);
}

4
rule-engine/rule-engine-components/pom.xml

@ -100,6 +100,10 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sns</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-lambda</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>

151
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNode.java

@ -0,0 +1,151 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.aws.lambda;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.lambda.AWSLambdaAsync;
import com.amazonaws.services.lambda.AWSLambdaAsyncClientBuilder;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
@Slf4j
@RuleNode(
type = ComponentType.EXTERNAL,
name = "aws lambda",
configClazz = TbAwsLambdaNodeConfiguration.class,
nodeDescription = "Publish message to the AWS Lambda",
nodeDetails = "Publishes messages to AWS Lambda, a service that lets you run code " +
"without provisioning or managing servers. " +
"It sends messages using a RequestResponse invocation type. " +
"The node uses a pre-configured client and specified function to run.<br><br>" +
"Output connections: <code>Success</code>, <code>Failure</code>.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbExternalNodeLambdaConfig",
iconUrl = "data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAyNCAyNCIgd2lkdGg9IjQ4IiBoZWlnaHQ9IjQ4Ij48cGF0aCBkPSJNMTMuMjMgMTAuNTZWMTBjLTEuOTQgMC0zLjk5LjM5LTMuOTkgMi42NyAwIDEuMTYuNjEgMS45NSAxLjYzIDEuOTUuNzYgMCAxLjQzLS40NyAxLjg2LTEuMjIuNTItLjkzLjUtMS44LjUtMi44NG0yLjcgNi41M2MtLjE4LjE2LS40My4xNy0uNjMuMDYtLjg5LS43NC0xLjA1LTEuMDgtMS41NC0xLjc5LTEuNDcgMS41LTIuNTEgMS45NS00LjQyIDEuOTUtMi4yNSAwLTQuMDEtMS4zOS00LjAxLTQuMTcgMC0yLjE4IDEuMTctMy42NCAyLjg2LTQuMzggMS40Ni0uNjQgMy40OS0uNzYgNS4wNC0uOTNWNy41YzAtLjY2LjA1LTEuNDEtLjMzLTEuOTYtLjMyLS40OS0uOTUtLjctMS41LS43LTEuMDIgMC0xLjkzLjUzLTIuMTUgMS42MS0uMDUuMjQtLjI1LjQ4LS40Ny40OWwtMi42LS4yOGMtLjIyLS4wNS0uNDYtLjIyLS40LS41Ni42LTMuMTUgMy40NS00LjEgNi00LjEgMS4zIDAgMyAuMzUgNC4wMyAxLjMzQzE3LjExIDQuNTUgMTcgNi4xOCAxNyA3Ljk1djQuMTdjMCAxLjI1LjUgMS44MSAxIDIuNDguMTcuMjUuMjEuNTQgMCAuNzFsLTIuMDYgMS43OGgtLjAxIj48L3BhdGg+PHBhdGggZD0iTTIwLjE2IDE5LjU0QzE4IDIxLjE0IDE0LjgyIDIyIDEyLjEgMjJjLTMuODEgMC03LjI1LTEuNDEtOS44NS0zLjc2LS4yLS4xOC0uMDItLjQzLjI1LS4yOSAyLjc4IDEuNjMgNi4yNSAyLjYxIDkuODMgMi42MSAyLjQxIDAgNS4wNy0uNSA3LjUxLTEuNTMuMzctLjE2LjY2LjI0LjMyLjUxIj48L3BhdGg+PHBhdGggZD0iTTIxLjA3IDE4LjVjLS4yOC0uMzYtMS44NS0uMTctMi41Ny0uMDgtLjE5LjAyLS4yMi0uMTYtLjAzLS4zIDEuMjQtLjg4IDMuMjktLjYyIDMuNTMtLjMzLjI0LjMtLjA3IDIuMzUtMS4yNCAzLjMyLS4xOC4xNi0uMzUuMDctLjI2LS4xMS4yNi0uNjcuODUtMi4xNC41Ny0yLjV6Ij48L3BhdGg+PC9zdmc+"
)
public class TbAwsLambdaNode extends TbAbstractExternalNode {
private TbAwsLambdaNodeConfiguration config;
private AWSLambdaAsync client;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
config = TbNodeUtils.convert(configuration, TbAwsLambdaNodeConfiguration.class);
if (StringUtils.isBlank(config.getFunctionName())) {
throw new TbNodeException("Function name must be set!", true);
}
try {
AWSCredentials awsCredentials = new BasicAWSCredentials(config.getAccessKey(), config.getSecretKey());
client = AWSLambdaAsyncClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.withRegion(config.getRegion())
.withClientConfiguration(new ClientConfiguration()
.withConnectionTimeout((int) TimeUnit.SECONDS.toMillis(config.getConnectionTimeout()))
.withRequestTimeout((int) TimeUnit.SECONDS.toMillis(config.getRequestTimeout())))
.build();
} catch (Exception e) {
throw new TbNodeException(e);
}
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
var tbMsg = ackIfNeeded(ctx, msg);
String functionName = TbNodeUtils.processPattern(config.getFunctionName(), tbMsg);
String qualifier = StringUtils.isBlank(config.getQualifier()) ?
TbAwsLambdaNodeConfiguration.DEFAULT_QUALIFIER :
TbNodeUtils.processPattern(config.getQualifier(), tbMsg);
InvokeRequest request = toRequest(tbMsg.getData(), functionName, qualifier);
client.invokeAsync(request, new AsyncHandler<>() {
@Override
public void onError(Exception e) {
tellFailure(ctx, tbMsg, e);
}
@Override
public void onSuccess(InvokeRequest request, InvokeResult invokeResult) {
try {
if (config.isTellFailureIfFuncThrowsExc() && invokeResult.getFunctionError() != null) {
throw new RuntimeException(getPayload(invokeResult));
}
tellSuccess(ctx, getResponseMsg(tbMsg, invokeResult));
} catch (Exception e) {
tellFailure(ctx, processException(tbMsg, invokeResult, e), e);
}
}
});
}
private InvokeRequest toRequest(String requestBody, String functionName, String qualifier) {
return new InvokeRequest()
.withFunctionName(functionName)
.withPayload(requestBody)
.withQualifier(qualifier);
}
private String getPayload(InvokeResult invokeResult) {
ByteBuffer buf = invokeResult.getPayload();
if (buf == null) {
throw new RuntimeException("Payload from result of AWS Lambda function execution is null.");
}
byte[] responseBytes = new byte[buf.remaining()];
buf.get(responseBytes);
return new String(responseBytes);
}
private TbMsg getResponseMsg(TbMsg originalMsg, InvokeResult invokeResult) {
TbMsgMetaData metaData = originalMsg.getMetaData().copy();
metaData.putValue("requestId", invokeResult.getSdkResponseMetadata().getRequestId());
String data = getPayload(invokeResult);
return TbMsg.transformMsg(originalMsg, metaData, data);
}
private TbMsg processException(TbMsg origMsg, InvokeResult invokeResult, Throwable t) {
TbMsgMetaData metaData = origMsg.getMetaData().copy();
metaData.putValue("error", t.getClass() + ": " + t.getMessage());
metaData.putValue("requestId", invokeResult.getSdkResponseMetadata().getRequestId());
return TbMsg.transformMsgMetadata(origMsg, metaData);
}
@Override
public void destroy() {
if (client != null) {
try {
client.shutdown();
} catch (Exception e) {
log.error("Failed to shutdown Lambda client during destroy", e);
}
}
}
}

46
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNodeConfiguration.java

@ -0,0 +1,46 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.aws.lambda;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
@Data
public class TbAwsLambdaNodeConfiguration implements NodeConfiguration<TbAwsLambdaNodeConfiguration> {
public static final String DEFAULT_QUALIFIER = "$LATEST";
private String accessKey;
private String secretKey;
private String region;
private String functionName;
private String qualifier;
private int connectionTimeout;
private int requestTimeout;
private boolean tellFailureIfFuncThrowsExc;
@Override
public TbAwsLambdaNodeConfiguration defaultConfiguration() {
TbAwsLambdaNodeConfiguration configuration = new TbAwsLambdaNodeConfiguration();
configuration.setRegion("us-east-1");
configuration.setQualifier(DEFAULT_QUALIFIER);
configuration.setConnectionTimeout(10);
configuration.setRequestTimeout(5);
configuration.setTellFailureIfFuncThrowsExc(false);
return configuration;
}
}

66
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNode.java

@ -0,0 +1,66 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rest;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.UUID;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "rest call reply",
configClazz = TbSendRestApiCallReplyNodeConfiguration.class,
nodeDescription = "Sends reply to REST API call to rule engine",
nodeDetails = "Expects messages with any message type. Forwards incoming message as a reply to REST API call sent to rule engine.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeSendRestApiCallReplyConfig",
icon = "call_merge"
)
public class TbSendRestApiCallReplyNode implements TbNode {
private TbSendRestApiCallReplyNodeConfiguration config;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbSendRestApiCallReplyNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
String serviceIdStr = msg.getMetaData().getValue(config.getServiceIdMetaDataAttribute());
String requestIdStr = msg.getMetaData().getValue(config.getRequestIdMetaDataAttribute());
if (StringUtils.isEmpty(requestIdStr)) {
ctx.tellFailure(msg, new RuntimeException("Request id is not present in the metadata!"));
} else if (StringUtils.isEmpty(serviceIdStr)) {
ctx.tellFailure(msg, new RuntimeException("Service id is not present in the metadata!"));
} else if (StringUtils.isEmpty(msg.getData())) {
ctx.tellFailure(msg, new RuntimeException("Request body is empty!"));
} else {
ctx.getRpcService().sendRestApiCallReply(serviceIdStr, UUID.fromString(requestIdStr), msg);
ctx.tellSuccess(msg);
}
}
}

45
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeConfiguration.java

@ -0,0 +1,45 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rest;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import org.thingsboard.server.common.data.StringUtils;
@Data
public class TbSendRestApiCallReplyNodeConfiguration implements NodeConfiguration<TbSendRestApiCallReplyNodeConfiguration> {
public static final String SERVICE_ID = "serviceId";
public static final String REQUEST_UUID = "requestUUID";
private String serviceIdMetaDataAttribute;
private String requestIdMetaDataAttribute;
@Override
public TbSendRestApiCallReplyNodeConfiguration defaultConfiguration() {
TbSendRestApiCallReplyNodeConfiguration configuration = new TbSendRestApiCallReplyNodeConfiguration();
configuration.setRequestIdMetaDataAttribute(REQUEST_UUID);
configuration.setServiceIdMetaDataAttribute(SERVICE_ID);
return configuration;
}
public String getServiceIdMetaDataAttribute() {
return !StringUtils.isEmpty(serviceIdMetaDataAttribute) ? serviceIdMetaDataAttribute : SERVICE_ID;
}
public String getRequestIdMetaDataAttribute() {
return !StringUtils.isEmpty(requestIdMetaDataAttribute) ? requestIdMetaDataAttribute : REQUEST_UUID;
}
}

308
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNodeTest.java

@ -0,0 +1,308 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.aws.lambda;
import com.amazonaws.ResponseMetadata;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.lambda.AWSLambdaAsync;
import com.amazonaws.services.lambda.model.AWSLambdaException;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullAndEmptySource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.thingsboard.rule.engine.aws.lambda.TbAwsLambdaNodeConfiguration.DEFAULT_QUALIFIER;
@ExtendWith(MockitoExtension.class)
public class TbAwsLambdaNodeTest {
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("ddb88645-7379-4a08-a51c-e84a0b4b3d88"));
private TbAwsLambdaNode node;
private TbAwsLambdaNodeConfiguration config;
@Mock
private TbContext ctx;
@Mock
private AWSLambdaAsync clientMock;
@BeforeEach
void setUp() {
node = new TbAwsLambdaNode();
config = new TbAwsLambdaNodeConfiguration().defaultConfiguration();
}
@Test
public void verifyDefaultConfig() {
assertThat(config.getAccessKey()).isNull();
assertThat(config.getSecretKey()).isNull();
assertThat(config.getRegion()).isEqualTo(("us-east-1"));
assertThat(config.getFunctionName()).isNull();
assertThat(config.getQualifier()).isEqualTo(DEFAULT_QUALIFIER);
assertThat(config.getConnectionTimeout()).isEqualTo(10);
assertThat(config.getRequestTimeout()).isEqualTo(5);
assertThat(config.isTellFailureIfFuncThrowsExc()).isFalse();
}
@ParameterizedTest
@NullAndEmptySource
@ValueSource(strings = " ")
public void givenInvalidFunctionName_whenInit_thenThrowsException(String funcName) {
config.setFunctionName(funcName);
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
assertThatThrownBy(() -> node.init(ctx, configuration))
.isInstanceOf(TbNodeException.class)
.hasMessage("Function name must be set!");
}
@ParameterizedTest
@MethodSource
public void givenRequest_whenOnMsg_thenTellSuccess(String data, TbMsgMetaData metadata, String functionName, String qualifier, String expectedQualifier) {
init();
config.setFunctionName(functionName);
config.setQualifier(qualifier);
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metadata, data);
InvokeRequest request = createInvokeRequest(msg);
String requestIdStr = "a124af57-e7c3-4ebb-83bf-b09ff86eaa23";
String funcResponsePayload = "{\"statusCode\":200}";
when(clientMock.invokeAsync(any(), any())).then(invocation -> {
InvokeResult result = new InvokeResult();
result.setSdkResponseMetadata(new ResponseMetadata(Map.of(ResponseMetadata.AWS_REQUEST_ID, requestIdStr)));
result.setPayload(ByteBuffer.wrap(funcResponsePayload.getBytes()));
AsyncHandler<InvokeRequest, InvokeResult> asyncHandler = invocation.getArgument(1);
asyncHandler.onSuccess(request, result);
return null;
});
node.onMsg(ctx, msg);
ArgumentCaptor<InvokeRequest> invokeRequestCaptor = ArgumentCaptor.forClass(InvokeRequest.class);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(clientMock).invokeAsync(invokeRequestCaptor.capture(), any());
verify(ctx).tellSuccess(msgCaptor.capture());
assertThat(invokeRequestCaptor.getValue().getQualifier()).isEqualTo(expectedQualifier);
TbMsgMetaData resultMsgMetadata = metadata.copy();
resultMsgMetadata.putValue("requestId", requestIdStr);
TbMsg resultedMsg = TbMsg.transformMsg(msg, resultMsgMetadata, funcResponsePayload);
assertThat(msgCaptor.getValue()).usingRecursiveComparison()
.ignoringFields("ctx")
.isEqualTo(resultedMsg);
}
private static Stream<Arguments> givenRequest_whenOnMsg_thenTellSuccess() {
return Stream.of(
Arguments.of(TbMsg.EMPTY_JSON_OBJECT, TbMsgMetaData.EMPTY, "functionA", "qualifierA", "qualifierA"),
Arguments.of(TbMsg.EMPTY_JSON_OBJECT, TbMsgMetaData.EMPTY, "functionA", null, DEFAULT_QUALIFIER),
Arguments.of("{\"funcNameMsgPattern\": \"functionB\", \"qualifierMsgPattern\": \"qualifierB\"}",
TbMsgMetaData.EMPTY, "$[funcNameMsgPattern]", "$[qualifierMsgPattern]", "qualifierB"),
Arguments.of(TbMsg.EMPTY_JSON_OBJECT,
new TbMsgMetaData(
Map.of(
"funcNameMdPattern", "functionC",
"qualifierMdPattern", "qualifierC")
), "${funcNameMdPattern}", "${qualifierMdPattern}", "qualifierC")
);
}
@Test
public void givenExceptionWasThrownInsideFunctionAndTellFailureIfFuncThrowsExcIsTrue_whenOnMsg_thenTellFailure() {
init();
config.setTellFailureIfFuncThrowsExc(true);
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY);
InvokeRequest request = createInvokeRequest(msg);
String requestIdStr = "a124af57-e7c3-4ebb-83bf-b09ff86eaa23";
String errorMsg = "Unhandled exception from function";
when(clientMock.invokeAsync(any(), any())).then(invocation -> {
InvokeResult result = new InvokeResult();
result.setPayload(ByteBuffer.wrap(errorMsg.getBytes(StandardCharsets.UTF_8)));
result.setFunctionError(errorMsg);
result.setSdkResponseMetadata(new ResponseMetadata(Map.of(ResponseMetadata.AWS_REQUEST_ID, requestIdStr)));
AsyncHandler<InvokeRequest, InvokeResult> asyncHandler = invocation.getArgument(1);
asyncHandler.onSuccess(request, result);
return null;
});
node.onMsg(ctx, msg);
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(clientMock).invokeAsync(eq(request), any());
verify(ctx).tellFailure(msgCaptor.capture(), throwableCaptor.capture());
var metadata = Map.of("error", RuntimeException.class + ": " + errorMsg, "requestId", requestIdStr);
TbMsg resultedMsg = TbMsg.transformMsgMetadata(msg, new TbMsgMetaData(metadata));
assertThat(msgCaptor.getValue()).usingRecursiveComparison()
.ignoringFields("ctx")
.isEqualTo(resultedMsg);
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg);
}
@Test
public void givenExceptionWasThrownInsideFunctionAndTellFailureIfFuncThrowsExcIsFalse_whenOnMsg_thenTellSuccess() {
init();
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
InvokeRequest request = createInvokeRequest(msg);
String requestIdStr = "e83dfbc4-68d5-441c-8ee9-289959a30d3b";
String payload = "{\"errorMessage\":\"Something went wrong\",\"errorType\":\"Exception\",\"requestId\":\"" + requestIdStr + "\"}";
when(clientMock.invokeAsync(any(), any())).then(invocation -> {
InvokeResult result = new InvokeResult();
result.setSdkResponseMetadata(new ResponseMetadata(Map.of("AWS_REQUEST_ID", requestIdStr)));
result.setPayload(ByteBuffer.wrap(payload.getBytes()));
AsyncHandler<InvokeRequest, InvokeResult> asyncHandler = invocation.getArgument(1);
asyncHandler.onSuccess(request, result);
return null;
});
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(clientMock).invokeAsync(eq(request), any());
verify(ctx).tellSuccess(msgCaptor.capture());
Map<String, String> metadata = Map.of("requestId", requestIdStr);
TbMsg resultedMsg = TbMsg.transformMsg(msg, new TbMsgMetaData(metadata), payload);
assertThat(msgCaptor.getValue()).usingRecursiveComparison()
.ignoringFields("ctx")
.isEqualTo(resultedMsg);
}
@Test
public void givenPayloadFromResultIsNull_whenOnMsg_thenTellFailure() {
init();
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
InvokeRequest request = createInvokeRequest(msg);
String requestIdStr = "12bbb074-e2fc-4381-8f28-d4bd235103d5";
String errorMsg = "Payload from result of AWS Lambda function execution is null.";
when(clientMock.invokeAsync(any(), any())).then(invocation -> {
InvokeResult result = new InvokeResult();
result.setSdkResponseMetadata(new ResponseMetadata(Map.of(ResponseMetadata.AWS_REQUEST_ID, requestIdStr)));
AsyncHandler<InvokeRequest, InvokeResult> asyncHandler = invocation.getArgument(1);
asyncHandler.onSuccess(request, result);
return null;
});
node.onMsg(ctx, msg);
verify(clientMock).invokeAsync(eq(request), any());
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx).tellFailure(msgCaptor.capture(), throwableCaptor.capture());
var metadata = Map.of("error", RuntimeException.class + ": " + errorMsg, "requestId", requestIdStr);
TbMsg resultedMsg = TbMsg.transformMsgMetadata(msg, new TbMsgMetaData(metadata));
assertThat(msgCaptor.getValue()).usingRecursiveComparison()
.ignoringFields("ctx")
.isEqualTo(resultedMsg);
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg);
}
@Test
public void givenExceptionWasThrownOnAWS_whenOnMsg_thenTellFailure() {
init();
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
InvokeRequest request = createInvokeRequest(msg);
String errorMsg = "Simulated error";
when(clientMock.invokeAsync(any(), any())).then(invocation -> {
AsyncHandler<InvokeRequest, InvokeResult> asyncHandler = invocation.getArgument(1);
asyncHandler.onError(new AWSLambdaException(errorMsg));
return null;
});
node.onMsg(ctx, msg);
verify(clientMock).invokeAsync(eq(request), any());
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
verify(ctx).tellFailure(eq(msg), throwableCaptor.capture());
assertThat(throwableCaptor.getValue()).isInstanceOf(AWSLambdaException.class).hasMessageStartingWith(errorMsg);
}
private void init() {
config.setAccessKey("accessKey");
config.setSecretKey("secretKey");
config.setFunctionName("new-function");
ReflectionTestUtils.setField(node, "client", clientMock);
ReflectionTestUtils.setField(node, "config", config);
}
private InvokeRequest createInvokeRequest(TbMsg msg) {
return new InvokeRequest()
.withFunctionName(getFunctionName(msg))
.withPayload(msg.getData())
.withQualifier(getQualifier(msg));
}
private String getQualifier(TbMsg msg) {
return StringUtils.isBlank(config.getQualifier()) ?
TbAwsLambdaNodeConfiguration.DEFAULT_QUALIFIER :
TbNodeUtils.processPattern(config.getQualifier(), msg);
}
private String getFunctionName(TbMsg msg) {
return TbNodeUtils.processPattern(config.getFunctionName(), msg);
}
}

133
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeTest.java

@ -0,0 +1,133 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class TbSendRestApiCallReplyNodeTest {
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("212445ad-9852-4bfd-819d-6b01ab6ee6b6"));
private TbSendRestApiCallReplyNode node;
private TbSendRestApiCallReplyNodeConfiguration config;
@Mock
private TbContext ctxMock;
@Mock
private RuleEngineRpcService rpcServiceMock;
@BeforeEach
public void setUp() throws TbNodeException {
node = new TbSendRestApiCallReplyNode();
config = new TbSendRestApiCallReplyNodeConfiguration().defaultConfiguration();
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctxMock, configuration);
}
@Test
public void givenDefaultConfig_whenInit_thenDoesNotThrowException() {
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
assertThatNoException().isThrownBy(() -> node.init(ctxMock, configuration));
}
@ParameterizedTest
@MethodSource
public void givenValidRestApiRequest_whenOnMsg_thenTellSuccess(String requestIdAttribute, String serviceIdAttribute) throws TbNodeException {
config.setRequestIdMetaDataAttribute(requestIdAttribute);
config.setServiceIdMetaDataAttribute(serviceIdAttribute);
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctxMock, configuration);
when(ctxMock.getRpcService()).thenReturn(rpcServiceMock);
String requestUUIDStr = "80b7883b-7ec6-4872-9dd3-b2afd5660fa6";
String serviceIdStr = "tb-core-0";
String data = """
{
"temperature": 23,
}
""";
Map<String, String> metadata = Map.of(
requestIdAttribute, requestUUIDStr,
serviceIdAttribute, serviceIdStr);
TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, DEVICE_ID, new TbMsgMetaData(metadata), data);
node.onMsg(ctxMock, msg);
UUID requestUUID = UUID.fromString(requestUUIDStr);
verify(rpcServiceMock).sendRestApiCallReply(serviceIdStr, requestUUID, msg);
verify(ctxMock).tellSuccess(msg);
}
private static Stream<Arguments> givenValidRestApiRequest_whenOnMsg_thenTellSuccess() {
return Stream.of(
Arguments.of("requestId", "service"),
Arguments.of("requestUUID", "serviceId"),
Arguments.of("some_custom_request_id_field", "some_custom_service_id_field")
);
}
@ParameterizedTest
@MethodSource
public void givenInvalidRequest_whenOnMsg_thenTellFailure(TbMsgMetaData metaData, String data, String errorMsg) {
TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, DEVICE_ID, metaData, data);
node.onMsg(ctxMock, msg);
ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
verify(ctxMock).tellFailure(eq(msg), captor.capture());
Throwable throwable = captor.getValue();
assertThat(throwable).isInstanceOf(RuntimeException.class).hasMessage(errorMsg);
}
private static Stream<Arguments> givenInvalidRequest_whenOnMsg_thenTellFailure() {
return Stream.of(
Arguments.of(TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING, "Request id is not present in the metadata!"),
Arguments.of(new TbMsgMetaData(Map.of("requestUUID", "e1dd3985-efad-45a0-b0d2-0ff5dff2ccac")),
TbMsg.EMPTY_STRING, "Service id is not present in the metadata!"),
Arguments.of(new TbMsgMetaData(Map.of("serviceId", "tb-core-0")),
TbMsg.EMPTY_STRING, "Request id is not present in the metadata!"),
Arguments.of(new TbMsgMetaData(Map.of("requestUUID", "e1dd3985-efad-45a0-b0d2-0ff5dff2ccac", "serviceId", "tb-core-0")),
TbMsg.EMPTY_STRING, "Request body is empty!")
);
}
}

83
ui-ngx/src/assets/help/en_US/rulenode/node-templatization-doc.md

@ -0,0 +1,83 @@
# Rule nodes fields templatization
Templatization is the process of using predefined templates to dynamically insert or substitute values into text.
These templates serve as placeholders for variables that can be filled in later with actual data.
In the context of rule engine, templates are used to extract data from incoming messages during runtime.
This is particularly helpful in the rule node configuration, where templatization allows for dynamic configuration by replacing static values in the configuration fields with real-time values from the incoming messages.
This enables more flexible and automated handling of data, making it easier to perform conditional operations based on varying inputs.
## Syntax
Templates start with a dollar sign (`$`), followed by brackets with a key name inside.
Square brackets (`[]`) are used for message keys, while curly brackets (`{}`) are used for message metadata keys.
For example:
- `$[messageKey]` - will extract value of `messageKey` from incoming message.
- `${metadataKey}` - will extract value of `metadataKey` from incoming message metadata.
In the example above, `messageKey` and `metadataKey` represent any key name that may exist within the message or its metadata.
## Example
Let's review an example. First JSON is message, second is message metadata:
```json
{
"temperature": 26.5,
"humidity": 75.2,
"soilMoisture": 28.9,
"windSpeed": 26.2,
"location": "riverside"
}
```
```json
{
"deviceType": "weather_sensor",
"deviceName": "weather1",
"ts": "1685379440000"
}
```
Assume, we detected an unusually high wind speed and want to send this telemetry reading to some external REST API.
Every reading needs to be associated with specific device and location - this information is available only in real-time.
We can use templates extract necessary data and to construct URL for sending data:
`example-base-url.com/report-reading?location=$[location]&deviceName=${deviceName}`
This template will be resolved to:
`example-base-url.com/report-reading?location=riverside&deviceName=weather1`
Templates are ideal for scenarios where the specific values aren't known at the time of configuration but will become available at runtime.
## Notes
- Templates can be combined with regular text. For example: "Fuel tanks are filled to `$[fuelLevel]`%".
- You can access nested keys in JSON object using dot notation: `$[object.key]`.
- If specified key is missing or value associated with that key is an object or an array, then template string will be returned unchanged.
To illustrate written above let's review an example. Here's content of a message:
```json
{
"number": 123.45,
"string": "text",
"boolean": true,
"array": [1, 2, 3],
"object": {
"property": "propertyValue"
},
"null": null
}
```
Here's a table with comparison between templates and extracted values:
| **Template** | **Extracted value** |
|--------------------|---------------------|
| $[number] | 123.45 |
| $[string] | text |
| $[boolean] | true |
| $[array] | $[array] |
| $[object] | $[object] |
| $[object.property] | propertyValue |
| $[null] | null |
| $[doesNotExist] | $[doesNotExist] |
Loading…
Cancel
Save