Browse Source

added rule engine controller and send rest api call reply node

pull/10786/head
IrynaMatveieva 2 years ago
parent
commit
74ac5a949f
  1. 4
      application/src/main/java/org/thingsboard/server/controller/BaseController.java
  2. 2
      application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java
  3. 237
      application/src/main/java/org/thingsboard/server/controller/RuleEngineController.java
  4. 1
      application/src/main/java/org/thingsboard/server/controller/TbUrlConstants.java
  5. 20
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java
  6. 13
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
  7. 11
      application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java
  8. 100
      application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java
  9. 31
      application/src/main/java/org/thingsboard/server/service/ruleengine/RuleEngineCallService.java
  10. 21
      application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java
  11. 10
      application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java
  12. 252
      application/src/test/java/org/thingsboard/server/controller/RuleEngineControllerTest.java
  13. 51
      application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java
  14. 16
      application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java
  15. 61
      application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java
  16. 166
      application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java
  17. 6
      common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java
  18. 1
      common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java
  19. 1
      common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java
  20. 7
      common/proto/src/main/proto/queue.proto
  21. 45
      rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java
  22. 3
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java
  23. 66
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNode.java
  24. 45
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeConfiguration.java
  25. 133
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeTest.java

4
application/src/main/java/org/thingsboard/server/controller/BaseController.java

@ -140,6 +140,7 @@ import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.exception.ThingsboardErrorResponseHandler;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.action.EntityActionService;
@ -313,6 +314,9 @@ public abstract class BaseController {
@Autowired
protected ExportableEntitiesService entitiesService;
@Autowired
protected TbServiceInfoProvider serviceInfoProvider;
@Value("${server.log_controller_error_stack_trace}")
@Getter
private boolean logControllerErrorStackTrace;

2
application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java

@ -1710,4 +1710,6 @@ public class ControllerConstants {
MARKDOWN_CODE_BLOCK_START +
"[{\"ts\":1634712287000,\"values\":{\"temperature\":26, \"humidity\":87}}, {\"ts\":1634712588000,\"values\":{\"temperature\":25, \"humidity\":88}}]" +
MARKDOWN_CODE_BLOCK_END ;
protected static final String SECURITY_WRITE_CHECK = " Security check is performed to verify that the user has 'WRITE' permission for the entity (entities).";
}

237
application/src/main/java/org/thingsboard/server/controller/RuleEngineController.java

@ -0,0 +1,237 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller;
import com.google.common.util.concurrent.FutureCallback;
import io.swagger.v3.oas.annotations.Parameter;
import jakarta.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.config.annotations.ApiOperation;
import org.thingsboard.server.exception.ToErrorResponseEntity;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.ruleengine.RuleEngineCallService;
import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.permission.Operation;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import static org.thingsboard.server.controller.ControllerConstants.ENTITY_ID_PARAM_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.ENTITY_TYPE_PARAM_DESCRIPTION;
@RestController
@TbCoreComponent
@RequestMapping(TbUrlConstants.RULE_ENGINE_URL_PREFIX)
@Slf4j
public class RuleEngineController extends BaseController {
public static final int DEFAULT_TIMEOUT = 10000;
private static final String MSG_DESCRIPTION_PREFIX = "Creates the Message with type 'REST_API_REQUEST' and payload taken from the request body. ";
private static final String MSG_DESCRIPTION = "This method allows you to extend the regular platform API with the power of Rule Engine. You may use default and custom rule nodes to handle the message. " +
"The generated message contains two important metadata fields:\n\n" +
" * **'serviceId'** to identify the platform server that received the request;\n" +
" * **'requestUUID'** to identify the request and route possible response from the Rule Engine;\n\n" +
"Use **'rest call reply'** rule node to push the reply from rule engine back as a REST API call response. ";
@Autowired
private RuleEngineCallService ruleEngineCallService;
@Autowired
private AccessValidator accessValidator;
@ApiOperation(value = "Push user message to the rule engine (handleRuleEngineRequest)",
notes = MSG_DESCRIPTION_PREFIX +
"Uses current User Id ( the one which credentials is used to perform the request) as the Rule Engine message originator. " +
MSG_DESCRIPTION +
"The default timeout of the request processing is 10 seconds."
+ "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK)
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> handleRuleEngineRequest(
@Parameter(description = "A JSON value representing the message.", required = true)
@RequestBody String requestBody) throws ThingsboardException {
return handleRuleEngineRequest(null, null, null, DEFAULT_TIMEOUT, requestBody);
}
@ApiOperation(value = "Push entity message to the rule engine (handleRuleEngineRequest)",
notes = MSG_DESCRIPTION_PREFIX +
"Uses specified Entity Id as the Rule Engine message originator. " +
MSG_DESCRIPTION +
"The default timeout of the request processing is 10 seconds."
+ "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK)
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> handleRuleEngineRequest(
@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true)
@PathVariable("entityType") String entityType,
@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true)
@PathVariable("entityId") String entityIdStr,
@Parameter(description = "A JSON value representing the message.", required = true)
@RequestBody String requestBody) throws ThingsboardException {
return handleRuleEngineRequest(entityType, entityIdStr, null, DEFAULT_TIMEOUT, requestBody);
}
@ApiOperation(value = "Push entity message with timeout to the rule engine (handleRuleEngineRequest)",
notes = MSG_DESCRIPTION_PREFIX +
"Uses specified Entity Id as the Rule Engine message originator. " +
MSG_DESCRIPTION +
"The platform expects the timeout value in milliseconds."
+ "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK)
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/{timeout}", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> handleRuleEngineRequest(
@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true)
@PathVariable("entityType") String entityType,
@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true)
@PathVariable("entityId") String entityIdStr,
@Parameter(description = "Timeout to process the request in milliseconds", required = true)
@PathVariable("timeout") int timeout,
@Parameter(description = "A JSON value representing the message.", required = true)
@RequestBody String requestBody) throws ThingsboardException {
return handleRuleEngineRequest(entityType, entityIdStr, null, timeout, requestBody);
}
@ApiOperation(value = "Push entity message with timeout and specified queue to the rule engine (handleRuleEngineRequest)",
notes = MSG_DESCRIPTION_PREFIX +
"Uses specified Entity Id as the Rule Engine message originator. " +
MSG_DESCRIPTION +
"If request sent for Device/Device Profile or Asset/Asset Profile entity, specified queue will be used instead of the queue selected in the device or asset profile. " +
"The platform expects the timeout value in milliseconds."
+ "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK)
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/{queueName}/{timeout}", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> handleRuleEngineRequest(
@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true)
@PathVariable("entityType") String entityType,
@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true)
@PathVariable("entityId") String entityIdStr,
@Parameter(description = "Queue name to process the request in the rule engine", required = true)
@PathVariable("queueName") String queueName,
@Parameter(description = "Timeout to process the request in milliseconds", required = true)
@PathVariable("timeout") int timeout,
@Parameter(description = "A JSON value representing the message.", required = true)
@RequestBody String requestBody) throws ThingsboardException {
try {
SecurityUser currentUser = getCurrentUser();
EntityId entityId;
if (StringUtils.isEmpty(entityType) || StringUtils.isEmpty(entityIdStr)) {
entityId = currentUser.getId();
} else {
entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
}
//Check that this is a valid JSON
JacksonUtil.toJsonNode(requestBody);
final DeferredResult<ResponseEntity> response = new DeferredResult<>();
accessValidator.validate(currentUser, Operation.WRITE, entityId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() {
@Override
public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
long expTime = System.currentTimeMillis() + timeout;
HashMap<String, String> metaData = new HashMap<>();
UUID requestId = UUID.randomUUID();
metaData.put("serviceId", serviceInfoProvider.getServiceId());
metaData.put("requestUUID", requestId.toString());
metaData.put("expirationTime", Long.toString(expTime));
TbMsg msg = TbMsg.newMsg(queueName, TbMsgType.REST_API_REQUEST, entityId, currentUser.getCustomerId(), new TbMsgMetaData(metaData), requestBody);
ruleEngineCallService.processRestApiCallToRuleEngine(currentUser.getTenantId(), requestId, msg, queueName != null,
reply -> reply(new LocalRequestMetaData(msg, currentUser, result), reply));
}
@Override
public void onFailure(Throwable e) {
ResponseEntity entity;
if (e instanceof ToErrorResponseEntity) {
entity = ((ToErrorResponseEntity) e).toErrorResponseEntity();
} else {
entity = new ResponseEntity(HttpStatus.UNAUTHORIZED);
}
logRuleEngineCall(currentUser, entityId, requestBody, null, e);
response.setResult(entity);
}
}));
return response;
} catch (IllegalArgumentException iae) {
throw new ThingsboardException("Invalid request body", iae, ThingsboardErrorCode.BAD_REQUEST_PARAMS);
}
}
private void reply(LocalRequestMetaData rpcRequest, TbMsg response) {
DeferredResult<ResponseEntity> responseWriter = rpcRequest.responseWriter;
if (response == null) {
logRuleEngineCall(rpcRequest, null, new TimeoutException("Processing timeout detected!"));
responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
} else {
String responseData = response.getData();
if (!StringUtils.isEmpty(responseData)) {
try {
logRuleEngineCall(rpcRequest, response, null);
responseWriter.setResult(new ResponseEntity<>(JacksonUtil.toJsonNode(responseData), HttpStatus.OK));
} catch (IllegalArgumentException e) {
log.debug("Failed to decode device response: {}", responseData, e);
logRuleEngineCall(rpcRequest, response, e);
responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE));
}
} else {
logRuleEngineCall(rpcRequest, response, null);
responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
}
}
}
private void logRuleEngineCall(LocalRequestMetaData rpcRequest, TbMsg response, Throwable e) {
logRuleEngineCall(rpcRequest.user, rpcRequest.request.getOriginator(), rpcRequest.request.getData(), response, e);
}
private void logRuleEngineCall(SecurityUser user, EntityId entityId, String request, TbMsg response, Throwable e) {
auditLogService.logEntityAction(
user.getTenantId(),
user.getCustomerId(),
user.getId(),
user.getName(),
entityId,
null,
ActionType.REST_API_RULE_ENGINE_CALL,
BaseController.toException(e),
request,
response != null ? response.getData() : "");
}
private record LocalRequestMetaData(TbMsg request, SecurityUser user, DeferredResult<ResponseEntity> responseWriter) {}
}

