From 74ac5a949f2ff66e4e16213dc73d635b0ffdd2bf Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 14 May 2024 10:05:41 +0300 Subject: [PATCH] added rule engine controller and send rest api call reply node --- .../server/controller/BaseController.java | 4 + .../controller/ControllerConstants.java | 2 + .../controller/RuleEngineController.java | 237 ++++++++++++++++ .../server/controller/TbUrlConstants.java | 1 + .../queue/DefaultTbClusterService.java | 20 +- .../queue/DefaultTbCoreConsumerService.java | 13 +- .../rpc/DefaultTbRuleEngineRpcService.java | 11 + .../DefaultRuleEngineCallService.java | 100 +++++++ .../ruleengine/RuleEngineCallService.java | 31 +++ .../controller/AbstractNotifyEntityTest.java | 21 +- .../server/controller/AbstractWebTest.java | 10 + .../controller/RuleEngineControllerTest.java | 252 ++++++++++++++++++ .../queue/DefaultTbClusterServiceTest.java | 51 ++++ .../DefaultTbCoreConsumerServiceTest.java | 16 ++ .../DefaultTbRuleEngineRpcServiceTest.java | 61 +++++ .../DefaultRuleEngineCallServiceTest.java | 166 ++++++++++++ .../server/cluster/TbClusterService.java | 6 +- .../server/common/data/audit/ActionType.java | 1 + .../server/common/data/msg/TbMsgType.java | 1 + common/proto/src/main/proto/queue.proto | 7 + .../thingsboard/rest/client/RestClient.java | 45 ++++ .../rule/engine/api/RuleEngineRpcService.java | 3 + .../rest/TbSendRestApiCallReplyNode.java | 66 +++++ ...SendRestApiCallReplyNodeConfiguration.java | 45 ++++ .../rest/TbSendRestApiCallReplyNodeTest.java | 133 +++++++++ 25 files changed, 1292 insertions(+), 11 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/controller/RuleEngineController.java create mode 100644 application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java create mode 100644 application/src/main/java/org/thingsboard/server/service/ruleengine/RuleEngineCallService.java create mode 100644 application/src/test/java/org/thingsboard/server/controller/RuleEngineControllerTest.java create mode 100644 application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java create mode 100644 application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNode.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeConfiguration.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeTest.java diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 0942329f40..0ae129b119 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -140,6 +140,7 @@ import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.exception.ThingsboardErrorResponseHandler; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.action.EntityActionService; @@ -313,6 +314,9 @@ public abstract class BaseController { @Autowired protected ExportableEntitiesService entitiesService; + @Autowired + protected TbServiceInfoProvider serviceInfoProvider; + @Value("${server.log_controller_error_stack_trace}") @Getter private boolean logControllerErrorStackTrace; diff --git a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java index d9232cda78..d41ffc8fc6 100644 --- a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java +++ b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java @@ -1710,4 +1710,6 @@ public class ControllerConstants { MARKDOWN_CODE_BLOCK_START + "[{\"ts\":1634712287000,\"values\":{\"temperature\":26, \"humidity\":87}}, {\"ts\":1634712588000,\"values\":{\"temperature\":25, \"humidity\":88}}]" + MARKDOWN_CODE_BLOCK_END ; + + protected static final String SECURITY_WRITE_CHECK = " Security check is performed to verify that the user has 'WRITE' permission for the entity (entities)."; } diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleEngineController.java b/application/src/main/java/org/thingsboard/server/controller/RuleEngineController.java new file mode 100644 index 0000000000..16458e42db --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/controller/RuleEngineController.java @@ -0,0 +1,237 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.controller; + +import com.google.common.util.concurrent.FutureCallback; +import io.swagger.v3.oas.annotations.Parameter; +import jakarta.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.audit.ActionType; +import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; +import org.thingsboard.server.common.data.exception.ThingsboardException; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.config.annotations.ApiOperation; +import org.thingsboard.server.exception.ToErrorResponseEntity; +import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.ruleengine.RuleEngineCallService; +import org.thingsboard.server.service.security.AccessValidator; +import org.thingsboard.server.service.security.model.SecurityUser; +import org.thingsboard.server.service.security.permission.Operation; + +import java.util.HashMap; +import java.util.UUID; +import java.util.concurrent.TimeoutException; + +import static org.thingsboard.server.controller.ControllerConstants.ENTITY_ID_PARAM_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.ENTITY_TYPE_PARAM_DESCRIPTION; + +@RestController +@TbCoreComponent +@RequestMapping(TbUrlConstants.RULE_ENGINE_URL_PREFIX) +@Slf4j +public class RuleEngineController extends BaseController { + public static final int DEFAULT_TIMEOUT = 10000; + private static final String MSG_DESCRIPTION_PREFIX = "Creates the Message with type 'REST_API_REQUEST' and payload taken from the request body. "; + private static final String MSG_DESCRIPTION = "This method allows you to extend the regular platform API with the power of Rule Engine. You may use default and custom rule nodes to handle the message. " + + "The generated message contains two important metadata fields:\n\n" + + " * **'serviceId'** to identify the platform server that received the request;\n" + + " * **'requestUUID'** to identify the request and route possible response from the Rule Engine;\n\n" + + "Use **'rest call reply'** rule node to push the reply from rule engine back as a REST API call response. "; + + @Autowired + private RuleEngineCallService ruleEngineCallService; + @Autowired + private AccessValidator accessValidator; + + @ApiOperation(value = "Push user message to the rule engine (handleRuleEngineRequest)", + notes = MSG_DESCRIPTION_PREFIX + + "Uses current User Id ( the one which credentials is used to perform the request) as the Rule Engine message originator. " + + MSG_DESCRIPTION + + "The default timeout of the request processing is 10 seconds." + + "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK) + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/", method = RequestMethod.POST) + @ResponseBody + public DeferredResult handleRuleEngineRequest( + @Parameter(description = "A JSON value representing the message.", required = true) + @RequestBody String requestBody) throws ThingsboardException { + return handleRuleEngineRequest(null, null, null, DEFAULT_TIMEOUT, requestBody); + } + + @ApiOperation(value = "Push entity message to the rule engine (handleRuleEngineRequest)", + notes = MSG_DESCRIPTION_PREFIX + + "Uses specified Entity Id as the Rule Engine message originator. " + + MSG_DESCRIPTION + + "The default timeout of the request processing is 10 seconds." + + "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK) + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/{entityType}/{entityId}", method = RequestMethod.POST) + @ResponseBody + public DeferredResult handleRuleEngineRequest( + @Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) + @PathVariable("entityType") String entityType, + @Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true) + @PathVariable("entityId") String entityIdStr, + @Parameter(description = "A JSON value representing the message.", required = true) + @RequestBody String requestBody) throws ThingsboardException { + return handleRuleEngineRequest(entityType, entityIdStr, null, DEFAULT_TIMEOUT, requestBody); + } + + @ApiOperation(value = "Push entity message with timeout to the rule engine (handleRuleEngineRequest)", + notes = MSG_DESCRIPTION_PREFIX + + "Uses specified Entity Id as the Rule Engine message originator. " + + MSG_DESCRIPTION + + "The platform expects the timeout value in milliseconds." + + "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK) + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/{entityType}/{entityId}/{timeout}", method = RequestMethod.POST) + @ResponseBody + public DeferredResult handleRuleEngineRequest( + @Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) + @PathVariable("entityType") String entityType, + @Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true) + @PathVariable("entityId") String entityIdStr, + @Parameter(description = "Timeout to process the request in milliseconds", required = true) + @PathVariable("timeout") int timeout, + @Parameter(description = "A JSON value representing the message.", required = true) + @RequestBody String requestBody) throws ThingsboardException { + return handleRuleEngineRequest(entityType, entityIdStr, null, timeout, requestBody); + } + + @ApiOperation(value = "Push entity message with timeout and specified queue to the rule engine (handleRuleEngineRequest)", + notes = MSG_DESCRIPTION_PREFIX + + "Uses specified Entity Id as the Rule Engine message originator. " + + MSG_DESCRIPTION + + "If request sent for Device/Device Profile or Asset/Asset Profile entity, specified queue will be used instead of the queue selected in the device or asset profile. " + + "The platform expects the timeout value in milliseconds." + + "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK) + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/{entityType}/{entityId}/{queueName}/{timeout}", method = RequestMethod.POST) + @ResponseBody + public DeferredResult handleRuleEngineRequest( + @Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) + @PathVariable("entityType") String entityType, + @Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true) + @PathVariable("entityId") String entityIdStr, + @Parameter(description = "Queue name to process the request in the rule engine", required = true) + @PathVariable("queueName") String queueName, + @Parameter(description = "Timeout to process the request in milliseconds", required = true) + @PathVariable("timeout") int timeout, + @Parameter(description = "A JSON value representing the message.", required = true) + @RequestBody String requestBody) throws ThingsboardException { + try { + SecurityUser currentUser = getCurrentUser(); + EntityId entityId; + if (StringUtils.isEmpty(entityType) || StringUtils.isEmpty(entityIdStr)) { + entityId = currentUser.getId(); + } else { + entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr); + } + //Check that this is a valid JSON + JacksonUtil.toJsonNode(requestBody); + final DeferredResult response = new DeferredResult<>(); + accessValidator.validate(currentUser, Operation.WRITE, entityId, new HttpValidationCallback(response, new FutureCallback>() { + @Override + public void onSuccess(@Nullable DeferredResult result) { + long expTime = System.currentTimeMillis() + timeout; + HashMap metaData = new HashMap<>(); + UUID requestId = UUID.randomUUID(); + metaData.put("serviceId", serviceInfoProvider.getServiceId()); + metaData.put("requestUUID", requestId.toString()); + metaData.put("expirationTime", Long.toString(expTime)); + TbMsg msg = TbMsg.newMsg(queueName, TbMsgType.REST_API_REQUEST, entityId, currentUser.getCustomerId(), new TbMsgMetaData(metaData), requestBody); + ruleEngineCallService.processRestApiCallToRuleEngine(currentUser.getTenantId(), requestId, msg, queueName != null, + reply -> reply(new LocalRequestMetaData(msg, currentUser, result), reply)); + } + + @Override + public void onFailure(Throwable e) { + ResponseEntity entity; + if (e instanceof ToErrorResponseEntity) { + entity = ((ToErrorResponseEntity) e).toErrorResponseEntity(); + } else { + entity = new ResponseEntity(HttpStatus.UNAUTHORIZED); + } + logRuleEngineCall(currentUser, entityId, requestBody, null, e); + response.setResult(entity); + } + })); + return response; + } catch (IllegalArgumentException iae) { + throw new ThingsboardException("Invalid request body", iae, ThingsboardErrorCode.BAD_REQUEST_PARAMS); + } + } + + private void reply(LocalRequestMetaData rpcRequest, TbMsg response) { + DeferredResult responseWriter = rpcRequest.responseWriter; + if (response == null) { + logRuleEngineCall(rpcRequest, null, new TimeoutException("Processing timeout detected!")); + responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT)); + } else { + String responseData = response.getData(); + if (!StringUtils.isEmpty(responseData)) { + try { + logRuleEngineCall(rpcRequest, response, null); + responseWriter.setResult(new ResponseEntity<>(JacksonUtil.toJsonNode(responseData), HttpStatus.OK)); + } catch (IllegalArgumentException e) { + log.debug("Failed to decode device response: {}", responseData, e); + logRuleEngineCall(rpcRequest, response, e); + responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE)); + } + } else { + logRuleEngineCall(rpcRequest, response, null); + responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK)); + } + } + } + + private void logRuleEngineCall(LocalRequestMetaData rpcRequest, TbMsg response, Throwable e) { + logRuleEngineCall(rpcRequest.user, rpcRequest.request.getOriginator(), rpcRequest.request.getData(), response, e); + } + + private void logRuleEngineCall(SecurityUser user, EntityId entityId, String request, TbMsg response, Throwable e) { + auditLogService.logEntityAction( + user.getTenantId(), + user.getCustomerId(), + user.getId(), + user.getName(), + entityId, + null, + ActionType.REST_API_RULE_ENGINE_CALL, + BaseController.toException(e), + request, + response != null ? response.getData() : ""); + } + + private record LocalRequestMetaData(TbMsg request, SecurityUser user, DeferredResult responseWriter) {} +} diff --git a/application/src/main/java/org/thingsboard/server/controller/TbUrlConstants.java b/application/src/main/java/org/thingsboard/server/controller/TbUrlConstants.java index 99d94522ac..74575fd9d5 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TbUrlConstants.java +++ b/application/src/main/java/org/thingsboard/server/controller/TbUrlConstants.java @@ -22,4 +22,5 @@ public class TbUrlConstants { public static final String TELEMETRY_URL_PREFIX = "/api/plugins/telemetry"; public static final String RPC_V1_URL_PREFIX = "/api/plugins/rpc"; public static final String RPC_V2_URL_PREFIX = "/api/rpc"; + public static final String RULE_ENGINE_URL_PREFIX = "/api/rule-engine/"; } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 7af2579b89..0c62620fab 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.HasRuleEngineProfile; -import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; @@ -183,6 +182,14 @@ public class DefaultTbClusterService implements TbClusterService { toCoreNfs.incrementAndGet(); } + @Override + public void pushNotificationToCore(String targetServiceId, TransportProtos.RestApiCallResponseMsgProto responseMsgProto, TbQueueCallback callback) { + TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_CORE, targetServiceId); + ToCoreNotificationMsg msg = ToCoreNotificationMsg.newBuilder().setRestApiCallResponseMsg(responseMsgProto).build(); + producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback); + toCoreNfs.incrementAndGet(); + } + @Override public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) { log.trace("PUSHING msg: {} to:{}", msg, tpi); @@ -192,6 +199,11 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) { + pushMsgToRuleEngine(tenantId, entityId, tbMsg, false, callback); + } + + @Override + public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, boolean useQueueFromTbMsg, TbQueueCallback callback) { if (tenantId == null || tenantId.isNullUid()) { if (entityId.getEntityType().equals(EntityType.TENANT)) { tenantId = TenantId.fromUUID(entityId.getId()); @@ -201,7 +213,7 @@ public class DefaultTbClusterService implements TbClusterService { } } else { HasRuleEngineProfile ruleEngineProfile = getRuleEngineProfileForEntityOrElseNull(tenantId, entityId); - tbMsg = transformMsg(tbMsg, ruleEngineProfile); + tbMsg = transformMsg(tbMsg, ruleEngineProfile, useQueueFromTbMsg); } TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, entityId); log.trace("PUSHING msg: {} to:{}", tbMsg, tpi); @@ -226,10 +238,10 @@ public class DefaultTbClusterService implements TbClusterService { return null; } - private TbMsg transformMsg(TbMsg tbMsg, HasRuleEngineProfile ruleEngineProfile) { + private TbMsg transformMsg(TbMsg tbMsg, HasRuleEngineProfile ruleEngineProfile, boolean useQueueFromTbMsg) { if (ruleEngineProfile != null) { RuleChainId targetRuleChainId = ruleEngineProfile.getDefaultRuleChainId(); - String targetQueueName = ruleEngineProfile.getDefaultQueueName(); + String targetQueueName = useQueueFromTbMsg ? tbMsg.getQueueName() : ruleEngineProfile.getDefaultQueueName(); boolean isRuleChainTransform = targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId()); boolean isQueueTransform = targetQueueName != null && !targetQueueName.equals(tbMsg.getQueueName()); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index fbef0cfd29..1d016ee33d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -96,6 +96,7 @@ import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.resource.TbImageService; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; +import org.thingsboard.server.service.ruleengine.RuleEngineCallService; import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService; import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.subscription.SubscriptionManagerService; @@ -149,6 +150,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService, CoreQueueConfig> mainConsumer; @@ -177,7 +179,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> requests = new ConcurrentHashMap<>(); + + public DefaultRuleEngineCallService(TbClusterService clusterService) { + this.clusterService = clusterService; + } + + @PostConstruct + public void initExecutor() { + executor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("re-rest-callback")); + } + + @PreDestroy + public void shutdownExecutor() { + if (executor != null) { + executor.shutdownNow(); + } + } + + @Override + public void processRestApiCallToRuleEngine(TenantId tenantId, UUID requestId, TbMsg request, boolean useQueueFromTbMsg, Consumer responseConsumer) { + log.trace("[{}] Processing REST API call to rule engine: [{}] for entity: [{}]", tenantId, requestId, request.getOriginator()); + requests.put(requestId, responseConsumer); + sendRequestToRuleEngine(tenantId, request, useQueueFromTbMsg); + scheduleTimeout(request, requestId, requests); + } + + @Override + public void onQueueMsg(TransportProtos.RestApiCallResponseMsgProto restApiCallResponseMsg, TbCallback callback) { + UUID requestId = new UUID(restApiCallResponseMsg.getRequestIdMSB(), restApiCallResponseMsg.getRequestIdLSB()); + Consumer consumer = requests.remove(requestId); + if (consumer != null) { + consumer.accept(TbMsg.fromBytes(null, restApiCallResponseMsg.getResponse().toByteArray(), TbMsgCallback.EMPTY)); + } else { + log.trace("[{}] Unknown or stale rest api call response received", requestId); + } + callback.onSuccess(); + } + + private void sendRequestToRuleEngine(TenantId tenantId, TbMsg msg, boolean useQueueFromTbMsg) { + clusterService.pushMsgToRuleEngine(tenantId, msg.getOriginator(), msg, useQueueFromTbMsg, null); + } + + private void scheduleTimeout(TbMsg request, UUID requestId, ConcurrentMap> requestsMap) { + long expirationTime = Long.parseLong(request.getMetaData().getValue("expirationTime")); + long timeout = Math.max(0, expirationTime - System.currentTimeMillis()); + log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId); + executor.schedule(() -> { + Consumer consumer = requestsMap.remove(requestId); + if (consumer != null) { + log.trace("[{}] request timeout detected: [{}]", this.hashCode(), requestId); + consumer.accept(null); + } + }, timeout, TimeUnit.MILLISECONDS); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/ruleengine/RuleEngineCallService.java b/application/src/main/java/org/thingsboard/server/service/ruleengine/RuleEngineCallService.java new file mode 100644 index 0000000000..cdaa90398a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ruleengine/RuleEngineCallService.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ruleengine; + +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.UUID; +import java.util.function.Consumer; + +public interface RuleEngineCallService { + + void processRestApiCallToRuleEngine(TenantId tenantId, UUID requestId, TbMsg request, boolean useQueueFromTbMsg, Consumer responseConsumer); + + void onQueueMsg(TransportProtos.RestApiCallResponseMsgProto restApiCallResponseMsg, TbCallback callback); +} diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java index 35c2ea65fa..9bb49c1d62 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java @@ -386,6 +386,16 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { actionType, cntTime, extractMatcherAdditionalInfo(additionalInfo)); } + protected void testLogEntityActionError(HasName entity, EntityId originatorId, TenantId tenantId, + CustomerId customerId, UserId userId, String userName, + ActionType actionType, Exception exp, Object... additionalInfo) { + ArgumentMatcher matcherError = argument -> argument.getMessage().contains(exp.getMessage()) + & argument.getClass().equals(exp.getClass()); + ArgumentMatcher matcherEntityEquals = entity == null ? Objects::isNull : argument -> argument.toString().equals(entity.toString()); + testLogEntityActionErrorAdditionalInfo(matcherEntityEquals, originatorId, tenantId, customerId, userId, userName, + actionType, 1, matcherError, extractMatcherAdditionalInfo(additionalInfo)); + } + private void testLogEntityActionAdditionalInfo(ArgumentMatcher matcherEntity, ArgumentMatcher matcherOriginatorId, TenantId tenantId, ArgumentMatcher matcherCustomerId, ArgumentMatcher matcherUserId, String userName, ActionType actionType, @@ -529,8 +539,9 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { Mockito.argThat(matcherEntity), Mockito.eq(actionType), Mockito.argThat(matcherError), - Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(0))), - Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(1)))); + Mockito.argThat(matcherAdditionalInfos.get(0)), + Mockito.argThat(matcherAdditionalInfos.get(1))); + break; case 3: Mockito.verify(auditLogService, times(cntTime)) .logEntityAction(Mockito.eq(tenantId), @@ -541,9 +552,9 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { Mockito.argThat(matcherEntity), Mockito.eq(actionType), Mockito.argThat(matcherError), - Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(0))), - Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(1))), - Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(2)))); + Mockito.argThat(matcherAdditionalInfos.get(0)), + Mockito.argThat(matcherAdditionalInfos.get(1)), + Mockito.argThat(matcherAdditionalInfos.get(2))); break; default: Mockito.verify(auditLogService, times(cntTime)) diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index b83a1b567c..32ff733e0c 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -633,6 +633,12 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { return doPost("/api/device?accessToken=" + accessToken, device, Device.class); } + protected Device assignDeviceToCustomer(String name, String accessToken, CustomerId customerId) throws Exception { + Device device = createDevice(name, accessToken); + String deviceIdStr = String.valueOf(device.getId().getId()); + return doPost("/api/customer/" + customerId.getId() + "/device/" + deviceIdStr, device, Device.class); + } + protected MqttDeviceProfileTransportConfiguration createMqttDeviceProfileTransportConfiguration(TransportPayloadTypeConfiguration transportPayloadTypeConfiguration, boolean sendAckOnValidationException) { MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration = new MqttDeviceProfileTransportConfiguration(); mqttDeviceProfileTransportConfiguration.setDeviceTelemetryTopic(MqttTopics.DEVICE_TELEMETRY_TOPIC); @@ -803,6 +809,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { return readResponse(doPost(urlTemplate, content, params).andExpect(resultMatcher), responseType); } + protected R doPostAsyncWithTypedResponse(String urlTemplate, T content, TypeReference responseType, ResultMatcher resultMatcher, String... params) throws Exception { + return readResponse(doPostAsync(urlTemplate, content, DEFAULT_TIMEOUT, params).andExpect(resultMatcher), responseType); + } + protected T doPostAsync(String urlTemplate, T content, Class responseClass, ResultMatcher resultMatcher, String... params) throws Exception { return readResponse(doPostAsync(urlTemplate, content, DEFAULT_TIMEOUT, params).andExpect(resultMatcher), responseClass); } diff --git a/application/src/test/java/org/thingsboard/server/controller/RuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/RuleEngineControllerTest.java new file mode 100644 index 0000000000..dac268588d --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/controller/RuleEngineControllerTest.java @@ -0,0 +1,252 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.controller; + +import com.fasterxml.jackson.core.type.TypeReference; +import io.jsonwebtoken.Claims; +import io.jsonwebtoken.Jws; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.test.web.servlet.MvcResult; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.audit.ActionType; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.service.ruleengine.RuleEngineCallService; +import org.thingsboard.server.service.security.model.token.JwtTokenFactory; + +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +@DaoSqlTest +public class RuleEngineControllerTest extends AbstractControllerTest { + + private static final String REQUEST_BODY = "{\"temperature\":23}"; + + @SpyBean + private RuleEngineCallService ruleEngineCallService; + + @Autowired + private JwtTokenFactory jwtTokenFactory; + + @Test + public void testHandleRuleEngineRequestWithMsgOriginatorUser() throws Exception { + loginSysAdmin(); + UserId sysAdminUserId = getCurrentUserId(); + TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, sysAdminUserId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), TbMsgMetaData.EMPTY, REQUEST_BODY); + doAnswer(invocation -> { + Consumer consumer = invocation.getArgument(4); + consumer.accept(msg); + return null; + }).when(ruleEngineCallService).processRestApiCallToRuleEngine(eq(TenantId.SYS_TENANT_ID), any(UUID.class), any(TbMsg.class), anyBoolean(), any()); + + var response = doPostAsyncWithTypedResponse("/api/rule-engine/", REQUEST_BODY, new TypeReference<>() { + }, status().isOk()); + + assertThat(Objects.requireNonNull(JacksonUtil.toString(response))).isEqualTo(REQUEST_BODY); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(TenantId.SYS_TENANT_ID), any(), captor.capture(), eq(false), any()); + TbMsg tbMsg = captor.getValue(); + assertThat(tbMsg.getData()).isEqualTo(REQUEST_BODY); + assertThat(tbMsg.getType()).isEqualTo(msg.getType()); + assertThat(tbMsg.getOriginator()).isEqualTo(sysAdminUserId); + testLogEntityAction(null, sysAdminUserId, TenantId.SYS_TENANT_ID, new CustomerId(TenantId.SYS_TENANT_ID.getId()), sysAdminUserId, + SYS_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, REQUEST_BODY); + } + + @Test + public void testHandleRuleEngineRequestWithMsgOriginatorDevice() throws Exception { + loginTenantAdmin(); + Device device = createDevice("Test", "123"); + DeviceId deviceId = device.getId(); + TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), TbMsgMetaData.EMPTY, REQUEST_BODY); + mockSuccessfulRestApiCallToRuleEngine(msg); + + var response = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId(), REQUEST_BODY, new TypeReference<>() { + }, status().isOk()); + + assertThat(Objects.requireNonNull(JacksonUtil.toString(response))).isEqualTo(REQUEST_BODY); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(), captor.capture(), eq(false), any()); + TbMsg tbMsg = captor.getValue(); + assertThat(tbMsg.getData()).isEqualTo(REQUEST_BODY); + assertThat(tbMsg.getType()).isEqualTo(msg.getType()); + assertThat(tbMsg.getOriginator()).isEqualTo(deviceId); + testLogEntityAction(null, deviceId, tenantId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), tenantAdminUserId, + TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, REQUEST_BODY); + } + + @Test + public void testHandleRuleEngineRequestWithMsgOriginatorDeviceAndSpecifiedTimeout() throws Exception { + loginTenantAdmin(); + Device device = createDevice("Test", "123"); + DeviceId deviceId = device.getId(); + TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), TbMsgMetaData.EMPTY, REQUEST_BODY); + mockSuccessfulRestApiCallToRuleEngine(msg); + + var response = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId() + "/15000", REQUEST_BODY, new TypeReference<>() { + }, status().isOk()); + + assertThat(Objects.requireNonNull(JacksonUtil.toString(response))).isEqualTo(REQUEST_BODY); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(), captor.capture(), eq(false), any()); + TbMsg tbMsg = captor.getValue(); + assertThat(tbMsg.getData()).isEqualTo(REQUEST_BODY); + assertThat(tbMsg.getType()).isEqualTo(msg.getType()); + assertThat(tbMsg.getOriginator()).isEqualTo(deviceId); + testLogEntityAction(null, deviceId, tenantId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), tenantAdminUserId, + TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, REQUEST_BODY); + } + + @Test + public void testHandleRuleEngineRequestWithMsgOriginatorDeviceAndResponseIsNull() throws Exception { + loginTenantAdmin(); + Device device = createDevice("Test", "123"); + DeviceId deviceId = device.getId(); + TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), TbMsgMetaData.EMPTY, REQUEST_BODY); + mockSuccessfulRestApiCallToRuleEngine(null); + + doPostAsync("/api/rule-engine/DEVICE/" + deviceId.getId() + "/15000", REQUEST_BODY, String.class, status().isRequestTimeout()); + + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(), captor.capture(), eq(false), any()); + TbMsg tbMsg = captor.getValue(); + assertThat(tbMsg.getData()).isEqualTo(REQUEST_BODY); + assertThat(tbMsg.getType()).isEqualTo(msg.getType()); + assertThat(tbMsg.getOriginator()).isEqualTo(deviceId); + Exception exception = new TimeoutException("Processing timeout detected!"); + testLogEntityActionError(null, deviceId, tenantId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), tenantAdminUserId, + TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, exception, REQUEST_BODY, ""); + } + + @Test + public void testHandleRuleEngineRequestWithMsgOriginatorDeviceAndSpecifiedQueue() throws Exception { + loginTenantAdmin(); + Device device = createDevice("Test", "123"); + DeviceId deviceId = device.getId(); + TbMsg msg = TbMsg.newMsg("HighPriority", TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, REQUEST_BODY); + mockSuccessfulRestApiCallToRuleEngine(msg); + + var response = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId() + "/HighPriority/1000", REQUEST_BODY, new TypeReference<>() { + }, status().isOk()); + + assertThat(Objects.requireNonNull(JacksonUtil.toString(response))).isEqualTo(REQUEST_BODY); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(), captor.capture(), eq(true), any()); + TbMsg tbMsg = captor.getValue(); + assertThat(tbMsg.getData()).isEqualTo(REQUEST_BODY); + assertThat(tbMsg.getType()).isEqualTo(msg.getType()); + assertThat(tbMsg.getQueueName()).isEqualTo(msg.getQueueName()); + assertThat(tbMsg.getOriginator()).isEqualTo(deviceId); + testLogEntityAction(null, deviceId, tenantId, new CustomerId(TenantId.SYS_TENANT_ID.getId()), tenantAdminUserId, + TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, REQUEST_BODY); + } + + @Test + public void testHandleRuleEngineRequestWithInvalidRequestBody() throws Exception { + loginSysAdmin(); + + doPost("/api/rule-engine/", (Object) "@") + .andExpect(status().isBadRequest()) + .andExpect(statusReason(containsString("Invalid request body"))); + + verifyNoMoreInteractions(ruleEngineCallService); + } + + @Test + public void testHandleRuleEngineRequestWithAuthorityCustomerUser() throws Exception { + loginTenantAdmin(); + Device device = assignDeviceToCustomer("test", "123", customerId); + DeviceId deviceId = device.getId(); + loginCustomerUser(); + + TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, customerId, TbMsgMetaData.EMPTY, REQUEST_BODY); + mockSuccessfulRestApiCallToRuleEngine(msg); + + var response = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId(), REQUEST_BODY, new TypeReference<>() { + }, status().isOk()); + + assertThat(Objects.requireNonNull(JacksonUtil.toString(response))).isEqualTo(REQUEST_BODY); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(), captor.capture(), eq(false), any()); + TbMsg tbMsg = captor.getValue(); + assertThat(tbMsg.getData()).isEqualTo(REQUEST_BODY); + assertThat(tbMsg.getType()).isEqualTo(msg.getType()); + assertThat(tbMsg.getOriginator()).isEqualTo(deviceId); + testLogEntityAction(null, deviceId, tenantId, customerId, customerUserId, CUSTOMER_USER_EMAIL, + ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, REQUEST_BODY); + } + + @Test + public void testHandleRuleEngineRequestWithoutPermission() throws Exception { + loginTenantAdmin(); + Device device = createDevice("test", "123"); + loginCustomerUser(); + + MvcResult result = doPost("/api/rule-engine/DEVICE/" + device.getId().getId(), (Object) REQUEST_BODY).andReturn(); + + ResponseEntity response = (ResponseEntity) result.getAsyncResult(); + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.FORBIDDEN); + assertThat(Objects.requireNonNull(response.getBody()).toString()).isEqualTo("You don't have permission to perform this operation!"); + verify(ruleEngineCallService, never()).processRestApiCallToRuleEngine(any(), any(), any(), anyBoolean(), any()); + } + + @Test + public void testHandleRuleEngineRequestUnauthorized() throws Exception { + doPost("/api/rule-engine/", (Object) REQUEST_BODY) + .andExpect(status().isUnauthorized()) + .andExpect(statusReason(containsString("Authentication failed"))); + } + + private void mockSuccessfulRestApiCallToRuleEngine(TbMsg msg) { + doAnswer(invocation -> { + Consumer consumer = invocation.getArgument(4); + consumer.accept(msg); + return null; + }).when(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(UUID.class), any(TbMsg.class), anyBoolean(), any()); + } + + private UserId getCurrentUserId() { + Jws jwsClaims = jwtTokenFactory.parseTokenClaims(token); + Claims claims = jwsClaims.getPayload(); + String userId = claims.get("userId", String.class); + return UserId.fromString(userId); + } +} diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java index 33b0fc7801..5ece34cd7f 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java @@ -19,16 +19,22 @@ import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.queue.Queue; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueProducer; @@ -43,6 +49,7 @@ import org.thingsboard.server.service.profile.TbDeviceProfileCache; import java.util.List; import java.util.UUID; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; @@ -237,6 +244,50 @@ public class DefaultTbClusterServiceTest { .send(eq(topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, monolith2)), any(TbProtoQueueMsg.class), isNull()); } + @Test + public void testPushNotificationToCoreWithRestApiCallResponseMsgProto() { + TbQueueProducer> tbCoreQueueProducer = mock(TbQueueProducer.class); + TopicPartitionInfo tpi = new TopicPartitionInfo(ServiceType.TB_CORE.name().toLowerCase() + ".notifications." + CORE, null, null, false); + + when(producerProvider.getTbCoreNotificationsMsgProducer()).thenReturn(tbCoreQueueProducer); + TransportProtos.RestApiCallResponseMsgProto responseMsgProto = TransportProtos.RestApiCallResponseMsgProto.getDefaultInstance(); + TransportProtos.ToCoreNotificationMsg toCoreNotificationMsg = TransportProtos.ToCoreNotificationMsg.newBuilder().setRestApiCallResponseMsg(responseMsgProto).build(); + + clusterService.pushNotificationToCore(CORE, responseMsgProto, null); + + verify(topicService).getNotificationsTopic(ServiceType.TB_CORE, CORE); + verify(producerProvider).getTbCoreNotificationsMsgProducer(); + ArgumentCaptor> protoQueueMsgArgumentCaptor = ArgumentCaptor.forClass(TbProtoQueueMsg.class); + verify(tbCoreQueueProducer).send(eq(tpi), protoQueueMsgArgumentCaptor.capture(), isNull()); + TbProtoQueueMsg protoQueueMsgArgumentCaptorValue = protoQueueMsgArgumentCaptor.getValue(); + assertThat(protoQueueMsgArgumentCaptorValue.getValue()).isEqualTo(toCoreNotificationMsg); + } + + @Test + public void testPushMsgToRuleEngineUsingQueueFromMsg() { + TenantId tenantId = TenantId.SYS_TENANT_ID; + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + TbMsg tbMsg = TbMsg.newMsg("main", TbMsgType.REST_API_REQUEST, deviceId, null, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + TbQueueProducer> tbREQueueProducer = mock(TbQueueProducer.class); + TopicPartitionInfo tpi = new TopicPartitionInfo(ServiceType.TB_RULE_ENGINE.name().toLowerCase() + ".notifications." + CORE, tenantId, null, false); + TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + + when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer); + when(partitionService.resolve(any(), any(), any(), any())).thenReturn(tpi); + + clusterService.pushMsgToRuleEngine(tenantId, tenantId, tbMsg, true, null); + + verify(partitionService).resolve(ServiceType.TB_RULE_ENGINE, "main", tenantId, tenantId); + verify(producerProvider).getRuleEngineMsgProducer(); + ArgumentCaptor> protoQueueMsgArgumentCaptor = ArgumentCaptor.forClass(TbProtoQueueMsg.class); + verify(tbREQueueProducer).send(eq(tpi), protoQueueMsgArgumentCaptor.capture(), isNull()); + TbProtoQueueMsg protoQueueMsgArgumentCaptorValue = protoQueueMsgArgumentCaptor.getValue(); + assertThat(protoQueueMsgArgumentCaptorValue.getValue()).isEqualTo(msg); + } + protected Queue createTestQueue() { TenantId tenantId = TenantId.SYS_TENANT_ID; Queue queue = new Queue(new QueueId(UUID.randomUUID())); diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java index 621d4c5a91..ca6e6728ff 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.service.ruleengine.RuleEngineCallService; import org.thingsboard.server.service.state.DeviceStateService; import java.util.UUID; @@ -48,6 +49,8 @@ public class DefaultTbCoreConsumerServiceTest { private DeviceStateService stateServiceMock; @Mock private TbCoreConsumerStats statsMock; + @Mock + private RuleEngineCallService ruleEngineCallServiceMock; @Mock private TbCallback tbCallbackMock; @@ -529,4 +532,17 @@ public class DefaultTbCoreConsumerServiceTest { then(statsMock).should(never()).log(inactivityMsg); } + @Test + public void givenRestApiCallResponseMsgProto_whenForwardToRuleEngineCallService_thenCallOnQueueMsg() { + // GIVEN + ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "ruleEngineCallService", ruleEngineCallServiceMock); + var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.getDefaultInstance(); + doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToRuleEngineCallService(restApiCallResponseMsgProto, tbCallbackMock); + + // WHEN + defaultTbCoreConsumerServiceMock.forwardToRuleEngineCallService(restApiCallResponseMsgProto, tbCallbackMock); + + // THEN + then(ruleEngineCallServiceMock).should().onQueueMsg(restApiCallResponseMsgProto, tbCallbackMock); + } } diff --git a/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java b/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java new file mode 100644 index 0000000000..718648e143 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.rpc; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.gen.transport.TransportProtos; +import java.util.UUID; + +import static org.mockito.BDDMockito.then; + +@ExtendWith(MockitoExtension.class) +class DefaultTbRuleEngineRpcServiceTest { + + @Mock + private TbClusterService tbClusterServiceMock; + + @InjectMocks + private DefaultTbRuleEngineRpcService tbRuleEngineRpcService; + + @Test + public void givenTbMsg_whenSendRestApiCallReply_thenPushNotificationToCore() { + // GIVEN + String serviceId = "tb-core-0"; + UUID requestId = UUID.fromString("f64a20df-eb1e-46a3-ba6f-0b3ae053ee0a"); + DeviceId deviceId = new DeviceId(UUID.fromString("1d9f771a-7cdc-4ac7-838c-ba193d05a012")); + TbMsg msg = TbMsg.newMsg(null, TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.newBuilder() + .setRequestIdMSB(requestId.getMostSignificantBits()) + .setRequestIdLSB(requestId.getLeastSignificantBits()) + .setResponse(TbMsg.toByteString(msg)) + .build(); + + // WHEN + tbRuleEngineRpcService.sendRestApiCallReply(serviceId, requestId, msg); + + // THEN + then(tbClusterServiceMock).should().pushNotificationToCore(serviceId, restApiCallResponseMsgProto, null); + } +} diff --git a/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java b/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java new file mode 100644 index 0000000000..8976cd99f3 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java @@ -0,0 +1,166 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ruleengine; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.HashMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +public class DefaultRuleEngineCallServiceTest { + + private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("d7210c7f-a152-4e91-8186-19ae85499a6b")); + + private final ConcurrentMap> requests = new ConcurrentHashMap<>(); + + @Mock + private TbClusterService tbClusterServiceMock; + + private DefaultRuleEngineCallService ruleEngineCallService; + private ScheduledExecutorService executor; + + @BeforeEach + void setUp() { + executor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("rpc-callback")); + ruleEngineCallService = new DefaultRuleEngineCallService(tbClusterServiceMock); + ReflectionTestUtils.setField(ruleEngineCallService, "executor", executor); + ReflectionTestUtils.setField(ruleEngineCallService, "requests", requests); + } + + @AfterEach + void tearDown() { + requests.clear(); + if (executor != null) { + executor.shutdown(); + try { + if (!executor.awaitTermination(10L, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + @Test + void givenRequest_whenProcessRestApiCallToRuleEngine_thenPushMsgToRuleEngine() { + long timeout = 100L; + long expTime = System.currentTimeMillis() + timeout; + HashMap metaData = new HashMap<>(); + UUID requestId = UUID.randomUUID(); + metaData.put("serviceId", "core"); + metaData.put("requestUUID", requestId.toString()); + metaData.put("expirationTime", Long.toString(expTime)); + TbMsg msg = TbMsg.newMsg("main", TbMsgType.REST_API_REQUEST, TENANT_ID, new TbMsgMetaData(metaData), "{\"key\":\"value\"}"); + Consumer anyConsumer = TbMsg::getData; + ruleEngineCallService.processRestApiCallToRuleEngine(TENANT_ID, requestId, msg, true, anyConsumer); + + assertThat(requests.size()).isEqualTo(1); + assertThat(requests.get(requestId)).isEqualTo(anyConsumer); + verify(tbClusterServiceMock).pushMsgToRuleEngine(TENANT_ID, TENANT_ID, msg, true, null); + } + + @Test + void givenSmallTimeout_whenProcessRestApiCallToRuleEngine_thenDoesNotReturnResponse() { + long timeout = 1L; + long expTime = System.currentTimeMillis() + timeout; + HashMap metaData = new HashMap<>(); + UUID requestId = UUID.randomUUID(); + metaData.put("serviceId", "core"); + metaData.put("requestUUID", requestId.toString()); + metaData.put("expirationTime", Long.toString(expTime)); + TbMsg msg = TbMsg.newMsg("main", TbMsgType.REST_API_REQUEST, TENANT_ID, new TbMsgMetaData(metaData), "{\"key\":\"value\"}"); + + Consumer anyConsumer = TbMsg::getData; + doAnswer(invocation -> { + //check the presence of request in the map after pushMsgToRuleEngine() + assertThat(requests.size()).isEqualTo(1); + assertThat(requests.get(requestId)).isEqualTo(anyConsumer); + return null; + }).when(tbClusterServiceMock).pushMsgToRuleEngine(any(), any(), any(), anyBoolean(), any()); + ruleEngineCallService.processRestApiCallToRuleEngine(TENANT_ID, requestId, msg, true, anyConsumer); + + verify(tbClusterServiceMock).pushMsgToRuleEngine(TENANT_ID, TENANT_ID, msg, true, null); + //check map is empty after scheduleTimeout() + Awaitility.await("Await until request was deleted from map due to timeout") + .pollDelay(25, TimeUnit.MILLISECONDS) + .atMost(10, TimeUnit.SECONDS) + .until(requests::isEmpty); + } + + @Test + void givenResponse_whenOnQueue_thenAcceptTbMsgResponse() { + long timeout = 10000L; + long expTime = System.currentTimeMillis() + timeout; + HashMap metaData = new HashMap<>(); + UUID requestId = UUID.randomUUID(); + metaData.put("serviceId", "core"); + metaData.put("requestUUID", requestId.toString()); + metaData.put("expirationTime", Long.toString(expTime)); + TbMsg msg = TbMsg.newMsg("main", TbMsgType.REST_API_REQUEST, TENANT_ID, new TbMsgMetaData(metaData), "{\"key\":\"value\"}"); + + Consumer anyConsumer = TbMsg::getData; + doAnswer(invocation -> { + //check the presence of request in the map after pushMsgToRuleEngine() + assertThat(requests.size()).isEqualTo(1); + assertThat(requests.get(requestId)).isEqualTo(anyConsumer); + ruleEngineCallService.onQueueMsg(getResponse(requestId, msg), TbCallback.EMPTY); + //check map is empty after onQueueMsg() + assertThat(requests.size()).isEqualTo(0); + return null; + }).when(tbClusterServiceMock).pushMsgToRuleEngine(any(), any(), any(), anyBoolean(), any()); + ruleEngineCallService.processRestApiCallToRuleEngine(TENANT_ID, requestId, msg, true, anyConsumer); + + verify(tbClusterServiceMock).pushMsgToRuleEngine(TENANT_ID, TENANT_ID, msg, true, null); + } + + private TransportProtos.RestApiCallResponseMsgProto getResponse(UUID requestId, TbMsg msg) { + return TransportProtos.RestApiCallResponseMsgProto.newBuilder() + .setResponse(TbMsg.toByteString(msg)) + .setRequestIdMSB(requestId.getMostSignificantBits()) + .setRequestIdLSB(requestId.getLeastSignificantBits()) + .build(); + } +} diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index 301ef69dcc..ec37a5a864 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -18,7 +18,6 @@ package org.thingsboard.server.cluster; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; -import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; @@ -39,6 +38,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.RestApiCallResponseMsgProto; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueClusterService; @@ -58,10 +58,14 @@ public interface TbClusterService extends TbQueueClusterService { void pushNotificationToCore(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback); + void pushNotificationToCore(String targetServiceId, RestApiCallResponseMsgProto msg, TbQueueCallback callback); + void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback); void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, TbQueueCallback callback); + void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, boolean useQueueFromTbMsg, TbQueueCallback callback); + void pushNotificationToRuleEngine(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback); void pushNotificationToTransport(String targetServiceId, ToTransportMsg response, TbQueueCallback callback); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java index d3c0889d6b..db56acb8b2 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java @@ -40,6 +40,7 @@ public enum ActionType { RELATION_ADD_OR_UPDATE(false, TbMsgType.RELATION_ADD_OR_UPDATE), RELATION_DELETED(false, TbMsgType.RELATION_DELETED), RELATIONS_DELETED(false, TbMsgType.RELATIONS_DELETED), + REST_API_RULE_ENGINE_CALL(false, null), // log call to rule engine from REST API ALARM_ACK(false, TbMsgType.ALARM_ACK), ALARM_CLEAR(false, TbMsgType.ALARM_CLEAR), ALARM_DELETE(false, TbMsgType.ALARM_DELETE), diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java b/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java index 763f245a09..9aa1994ab6 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java @@ -67,6 +67,7 @@ public enum TbMsgType { PROVISION_SUCCESS, PROVISION_FAILURE, SEND_EMAIL, + REST_API_REQUEST("REST API request"), // tellSelfOnly types GENERATOR_NODE_SELF_MSG(null, true), diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 7b2aa2fe1b..a37eb118c7 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -98,6 +98,12 @@ message SessionInfoProto { int64 customerIdLSB = 15; } +message RestApiCallResponseMsgProto { + int64 requestIdMSB = 1; + int64 requestIdLSB = 2; + bytes response = 5; +} + enum SessionEvent { OPEN = 0; CLOSED = 1; @@ -1465,6 +1471,7 @@ message ToCoreNotificationMsg { ToEdgeSyncRequestMsgProto toEdgeSyncRequest = 11; FromEdgeSyncResponseMsgProto fromEdgeSyncResponse = 12; ResourceCacheInvalidateMsg resourceCacheInvalidateMsg = 13; + RestApiCallResponseMsgProto restApiCallResponseMsg = 50; } /* Messages that are handled by ThingsBoard RuleEngine Service */ diff --git a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java index 8783770be3..c116eddc74 100644 --- a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java +++ b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java @@ -3840,6 +3840,51 @@ public class RestClient implements Closeable { } } + public JsonNode handleRuleEngineRequest(JsonNode requestBody) { + return restTemplate.exchange( + baseURL + "/api/rule-engine", + HttpMethod.POST, + new HttpEntity<>(requestBody), + new ParameterizedTypeReference() { + }).getBody(); + } + + public JsonNode handleRuleEngineRequest(EntityId entityId, JsonNode requestBody) { + return restTemplate.exchange( + baseURL + "/api/rule-engine/{entityType}/{entityId}", + HttpMethod.POST, + new HttpEntity<>(requestBody), + new ParameterizedTypeReference() { + }, + entityId.getEntityType(), + entityId.getId()).getBody(); + } + + public JsonNode handleRuleEngineRequest(EntityId entityId, int timeout, JsonNode requestBody) { + return restTemplate.exchange( + baseURL + "/api/rule-engine/{entityType}/{entityId}/{timeout}", + HttpMethod.POST, + new HttpEntity<>(requestBody), + new ParameterizedTypeReference() { + }, + entityId.getEntityType(), + entityId.getId(), + timeout).getBody(); + } + + public JsonNode handleRuleEngineRequest(EntityId entityId, String queueName, int timeout, JsonNode requestBody) { + return restTemplate.exchange( + baseURL + "/api/rule-engine/{entityType}/{entityId}/{queueName}/{timeout}", + HttpMethod.POST, + new HttpEntity<>(requestBody), + new ParameterizedTypeReference() { + }, + entityId.getEntityType(), + entityId.getId(), + queueName, + timeout).getBody(); + } + private String getTimeUrlParams(TimePageLink pageLink) { String urlParams = getUrlParams(pageLink); if (pageLink.getStartTime() != null) { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java index 55f83f823b..99ab76b69d 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.api; import org.thingsboard.server.common.data.id.RpcId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.msg.TbMsg; import java.util.UUID; import java.util.function.Consumer; @@ -31,5 +32,7 @@ public interface RuleEngineRpcService { void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest request, Consumer consumer); + void sendRestApiCallReply(String serviceId, UUID requestId, TbMsg msg); + Rpc findRpcById(TenantId tenantId, RpcId id); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNode.java new file mode 100644 index 0000000000..84441fb213 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNode.java @@ -0,0 +1,66 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.rest; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; + +import java.util.UUID; + +@Slf4j +@RuleNode( + type = ComponentType.ACTION, + name = "rest call reply", + configClazz = TbSendRestApiCallReplyNodeConfiguration.class, + nodeDescription = "Sends reply to REST API call to rule engine", + nodeDetails = "Expects messages with any message type. Forwards incoming message as a reply to REST API call sent to rule engine.", + uiResources = {"static/rulenode/rulenode-core-config.js"}, + configDirective = "tbActionNodeSendRestApiCallReplyConfig", + icon = "call_merge" +) +public class TbSendRestApiCallReplyNode implements TbNode { + + private TbSendRestApiCallReplyNodeConfiguration config; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbSendRestApiCallReplyNodeConfiguration.class); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + String serviceIdStr = msg.getMetaData().getValue(config.getServiceIdMetaDataAttribute()); + String requestIdStr = msg.getMetaData().getValue(config.getRequestIdMetaDataAttribute()); + if (StringUtils.isEmpty(requestIdStr)) { + ctx.tellFailure(msg, new RuntimeException("Request id is not present in the metadata!")); + } else if (StringUtils.isEmpty(serviceIdStr)) { + ctx.tellFailure(msg, new RuntimeException("Service id is not present in the metadata!")); + } else if (StringUtils.isEmpty(msg.getData())) { + ctx.tellFailure(msg, new RuntimeException("Request body is empty!")); + } else { + ctx.getRpcService().sendRestApiCallReply(serviceIdStr, UUID.fromString(requestIdStr), msg); + ctx.tellSuccess(msg); + } + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeConfiguration.java new file mode 100644 index 0000000000..010a6ccb15 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeConfiguration.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.rest; + +import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.server.common.data.StringUtils; + +@Data +public class TbSendRestApiCallReplyNodeConfiguration implements NodeConfiguration { + public static final String SERVICE_ID = "serviceId"; + public static final String REQUEST_UUID = "requestUUID"; + + private String serviceIdMetaDataAttribute; + private String requestIdMetaDataAttribute; + + @Override + public TbSendRestApiCallReplyNodeConfiguration defaultConfiguration() { + TbSendRestApiCallReplyNodeConfiguration configuration = new TbSendRestApiCallReplyNodeConfiguration(); + configuration.setRequestIdMetaDataAttribute(REQUEST_UUID); + configuration.setServiceIdMetaDataAttribute(SERVICE_ID); + return configuration; + } + + public String getServiceIdMetaDataAttribute() { + return !StringUtils.isEmpty(serviceIdMetaDataAttribute) ? serviceIdMetaDataAttribute : SERVICE_ID; + } + + public String getRequestIdMetaDataAttribute() { + return !StringUtils.isEmpty(requestIdMetaDataAttribute) ? requestIdMetaDataAttribute : REQUEST_UUID; + } +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeTest.java new file mode 100644 index 0000000000..c37d4b28a0 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeTest.java @@ -0,0 +1,133 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.rest; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.RuleEngineRpcService; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.util.Map; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TbSendRestApiCallReplyNodeTest { + + private static final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("212445ad-9852-4bfd-819d-6b01ab6ee6b6")); + + private TbSendRestApiCallReplyNode node; + private TbSendRestApiCallReplyNodeConfiguration config; + + @Mock + private TbContext ctxMock; + @Mock + private RuleEngineRpcService rpcServiceMock; + + @BeforeEach + public void setUp() throws TbNodeException { + node = new TbSendRestApiCallReplyNode(); + config = new TbSendRestApiCallReplyNodeConfiguration().defaultConfiguration(); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctxMock, configuration); + } + + @Test + public void givenDefaultConfig_whenInit_thenDoesNotThrowException() { + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + assertThatNoException().isThrownBy(() -> node.init(ctxMock, configuration)); + } + + @ParameterizedTest + @MethodSource + public void givenValidRestApiRequest_whenOnMsg_thenTellSuccess(String requestIdAttribute, String serviceIdAttribute) throws TbNodeException { + config.setRequestIdMetaDataAttribute(requestIdAttribute); + config.setServiceIdMetaDataAttribute(serviceIdAttribute); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctxMock, configuration); + when(ctxMock.getRpcService()).thenReturn(rpcServiceMock); + String requestUUIDStr = "80b7883b-7ec6-4872-9dd3-b2afd5660fa6"; + String serviceIdStr = "tb-core-0"; + String data = """ + { + "temperature": 23, + } + """; + Map metadata = Map.of( + requestIdAttribute, requestUUIDStr, + serviceIdAttribute, serviceIdStr); + TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, DEVICE_ID, new TbMsgMetaData(metadata), data); + + node.onMsg(ctxMock, msg); + + UUID requestUUID = UUID.fromString(requestUUIDStr); + verify(rpcServiceMock).sendRestApiCallReply(serviceIdStr, requestUUID, msg); + verify(ctxMock).tellSuccess(msg); + } + + private static Stream givenValidRestApiRequest_whenOnMsg_thenTellSuccess() { + return Stream.of( + Arguments.of("requestId", "service"), + Arguments.of("requestUUID", "serviceId"), + Arguments.of("some_custom_request_id_field", "some_custom_service_id_field") + ); + } + + @ParameterizedTest + @MethodSource + public void givenInvalidRequest_whenOnMsg_thenTellFailure(TbMsgMetaData metaData, String data, String errorMsg) { + TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, DEVICE_ID, metaData, data); + + node.onMsg(ctxMock, msg); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctxMock).tellFailure(eq(msg), captor.capture()); + Throwable throwable = captor.getValue(); + assertThat(throwable).isInstanceOf(RuntimeException.class).hasMessage(errorMsg); + } + + private static Stream givenInvalidRequest_whenOnMsg_thenTellFailure() { + return Stream.of( + Arguments.of(TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING, "Request id is not present in the metadata!"), + Arguments.of(new TbMsgMetaData(Map.of("requestUUID", "e1dd3985-efad-45a0-b0d2-0ff5dff2ccac")), + TbMsg.EMPTY_STRING, "Service id is not present in the metadata!"), + Arguments.of(new TbMsgMetaData(Map.of("serviceId", "tb-core-0")), + TbMsg.EMPTY_STRING, "Request id is not present in the metadata!"), + Arguments.of(new TbMsgMetaData(Map.of("requestUUID", "e1dd3985-efad-45a0-b0d2-0ff5dff2ccac", "serviceId", "tb-core-0")), + TbMsg.EMPTY_STRING, "Request body is empty!") + ); + } +}