1
application/src/main/java/org/thingsboard/server/controller/TbUrlConstants.java

@ -22,4 +22,5 @@ public class TbUrlConstants {
public static final String TELEMETRY_URL_PREFIX = "/api/plugins/telemetry";
public static final String RPC_V1_URL_PREFIX = "/api/plugins/rpc";
public static final String RPC_V2_URL_PREFIX = "/api/rpc";
public static final String RULE_ENGINE_URL_PREFIX = "/api/rule-engine/";
}

20
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java

@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.HasName;
import org.thingsboard.server.common.data.HasRuleEngineProfile;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.TbResourceInfo;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
@ -183,6 +182,14 @@ public class DefaultTbClusterService implements TbClusterService {
toCoreNfs.incrementAndGet();
}
@Override
public void pushNotificationToCore(String targetServiceId, TransportProtos.RestApiCallResponseMsgProto responseMsgProto, TbQueueCallback callback) {
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_CORE, targetServiceId);
ToCoreNotificationMsg msg = ToCoreNotificationMsg.newBuilder().setRestApiCallResponseMsg(responseMsgProto).build();
producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback);
toCoreNfs.incrementAndGet();
}
@Override
public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) {
log.trace("PUSHING msg: {} to:{}", msg, tpi);
@ -192,6 +199,11 @@ public class DefaultTbClusterService implements TbClusterService {
@Override
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
pushMsgToRuleEngine(tenantId, entityId, tbMsg, false, callback);
}
@Override
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, boolean useQueueFromTbMsg, TbQueueCallback callback) {
if (tenantId == null || tenantId.isNullUid()) {
if (entityId.getEntityType().equals(EntityType.TENANT)) {
tenantId = TenantId.fromUUID(entityId.getId());
@ -201,7 +213,7 @@ public class DefaultTbClusterService implements TbClusterService {
}
} else {
HasRuleEngineProfile ruleEngineProfile = getRuleEngineProfileForEntityOrElseNull(tenantId, entityId);
tbMsg = transformMsg(tbMsg, ruleEngineProfile);
tbMsg = transformMsg(tbMsg, ruleEngineProfile, useQueueFromTbMsg);
}
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, entityId);
log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
@ -226,10 +238,10 @@ public class DefaultTbClusterService implements TbClusterService {
return null;
}
private TbMsg transformMsg(TbMsg tbMsg, HasRuleEngineProfile ruleEngineProfile) {
private TbMsg transformMsg(TbMsg tbMsg, HasRuleEngineProfile ruleEngineProfile, boolean useQueueFromTbMsg) {
if (ruleEngineProfile != null) {
RuleChainId targetRuleChainId = ruleEngineProfile.getDefaultRuleChainId();
String targetQueueName = ruleEngineProfile.getDefaultQueueName();
String targetQueueName = useQueueFromTbMsg ? tbMsg.getQueueName() : ruleEngineProfile.getDefaultQueueName();
boolean isRuleChainTransform = targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId());
boolean isQueueTransform = targetQueueName != null && !targetQueueName.equals(tbMsg.getQueueName());

13
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java

@ -96,6 +96,7 @@ import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.resource.TbImageService;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.ruleengine.RuleEngineCallService;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
@ -149,6 +150,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
private final NotificationRuleProcessor notificationRuleProcessor;
private final TbCoreQueueFactory queueFactory;
private final TbImageService imageService;
private final RuleEngineCallService ruleEngineCallService;
private final TbCoreConsumerStats stats;
private MainQueueConsumerManager<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig> mainConsumer;
@ -177,7 +179,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
JwtSettingsService jwtSettingsService,
NotificationSchedulerService notificationSchedulerService,
NotificationRuleProcessor notificationRuleProcessor,
TbImageService imageService) {
TbImageService imageService,
RuleEngineCallService ruleEngineCallService) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService,
eventPublisher, jwtSettingsService);
this.stateService = stateService;
@ -192,6 +195,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
this.notificationSchedulerService = notificationSchedulerService;
this.notificationRuleProcessor = notificationRuleProcessor;
this.imageService = imageService;
this.ruleEngineCallService = ruleEngineCallService;
this.queueFactory = tbCoreQueueFactory;
}
@ -377,6 +381,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} else if (toCoreNotification.hasFromDeviceRpcResponse()) {
log.trace("[{}] Forwarding message to RPC service {}", id, toCoreNotification.getFromDeviceRpcResponse());
forwardToCoreRpcService(toCoreNotification.getFromDeviceRpcResponse(), callback);
} else if (toCoreNotification.hasRestApiCallResponseMsg()) {
log.trace("[{}] Forwarding message to RuleEngineCallService service {}", id, toCoreNotification.getRestApiCallResponseMsg());
forwardToRuleEngineCallService(toCoreNotification.getRestApiCallResponseMsg(), callback);
} else if (toCoreNotification.hasComponentLifecycle()) {
handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCoreNotification.getComponentLifecycle()));
callback.onSuccess();
@ -738,6 +745,10 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
actorContext.getDbCallbackExecutor());
}
void forwardToRuleEngineCallService(TransportProtos.RestApiCallResponseMsgProto restApiCallResponseMsg, TbCallback callback) {
ruleEngineCallService.onQueueMsg(restApiCallResponseMsg, callback);
}
private void throwNotHandled(Object msg, TbCallback callback) {
log.warn("Message not handled: {}", msg);
callback.onFailure(new RuntimeException("Message not handled!"));

11
application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java

@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
@ -127,6 +128,16 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
});
}
@Override
public void sendRestApiCallReply(String serviceId, UUID requestId, TbMsg tbMsg) {
TransportProtos.RestApiCallResponseMsgProto msg = TransportProtos.RestApiCallResponseMsgProto.newBuilder()
.setRequestIdMSB(requestId.getMostSignificantBits())
.setRequestIdLSB(requestId.getLeastSignificantBits())
.setResponse(TbMsg.toByteString(tbMsg))
.build();
clusterService.pushNotificationToCore(serviceId, msg, null);
}
@Override
public Rpc findRpcById(TenantId tenantId, RpcId id) {
return rpcService.findById(tenantId, id);

100
application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java

@ -0,0 +1,100 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ruleengine;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@Service
@Slf4j
public class DefaultRuleEngineCallService implements RuleEngineCallService {
private final TbClusterService clusterService;
private ScheduledExecutorService executor;
private final ConcurrentMap<UUID, Consumer<TbMsg>> requests = new ConcurrentHashMap<>();
public DefaultRuleEngineCallService(TbClusterService clusterService) {
this.clusterService = clusterService;
}
@PostConstruct
public void initExecutor() {
executor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("re-rest-callback"));
}
@PreDestroy
public void shutdownExecutor() {
if (executor != null) {
executor.shutdownNow();
}
}
@Override
public void processRestApiCallToRuleEngine(TenantId tenantId, UUID requestId, TbMsg request, boolean useQueueFromTbMsg, Consumer<TbMsg> responseConsumer) {
log.trace("[{}] Processing REST API call to rule engine: [{}] for entity: [{}]", tenantId, requestId, request.getOriginator());
requests.put(requestId, responseConsumer);
sendRequestToRuleEngine(tenantId, request, useQueueFromTbMsg);
scheduleTimeout(request, requestId, requests);
}
@Override
public void onQueueMsg(TransportProtos.RestApiCallResponseMsgProto restApiCallResponseMsg, TbCallback callback) {
UUID requestId = new UUID(restApiCallResponseMsg.getRequestIdMSB(), restApiCallResponseMsg.getRequestIdLSB());
Consumer<TbMsg> consumer = requests.remove(requestId);
if (consumer != null) {
consumer.accept(TbMsg.fromBytes(null, restApiCallResponseMsg.getResponse().toByteArray(), TbMsgCallback.EMPTY));
} else {
log.trace("[{}] Unknown or stale rest api call response received", requestId);
}
callback.onSuccess();
}
private void sendRequestToRuleEngine(TenantId tenantId, TbMsg msg, boolean useQueueFromTbMsg) {
clusterService.pushMsgToRuleEngine(tenantId, msg.getOriginator(), msg, useQueueFromTbMsg, null);
}
private void scheduleTimeout(TbMsg request, UUID requestId, ConcurrentMap<UUID, Consumer<TbMsg>> requestsMap) {
long expirationTime = Long.parseLong(request.getMetaData().getValue("expirationTime"));
long timeout = Math.max(0, expirationTime - System.currentTimeMillis());
log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId);
executor.schedule(() -> {
Consumer<TbMsg> consumer = requestsMap.remove(requestId);
if (consumer != null) {
log.trace("[{}] request timeout detected: [{}]", this.hashCode(), requestId);
consumer.accept(null);
}
}, timeout, TimeUnit.MILLISECONDS);
}
}

31
application/src/main/java/org/thingsboard/server/service/ruleengine/RuleEngineCallService.java

@ -0,0 +1,31 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ruleengine;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.UUID;
import java.util.function.Consumer;
public interface RuleEngineCallService {
void processRestApiCallToRuleEngine(TenantId tenantId, UUID requestId, TbMsg request, boolean useQueueFromTbMsg, Consumer<TbMsg> responseConsumer);
void onQueueMsg(TransportProtos.RestApiCallResponseMsgProto restApiCallResponseMsg, TbCallback callback);
}

21
application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java

@ -386,6 +386,16 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
actionType, cntTime, extractMatcherAdditionalInfo(additionalInfo));
}
protected void testLogEntityActionError(HasName entity, EntityId originatorId, TenantId tenantId,
CustomerId customerId, UserId userId, String userName,
ActionType actionType, Exception exp, Object... additionalInfo) {
ArgumentMatcher<Exception> matcherError = argument -> argument.getMessage().contains(exp.getMessage())
& argument.getClass().equals(exp.getClass());
ArgumentMatcher<HasName> matcherEntityEquals = entity == null ? Objects::isNull : argument -> argument.toString().equals(entity.toString());
testLogEntityActionErrorAdditionalInfo(matcherEntityEquals, originatorId, tenantId, customerId, userId, userName,
actionType, 1, matcherError, extractMatcherAdditionalInfo(additionalInfo));
}
private void testLogEntityActionAdditionalInfo(ArgumentMatcher<HasName> matcherEntity, ArgumentMatcher<EntityId> matcherOriginatorId,
TenantId tenantId, ArgumentMatcher<CustomerId> matcherCustomerId,
ArgumentMatcher<UserId> matcherUserId, String userName, ActionType actionType,
@ -529,8 +539,9 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
Mockito.argThat(matcherEntity),
Mockito.eq(actionType),
Mockito.argThat(matcherError),
Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(0))),
Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(1))));
Mockito.argThat(matcherAdditionalInfos.get(0)),
Mockito.argThat(matcherAdditionalInfos.get(1)));
break;
case 3:
Mockito.verify(auditLogService, times(cntTime))
.logEntityAction(Mockito.eq(tenantId),
@ -541,9 +552,9 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
Mockito.argThat(matcherEntity),
Mockito.eq(actionType),
Mockito.argThat(matcherError),
Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(0))),
Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(1))),
Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(2))));
Mockito.argThat(matcherAdditionalInfos.get(0)),
Mockito.argThat(matcherAdditionalInfos.get(1)),
Mockito.argThat(matcherAdditionalInfos.get(2)));
break;
default:
Mockito.verify(auditLogService, times(cntTime))

10
application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java

@ -633,6 +633,12 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
return doPost("/api/device?accessToken=" + accessToken, device, Device.class);
}
protected Device assignDeviceToCustomer(String name, String accessToken, CustomerId customerId) throws Exception {
Device device = createDevice(name, accessToken);
String deviceIdStr = String.valueOf(device.getId().getId());
return doPost("/api/customer/" + customerId.getId() + "/device/" + deviceIdStr, device, Device.class);
}
protected MqttDeviceProfileTransportConfiguration createMqttDeviceProfileTransportConfiguration(TransportPayloadTypeConfiguration transportPayloadTypeConfiguration, boolean sendAckOnValidationException) {
MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration = new MqttDeviceProfileTransportConfiguration();
mqttDeviceProfileTransportConfiguration.setDeviceTelemetryTopic(MqttTopics.DEVICE_TELEMETRY_TOPIC);
@ -803,6 +809,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
return readResponse(doPost(urlTemplate, content, params).andExpect(resultMatcher), responseType);
}
protected <T, R> R doPostAsyncWithTypedResponse(String urlTemplate, T content, TypeReference<R> responseType, ResultMatcher resultMatcher, String... params) throws Exception {
return readResponse(doPostAsync(urlTemplate, content, DEFAULT_TIMEOUT, params).andExpect(resultMatcher), responseType);
}
protected <T> T doPostAsync(String urlTemplate, T content, Class<T> responseClass, ResultMatcher resultMatcher, String... params) throws Exception {
return readResponse(doPostAsync(urlTemplate, content, DEFAULT_TIMEOUT, params).andExpect(resultMatcher), responseClass);
}

252
application/src/test/java/org/thingsboard/server/controller/RuleEngineControllerTest.java

@ -0,0 +1,252 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller;
import com.fasterxml.jackson.core.type.TypeReference;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jws;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.web.servlet.MvcResult;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.service.ruleengine.RuleEngineCallService;
import org.thingsboard.server.service.security.model.token.JwtTokenFactory;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@DaoSqlTest
public class RuleEngineControllerTest extends AbstractControllerTest {
private static final String REQUEST_BODY = "{\"temperature\":23}";
@SpyBean
private RuleEngineCallService ruleEngineCallService;
@Autowired
private JwtTokenFactory jwtTokenFactory;
@Test
public void testHandleRuleEngineRequestWithMsgOriginatorUser() throws Exception {
loginSysAdmin();
UserId sysAdminUserId = getCurrentUserId();
TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, sysAdminUserId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), TbMsgMetaData.EMPTY, REQUEST_BODY);
doAnswer(invocation -> {
Consumer<TbMsg> consumer = invocation.getArgument(4);
consumer.accept(msg);
return null;
}).when(ruleEngineCallService).processRestApiCallToRuleEngine(eq(TenantId.SYS_TENANT_ID), any(UUID.class), any(TbMsg.class), anyBoolean(), any());
var response = doPostAsyncWithTypedResponse("/api/rule-engine/", REQUEST_BODY, new TypeReference<>() {
}, status().isOk());
assertThat(Objects.requireNonNull(JacksonUtil.toString(response))).isEqualTo(REQUEST_BODY);
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(TenantId.SYS_TENANT_ID), any(), captor.capture(), eq(false), any());
TbMsg tbMsg = captor.getValue();
assertThat(tbMsg.getData()).isEqualTo(REQUEST_BODY);
assertThat(tbMsg.getType()).isEqualTo(msg.getType());
assertThat(tbMsg.getOriginator()).isEqualTo(sysAdminUserId);
testLogEntityAction(null, sysAdminUserId, TenantId.SYS_TENANT_ID, new CustomerId(TenantId.SYS_TENANT_ID.getId()), sysAdminUserId,
SYS_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, REQUEST_BODY);
}
@Test
public void testHandleRuleEngineRequestWithMsgOriginatorDevice() throws Exception {
loginTenantAdmin();
Device device = createDevice("Test", "123");
DeviceId deviceId = device.getId();
TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), TbMsgMetaData.EMPTY, REQUEST_BODY);
mockSuccessfulRestApiCallToRuleEngine(msg);
var response = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId(), REQUEST_BODY, new TypeReference<>() {
}, status().isOk());
assertThat(Objects.requireNonNull(JacksonUtil.toString(response))).isEqualTo(REQUEST_BODY);
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(), captor.capture(), eq(false), any());
TbMsg tbMsg = captor.getValue();
assertThat(tbMsg.getData()).isEqualTo(REQUEST_BODY);
assertThat(tbMsg.getType()).isEqualTo(msg.getType());
assertThat(tbMsg.getOriginator()).isEqualTo(deviceId);
testLogEntityAction(null, deviceId, tenantId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), tenantAdminUserId,
TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, REQUEST_BODY);
}
@Test
public void testHandleRuleEngineRequestWithMsgOriginatorDeviceAndSpecifiedTimeout() throws Exception {
loginTenantAdmin();
Device device = createDevice("Test", "123");
DeviceId deviceId = device.getId();
TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), TbMsgMetaData.EMPTY, REQUEST_BODY);
mockSuccessfulRestApiCallToRuleEngine(msg);
var response = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId() + "/15000", REQUEST_BODY, new TypeReference<>() {
}, status().isOk());
assertThat(Objects.requireNonNull(JacksonUtil.toString(response))).isEqualTo(REQUEST_BODY);
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(), captor.capture(), eq(false), any());
TbMsg tbMsg = captor.getValue();
assertThat(tbMsg.getData()).isEqualTo(REQUEST_BODY);
assertThat(tbMsg.getType()).isEqualTo(msg.getType());
assertThat(tbMsg.getOriginator()).isEqualTo(deviceId);
testLogEntityAction(null, deviceId, tenantId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), tenantAdminUserId,
TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, REQUEST_BODY);
}
@Test
public void testHandleRuleEngineRequestWithMsgOriginatorDeviceAndResponseIsNull() throws Exception {
loginTenantAdmin();
Device device = createDevice("Test", "123");
DeviceId deviceId = device.getId();
TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), TbMsgMetaData.EMPTY, REQUEST_BODY);
mockSuccessfulRestApiCallToRuleEngine(null);
doPostAsync("/api/rule-engine/DEVICE/" + deviceId.getId() + "/15000", REQUEST_BODY, String.class, status().isRequestTimeout());
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(), captor.capture(), eq(false), any());
TbMsg tbMsg = captor.getValue();
assertThat(tbMsg.getData()).isEqualTo(REQUEST_BODY);
assertThat(tbMsg.getType()).isEqualTo(msg.getType());
assertThat(tbMsg.getOriginator()).isEqualTo(deviceId);
Exception exception = new TimeoutException("Processing timeout detected!");
testLogEntityActionError(null, deviceId, tenantId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), tenantAdminUserId,
TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, exception, REQUEST_BODY, "");
}
@Test
public void testHandleRuleEngineRequestWithMsgOriginatorDeviceAndSpecifiedQueue() throws Exception {
loginTenantAdmin();
Device device = createDevice("Test", "123");
DeviceId deviceId = device.getId();
TbMsg msg = TbMsg.newMsg("HighPriority", TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, REQUEST_BODY);
mockSuccessfulRestApiCallToRuleEngine(msg);
var response = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId() + "/HighPriority/1000", REQUEST_BODY, new TypeReference<>() {
}, status().isOk());
assertThat(Objects.requireNonNull(JacksonUtil.toString(response))).isEqualTo(REQUEST_BODY);
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(), captor.capture(), eq(true), any());
TbMsg tbMsg = captor.getValue();
assertThat(tbMsg.getData()).isEqualTo(REQUEST_BODY);
assertThat(tbMsg.getType()).isEqualTo(msg.getType());
assertThat(tbMsg.getQueueName()).isEqualTo(msg.getQueueName());
assertThat(tbMsg.getOriginator()).isEqualTo(deviceId);
testLogEntityAction(null, deviceId, tenantId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), tenantAdminUserId,
TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, REQUEST_BODY);
}
@Test
public void testHandleRuleEngineRequestWithInvalidRequestBody() throws Exception {
loginSysAdmin();
doPost("/api/rule-engine/", (Object) "@")
.andExpect(status().isBadRequest())
.andExpect(statusReason(containsString("Invalid request body")));
verifyNoMoreInteractions(ruleEngineCallService);
}
@Test
public void testHandleRuleEngineRequestWithAuthorityCustomerUser() throws Exception {
loginTenantAdmin();
Device device = assignDeviceToCustomer("test", "123", customerId);
DeviceId deviceId = device.getId();
loginCustomerUser();
TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, customerId, TbMsgMetaData.EMPTY, REQUEST_BODY);
mockSuccessfulRestApiCallToRuleEngine(msg);
var response = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId(), REQUEST_BODY, new TypeReference<>() {
}, status().isOk());
assertThat(Objects.requireNonNull(JacksonUtil.toString(response))).isEqualTo(REQUEST_BODY);
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(), captor.capture(), eq(false), any());
TbMsg tbMsg = captor.getValue();
assertThat(tbMsg.getData()).isEqualTo(REQUEST_BODY);
assertThat(tbMsg.getType()).isEqualTo(msg.getType());
assertThat(tbMsg.getOriginator()).isEqualTo(deviceId);
testLogEntityAction(null, deviceId, tenantId, customerId, customerUserId, CUSTOMER_USER_EMAIL,
ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, REQUEST_BODY);
}
@Test
public void testHandleRuleEngineRequestWithoutPermission() throws Exception {
loginTenantAdmin();
Device device = createDevice("test", "123");
loginCustomerUser();
MvcResult result = doPost("/api/rule-engine/DEVICE/" + device.getId().getId(), (Object) REQUEST_BODY).andReturn();
ResponseEntity response = (ResponseEntity) result.getAsyncResult();
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.FORBIDDEN);
assertThat(Objects.requireNonNull(response.getBody()).toString()).isEqualTo("You don't have permission to perform this operation!");
verify(ruleEngineCallService, never()).processRestApiCallToRuleEngine(any(), any(), any(), anyBoolean(), any());
}
@Test
public void testHandleRuleEngineRequestUnauthorized() throws Exception {
doPost("/api/rule-engine/", (Object) REQUEST_BODY)
.andExpect(status().isUnauthorized())
.andExpect(statusReason(containsString("Authentication failed")));
}
private void mockSuccessfulRestApiCallToRuleEngine(TbMsg msg) {
doAnswer(invocation -> {
Consumer<TbMsg> consumer = invocation.getArgument(4);
consumer.accept(msg);
return null;
}).when(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(UUID.class), any(TbMsg.class), anyBoolean(), any());
}
private UserId getCurrentUserId() {
Jws<Claims> jwsClaims = jwtTokenFactory.parseTokenClaims(token);
Claims claims = jwsClaims.getPayload();
String userId = claims.get("userId", String.class);
return UserId.fromString(userId);
}
}

51
application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java

@ -19,16 +19,22 @@ import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.QueueId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueProducer;
@ -43,6 +49,7 @@ import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import java.util.List;
import java.util.UUID;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
@ -237,6 +244,50 @@ public class DefaultTbClusterServiceTest {
.send(eq(topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, monolith2)), any(TbProtoQueueMsg.class), isNull());
}
@Test
public void testPushNotificationToCoreWithRestApiCallResponseMsgProto() {
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> tbCoreQueueProducer = mock(TbQueueProducer.class);
TopicPartitionInfo tpi = new TopicPartitionInfo(ServiceType.TB_CORE.name().toLowerCase() + ".notifications." + CORE, null, null, false);
when(producerProvider.getTbCoreNotificationsMsgProducer()).thenReturn(tbCoreQueueProducer);
TransportProtos.RestApiCallResponseMsgProto responseMsgProto = TransportProtos.RestApiCallResponseMsgProto.getDefaultInstance();
TransportProtos.ToCoreNotificationMsg toCoreNotificationMsg = TransportProtos.ToCoreNotificationMsg.newBuilder().setRestApiCallResponseMsg(responseMsgProto).build();
clusterService.pushNotificationToCore(CORE, responseMsgProto, null);
verify(topicService).getNotificationsTopic(ServiceType.TB_CORE, CORE);
verify(producerProvider).getTbCoreNotificationsMsgProducer();
ArgumentCaptor<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> protoQueueMsgArgumentCaptor = ArgumentCaptor.forClass(TbProtoQueueMsg.class);
verify(tbCoreQueueProducer).send(eq(tpi), protoQueueMsgArgumentCaptor.capture(), isNull());
TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg> protoQueueMsgArgumentCaptorValue = protoQueueMsgArgumentCaptor.getValue();
assertThat(protoQueueMsgArgumentCaptorValue.getValue()).isEqualTo(toCoreNotificationMsg);
}
@Test
public void testPushMsgToRuleEngineUsingQueueFromMsg() {
TenantId tenantId = TenantId.SYS_TENANT_ID;
DeviceId deviceId = new DeviceId(UUID.randomUUID());
TbMsg tbMsg = TbMsg.newMsg("main", TbMsgType.REST_API_REQUEST, deviceId, null, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> tbREQueueProducer = mock(TbQueueProducer.class);
TopicPartitionInfo tpi = new TopicPartitionInfo(ServiceType.TB_RULE_ENGINE.name().toLowerCase() + ".notifications." + CORE, tenantId, null, false);
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer);
when(partitionService.resolve(any(), any(), any(), any())).thenReturn(tpi);
clusterService.pushMsgToRuleEngine(tenantId, tenantId, tbMsg, true, null);
verify(partitionService).resolve(ServiceType.TB_RULE_ENGINE, "main", tenantId, tenantId);
verify(producerProvider).getRuleEngineMsgProducer();
ArgumentCaptor<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> protoQueueMsgArgumentCaptor = ArgumentCaptor.forClass(TbProtoQueueMsg.class);
verify(tbREQueueProducer).send(eq(tpi), protoQueueMsgArgumentCaptor.capture(), isNull());
TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg> protoQueueMsgArgumentCaptorValue = protoQueueMsgArgumentCaptor.getValue();
assertThat(protoQueueMsgArgumentCaptorValue.getValue()).isEqualTo(msg);
}
protected Queue createTestQueue() {
TenantId tenantId = TenantId.SYS_TENANT_ID;
Queue queue = new Queue(new QueueId(UUID.randomUUID()));

16
application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java

@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.service.ruleengine.RuleEngineCallService;
import org.thingsboard.server.service.state.DeviceStateService;
import java.util.UUID;
@ -48,6 +49,8 @@ public class DefaultTbCoreConsumerServiceTest {
private DeviceStateService stateServiceMock;
@Mock
private TbCoreConsumerStats statsMock;
@Mock
private RuleEngineCallService ruleEngineCallServiceMock;
@Mock
private TbCallback tbCallbackMock;
@ -529,4 +532,17 @@ public class DefaultTbCoreConsumerServiceTest {
then(statsMock).should(never()).log(inactivityMsg);
}
@Test
public void givenRestApiCallResponseMsgProto_whenForwardToRuleEngineCallService_thenCallOnQueueMsg() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "ruleEngineCallService", ruleEngineCallServiceMock);
var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.getDefaultInstance();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToRuleEngineCallService(restApiCallResponseMsgProto, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToRuleEngineCallService(restApiCallResponseMsgProto, tbCallbackMock);
// THEN
then(ruleEngineCallServiceMock).should().onQueueMsg(restApiCallResponseMsgProto, tbCallbackMock);
}
}

61
application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java

@ -0,0 +1,61 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.rpc;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.UUID;
import static org.mockito.BDDMockito.then;
@ExtendWith(MockitoExtension.class)
class DefaultTbRuleEngineRpcServiceTest {
@Mock
private TbClusterService tbClusterServiceMock;
@InjectMocks
private DefaultTbRuleEngineRpcService tbRuleEngineRpcService;
@Test
public void givenTbMsg_whenSendRestApiCallReply_thenPushNotificationToCore() {
// GIVEN
String serviceId = "tb-core-0";
UUID requestId = UUID.fromString("f64a20df-eb1e-46a3-ba6f-0b3ae053ee0a");
DeviceId deviceId = new DeviceId(UUID.fromString("1d9f771a-7cdc-4ac7-838c-ba193d05a012"));
TbMsg msg = TbMsg.newMsg(null, TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.newBuilder()
.setRequestIdMSB(requestId.getMostSignificantBits())
.setRequestIdLSB(requestId.getLeastSignificantBits())
.setResponse(TbMsg.toByteString(msg))
.build();
// WHEN
tbRuleEngineRpcService.sendRestApiCallReply(serviceId, requestId, msg);
// THEN
then(tbClusterServiceMock).should().pushNotificationToCore(serviceId, restApiCallResponseMsgProto, null);
}
}

166
application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java

@ -0,0 +1,166 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ruleengine;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
public class DefaultRuleEngineCallServiceTest {
private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("d7210c7f-a152-4e91-8186-19ae85499a6b"));
private final ConcurrentMap<UUID, Consumer<TbMsg>> requests = new ConcurrentHashMap<>();
@Mock
private TbClusterService tbClusterServiceMock;
private DefaultRuleEngineCallService ruleEngineCallService;
private ScheduledExecutorService executor;
@BeforeEach
void setUp() {
executor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("rpc-callback"));
ruleEngineCallService = new DefaultRuleEngineCallService(tbClusterServiceMock);
ReflectionTestUtils.setField(ruleEngineCallService, "executor", executor);
ReflectionTestUtils.setField(ruleEngineCallService, "requests", requests);
}
@AfterEach
void tearDown() {
requests.clear();
if (executor != null) {
executor.shutdown();
try {
if (!executor.awaitTermination(10L, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
@Test
void givenRequest_whenProcessRestApiCallToRuleEngine_thenPushMsgToRuleEngine() {
long timeout = 100L;
long expTime = System.currentTimeMillis() + timeout;
HashMap<String, String> metaData = new HashMap<>();
UUID requestId = UUID.randomUUID();
metaData.put("serviceId", "core");
metaData.put("requestUUID", requestId.toString());
metaData.put("expirationTime", Long.toString(expTime));
TbMsg msg = TbMsg.newMsg("main", TbMsgType.REST_API_REQUEST, TENANT_ID, new TbMsgMetaData(metaData), "{\"key\":\"value\"}");
Consumer<TbMsg> anyConsumer = TbMsg::getData;
ruleEngineCallService.processRestApiCallToRuleEngine(TENANT_ID, requestId, msg, true, anyConsumer);
assertThat(requests.size()).isEqualTo(1);
assertThat(requests.get(requestId)).isEqualTo(anyConsumer);
verify(tbClusterServiceMock).pushMsgToRuleEngine(TENANT_ID, TENANT_ID, msg, true, null);
}
@Test
void givenSmallTimeout_whenProcessRestApiCallToRuleEngine_thenDoesNotReturnResponse() {
long timeout = 1L;
long expTime = System.currentTimeMillis() + timeout;
HashMap<String, String> metaData = new HashMap<>();
UUID requestId = UUID.randomUUID();
metaData.put("serviceId", "core");
metaData.put("requestUUID", requestId.toString());
metaData.put("expirationTime", Long.toString(expTime));
TbMsg msg = TbMsg.newMsg("main", TbMsgType.REST_API_REQUEST, TENANT_ID, new TbMsgMetaData(metaData), "{\"key\":\"value\"}");
Consumer<TbMsg> anyConsumer = TbMsg::getData;
doAnswer(invocation -> {
//check the presence of request in the map after pushMsgToRuleEngine()
assertThat(requests.size()).isEqualTo(1);
assertThat(requests.get(requestId)).isEqualTo(anyConsumer);
return null;
}).when(tbClusterServiceMock).pushMsgToRuleEngine(any(), any(), any(), anyBoolean(), any());
ruleEngineCallService.processRestApiCallToRuleEngine(TENANT_ID, requestId, msg, true, anyConsumer);
verify(tbClusterServiceMock).pushMsgToRuleEngine(TENANT_ID, TENANT_ID, msg, true, null);
//check map is empty after scheduleTimeout()
Awaitility.await("Await until request was deleted from map due to timeout")
.pollDelay(25, TimeUnit.MILLISECONDS)
.atMost(10, TimeUnit.SECONDS)
.until(requests::isEmpty);
}
@Test
void givenResponse_whenOnQueue_thenAcceptTbMsgResponse() {
long timeout = 10000L;
long expTime = System.currentTimeMillis() + timeout;
HashMap<String, String> metaData = new HashMap<>();
UUID requestId = UUID.randomUUID();
metaData.put("serviceId", "core");
metaData.put("requestUUID", requestId.toString());
metaData.put("expirationTime", Long.toString(expTime));
TbMsg msg = TbMsg.newMsg("main", TbMsgType.REST_API_REQUEST, TENANT_ID, new TbMsgMetaData(metaData), "{\"key\":\"value\"}");
Consumer<TbMsg> anyConsumer = TbMsg::getData;
doAnswer(invocation -> {
//check the presence of request in the map after pushMsgToRuleEngine()
assertThat(requests.size()).isEqualTo(1);
assertThat(requests.get(requestId)).isEqualTo(anyConsumer);
ruleEngineCallService.onQueueMsg(getResponse(requestId, msg), TbCallback.EMPTY);
//check map is empty after onQueueMsg()
assertThat(requests.size()).isEqualTo(0);
return null;
}).when(tbClusterServiceMock).pushMsgToRuleEngine(any(), any(), any(), anyBoolean(), any());
ruleEngineCallService.processRestApiCallToRuleEngine(TENANT_ID, requestId, msg, true, anyConsumer);
verify(tbClusterServiceMock).pushMsgToRuleEngine(TENANT_ID, TENANT_ID, msg, true, null);
}
private TransportProtos.RestApiCallResponseMsgProto getResponse(UUID requestId, TbMsg msg) {
return TransportProtos.RestApiCallResponseMsgProto.newBuilder()
.setResponse(TbMsg.toByteString(msg))
.setRequestIdMSB(requestId.getMostSignificantBits())
.setRequestIdLSB(requestId.getLeastSignificantBits())
.build();
}
}

6
common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java

@ -18,7 +18,6 @@ package org.thingsboard.server.cluster;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.TbResourceInfo;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
@ -39,6 +38,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.RestApiCallResponseMsgProto;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueClusterService;
@ -58,10 +58,14 @@ public interface TbClusterService extends TbQueueClusterService {
void pushNotificationToCore(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback);
void pushNotificationToCore(String targetServiceId, RestApiCallResponseMsgProto msg, TbQueueCallback callback);
void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback);
void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, TbQueueCallback callback);
void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, boolean useQueueFromTbMsg, TbQueueCallback callback);
void pushNotificationToRuleEngine(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback);
void pushNotificationToTransport(String targetServiceId, ToTransportMsg response, TbQueueCallback callback);

1
common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java

@ -40,6 +40,7 @@ public enum ActionType {
RELATION_ADD_OR_UPDATE(false, TbMsgType.RELATION_ADD_OR_UPDATE),
RELATION_DELETED(false, TbMsgType.RELATION_DELETED),
RELATIONS_DELETED(false, TbMsgType.RELATIONS_DELETED),
REST_API_RULE_ENGINE_CALL(false, null), // log call to rule engine from REST API
ALARM_ACK(false, TbMsgType.ALARM_ACK),
ALARM_CLEAR(false, TbMsgType.ALARM_CLEAR),
ALARM_DELETE(false, TbMsgType.ALARM_DELETE),

1
common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java

@ -67,6 +67,7 @@ public enum TbMsgType {
PROVISION_SUCCESS,
PROVISION_FAILURE,
SEND_EMAIL,
REST_API_REQUEST("REST API request"),
// tellSelfOnly types
GENERATOR_NODE_SELF_MSG(null, true),

7
common/proto/src/main/proto/queue.proto

@ -98,6 +98,12 @@ message SessionInfoProto {
int64 customerIdLSB = 15;
}
message RestApiCallResponseMsgProto {
int64 requestIdMSB = 1;
int64 requestIdLSB = 2;
bytes response = 5;
}
enum SessionEvent {
OPEN = 0;
CLOSED = 1;
@ -1465,6 +1471,7 @@ message ToCoreNotificationMsg {
ToEdgeSyncRequestMsgProto toEdgeSyncRequest = 11;
FromEdgeSyncResponseMsgProto fromEdgeSyncResponse = 12;
ResourceCacheInvalidateMsg resourceCacheInvalidateMsg = 13;
RestApiCallResponseMsgProto restApiCallResponseMsg = 50;
}
/* Messages that are handled by ThingsBoard RuleEngine Service */

45
rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java

@ -3840,6 +3840,51 @@ public class RestClient implements Closeable {
}
}
public JsonNode handleRuleEngineRequest(JsonNode requestBody) {
return restTemplate.exchange(
baseURL + "/api/rule-engine",
HttpMethod.POST,
new HttpEntity<>(requestBody),
new ParameterizedTypeReference<JsonNode>() {
}).getBody();
}
public JsonNode handleRuleEngineRequest(EntityId entityId, JsonNode requestBody) {
return restTemplate.exchange(
baseURL + "/api/rule-engine/{entityType}/{entityId}",
HttpMethod.POST,
new HttpEntity<>(requestBody),
new ParameterizedTypeReference<JsonNode>() {
},
entityId.getEntityType(),
entityId.getId()).getBody();
}
public JsonNode handleRuleEngineRequest(EntityId entityId, int timeout, JsonNode requestBody) {
return restTemplate.exchange(
baseURL + "/api/rule-engine/{entityType}/{entityId}/{timeout}",
HttpMethod.POST,
new HttpEntity<>(requestBody),
new ParameterizedTypeReference<JsonNode>() {
},
entityId.getEntityType(),
entityId.getId(),
timeout).getBody();
}
public JsonNode handleRuleEngineRequest(EntityId entityId, String queueName, int timeout, JsonNode requestBody) {
return restTemplate.exchange(
baseURL + "/api/rule-engine/{entityType}/{entityId}/{queueName}/{timeout}",
HttpMethod.POST,
new HttpEntity<>(requestBody),
new ParameterizedTypeReference<JsonNode>() {
},
entityId.getEntityType(),
entityId.getId(),
queueName,
timeout).getBody();
}
private String getTimeUrlParams(TimePageLink pageLink) {
String urlParams = getUrlParams(pageLink);
if (pageLink.getStartTime() != null) {

3
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java

@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.UUID;
import java.util.function.Consumer;
@ -31,5 +32,7 @@ public interface RuleEngineRpcService {
void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest request, Consumer<RuleEngineDeviceRpcResponse> consumer);
void sendRestApiCallReply(String serviceId, UUID requestId, TbMsg msg);
Rpc findRpcById(TenantId tenantId, RpcId id);
}

66
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNode.java

@ -0,0 +1,66 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rest;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.UUID;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "rest call reply",
configClazz = TbSendRestApiCallReplyNodeConfiguration.class,
nodeDescription = "Sends reply to REST API call to rule engine",
nodeDetails = "Expects messages with any message type. Forwards incoming message as a reply to REST API call sent to rule engine.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeSendRestApiCallReplyConfig",
icon = "call_merge"
)
public class TbSendRestApiCallReplyNode implements TbNode {
private TbSendRestApiCallReplyNodeConfiguration config;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbSendRestApiCallReplyNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
String serviceIdStr = msg.getMetaData().getValue(config.getServiceIdMetaDataAttribute());
String requestIdStr = msg.getMetaData().getValue(config.getRequestIdMetaDataAttribute());
if (StringUtils.isEmpty(requestIdStr)) {
ctx.tellFailure(msg, new RuntimeException("Request id is not present in the metadata!"));
} else if (StringUtils.isEmpty(serviceIdStr)) {
ctx.tellFailure(msg, new RuntimeException("Service id is not present in the metadata!"));
} else if (StringUtils.isEmpty(msg.getData())) {
ctx.tellFailure(msg, new RuntimeException("Request body is empty!"));
} else {
ctx.getRpcService().sendRestApiCallReply(serviceIdStr, UUID.fromString(requestIdStr), msg);
ctx.tellSuccess(msg);
}
}
}

45
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeConfiguration.java

@ -0,0 +1,45 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rest;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import org.thingsboard.server.common.data.StringUtils;
@Data
public class TbSendRestApiCallReplyNodeConfiguration implements NodeConfiguration<TbSendRestApiCallReplyNodeConfiguration> {
public static final String SERVICE_ID = "serviceId";
public static final String REQUEST_UUID = "requestUUID";
private String serviceIdMetaDataAttribute;
private String requestIdMetaDataAttribute;
@Override
public TbSendRestApiCallReplyNodeConfiguration defaultConfiguration() {
TbSendRestApiCallReplyNodeConfiguration configuration = new TbSendRestApiCallReplyNodeConfiguration();
configuration.setRequestIdMetaDataAttribute(REQUEST_UUID);
configuration.setServiceIdMetaDataAttribute(SERVICE_ID);
return configuration;
}
public String getServiceIdMetaDataAttribute() {
return !StringUtils.isEmpty(serviceIdMetaDataAttribute) ? serviceIdMetaDataAttribute : SERVICE_ID;
}
public String getRequestIdMetaDataAttribute() {
return !StringUtils.isEmpty(requestIdMetaDataAttribute) ? requestIdMetaDataAttribute : REQUEST_UUID;
}
}

133
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeTest.java

@ -0,0 +1,133 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class TbSendRestApiCallReplyNodeTest {
private static final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("212445ad-9852-4bfd-819d-6b01ab6ee6b6"));
private TbSendRestApiCallReplyNode node;
private TbSendRestApiCallReplyNodeConfiguration config;
@Mock
private TbContext ctxMock;
@Mock
private RuleEngineRpcService rpcServiceMock;
@BeforeEach
public void setUp() throws TbNodeException {
node = new TbSendRestApiCallReplyNode();
config = new TbSendRestApiCallReplyNodeConfiguration().defaultConfiguration();
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctxMock, configuration);
}
@Test
public void givenDefaultConfig_whenInit_thenDoesNotThrowException() {
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
assertThatNoException().isThrownBy(() -> node.init(ctxMock, configuration));
}
@ParameterizedTest
@MethodSource
public void givenValidRestApiRequest_whenOnMsg_thenTellSuccess(String requestIdAttribute, String serviceIdAttribute) throws TbNodeException {
config.setRequestIdMetaDataAttribute(requestIdAttribute);
config.setServiceIdMetaDataAttribute(serviceIdAttribute);
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctxMock, configuration);
when(ctxMock.getRpcService()).thenReturn(rpcServiceMock);
String requestUUIDStr = "80b7883b-7ec6-4872-9dd3-b2afd5660fa6";
String serviceIdStr = "tb-core-0";
String data = """
{
"temperature": 23,
}
""";
Map<String, String> metadata = Map.of(
requestIdAttribute, requestUUIDStr,
serviceIdAttribute, serviceIdStr);
TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, DEVICE_ID, new TbMsgMetaData(metadata), data);
node.onMsg(ctxMock, msg);
UUID requestUUID = UUID.fromString(requestUUIDStr);
verify(rpcServiceMock).sendRestApiCallReply(serviceIdStr, requestUUID, msg);
verify(ctxMock).tellSuccess(msg);
}
private static Stream<Arguments> givenValidRestApiRequest_whenOnMsg_thenTellSuccess() {
return Stream.of(
Arguments.of("requestId", "service"),
Arguments.of("requestUUID", "serviceId"),
Arguments.of("some_custom_request_id_field", "some_custom_service_id_field")
);
}
@ParameterizedTest
@MethodSource
public void givenInvalidRequest_whenOnMsg_thenTellFailure(TbMsgMetaData metaData, String data, String errorMsg) {
TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, DEVICE_ID, metaData, data);
node.onMsg(ctxMock, msg);
ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
verify(ctxMock).tellFailure(eq(msg), captor.capture());
Throwable throwable = captor.getValue();
assertThat(throwable).isInstanceOf(RuntimeException.class).hasMessage(errorMsg);
}
private static Stream<Arguments> givenInvalidRequest_whenOnMsg_thenTellFailure() {
return Stream.of(
Arguments.of(TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING, "Request id is not present in the metadata!"),
Arguments.of(new TbMsgMetaData(Map.of("requestUUID", "e1dd3985-efad-45a0-b0d2-0ff5dff2ccac")),
TbMsg.EMPTY_STRING, "Service id is not present in the metadata!"),
Arguments.of(new TbMsgMetaData(Map.of("serviceId", "tb-core-0")),
TbMsg.EMPTY_STRING, "Request id is not present in the metadata!"),
Arguments.of(new TbMsgMetaData(Map.of("requestUUID", "e1dd3985-efad-45a0-b0d2-0ff5dff2ccac", "serviceId", "tb-core-0")),
TbMsg.EMPTY_STRING, "Request body is empty!")
);
}
}
Loading…
Cancel
Save