diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index f9d1fe7a2f..b723603e8a 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -141,6 +141,7 @@ import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.exception.ThingsboardErrorResponseHandler; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.action.EntityActionService; @@ -314,6 +315,9 @@ public abstract class BaseController { @Autowired protected ExportableEntitiesService entitiesService; + @Autowired + protected TbServiceInfoProvider serviceInfoProvider; + @Value("${server.log_controller_error_stack_trace}") @Getter private boolean logControllerErrorStackTrace; diff --git a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java index 918021ab95..e4b919a616 100644 --- a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java +++ b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java @@ -1712,4 +1712,6 @@ public class ControllerConstants { MARKDOWN_CODE_BLOCK_START + "[{\"ts\":1634712287000,\"values\":{\"temperature\":26, \"humidity\":87}}, {\"ts\":1634712588000,\"values\":{\"temperature\":25, \"humidity\":88}}]" + MARKDOWN_CODE_BLOCK_END ; + + protected static final String SECURITY_WRITE_CHECK = " Security check is performed to verify that the user has 'WRITE' permission for the entity (entities)."; } diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleEngineController.java b/application/src/main/java/org/thingsboard/server/controller/RuleEngineController.java new file mode 100644 index 0000000000..8cda3d2212 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/controller/RuleEngineController.java @@ -0,0 +1,237 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.controller; + +import com.google.common.util.concurrent.FutureCallback; +import io.swagger.v3.oas.annotations.Parameter; +import jakarta.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.audit.ActionType; +import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; +import org.thingsboard.server.common.data.exception.ThingsboardException; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.config.annotations.ApiOperation; +import org.thingsboard.server.exception.ToErrorResponseEntity; +import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.ruleengine.RuleEngineCallService; +import org.thingsboard.server.service.security.AccessValidator; +import org.thingsboard.server.service.security.model.SecurityUser; +import org.thingsboard.server.service.security.permission.Operation; + +import java.util.HashMap; +import java.util.UUID; +import java.util.concurrent.TimeoutException; + +import static org.thingsboard.server.controller.ControllerConstants.ENTITY_ID_PARAM_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.ENTITY_TYPE_PARAM_DESCRIPTION; + +@RestController +@TbCoreComponent +@RequestMapping(TbUrlConstants.RULE_ENGINE_URL_PREFIX) +@Slf4j +public class RuleEngineController extends BaseController { + public static final int DEFAULT_TIMEOUT = 10000; + private static final String MSG_DESCRIPTION_PREFIX = "Creates the Message with type 'REST_API_REQUEST' and payload taken from the request body. "; + private static final String MSG_DESCRIPTION = "This method allows you to extend the regular platform API with the power of Rule Engine. You may use default and custom rule nodes to handle the message. " + + "The generated message contains two important metadata fields:\n\n" + + " * **'serviceId'** to identify the platform server that received the request;\n" + + " * **'requestUUID'** to identify the request and route possible response from the Rule Engine;\n\n" + + "Use **'rest call reply'** rule node to push the reply from rule engine back as a REST API call response. "; + + @Autowired + private RuleEngineCallService ruleEngineCallService; + @Autowired + private AccessValidator accessValidator; + + @ApiOperation(value = "Push user message to the rule engine (handleRuleEngineRequest)", + notes = MSG_DESCRIPTION_PREFIX + + "Uses current User Id ( the one which credentials is used to perform the request) as the Rule Engine message originator. " + + MSG_DESCRIPTION + + "The default timeout of the request processing is 10 seconds." + + "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK) + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/", method = RequestMethod.POST) + @ResponseBody + public DeferredResult handleRuleEngineRequest( + @Parameter(description = "A JSON value representing the message.", required = true) + @RequestBody String requestBody) throws ThingsboardException { + return handleRuleEngineRequest(null, null, null, DEFAULT_TIMEOUT, requestBody); + } + + @ApiOperation(value = "Push entity message to the rule engine (handleRuleEngineRequest)", + notes = MSG_DESCRIPTION_PREFIX + + "Uses specified Entity Id as the Rule Engine message originator. " + + MSG_DESCRIPTION + + "The default timeout of the request processing is 10 seconds." + + "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK) + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/{entityType}/{entityId}", method = RequestMethod.POST) + @ResponseBody + public DeferredResult handleRuleEngineRequest( + @Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) + @PathVariable("entityType") String entityType, + @Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true) + @PathVariable("entityId") String entityIdStr, + @Parameter(description = "A JSON value representing the message.", required = true) + @RequestBody String requestBody) throws ThingsboardException { + return handleRuleEngineRequest(entityType, entityIdStr, null, DEFAULT_TIMEOUT, requestBody); + } + + @ApiOperation(value = "Push entity message with timeout to the rule engine (handleRuleEngineRequest)", + notes = MSG_DESCRIPTION_PREFIX + + "Uses specified Entity Id as the Rule Engine message originator. " + + MSG_DESCRIPTION + + "The platform expects the timeout value in milliseconds." + + "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK) + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/{entityType}/{entityId}/{timeout}", method = RequestMethod.POST) + @ResponseBody + public DeferredResult handleRuleEngineRequest( + @Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) + @PathVariable("entityType") String entityType, + @Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true) + @PathVariable("entityId") String entityIdStr, + @Parameter(description = "Timeout to process the request in milliseconds", required = true) + @PathVariable("timeout") int timeout, + @Parameter(description = "A JSON value representing the message.", required = true) + @RequestBody String requestBody) throws ThingsboardException { + return handleRuleEngineRequest(entityType, entityIdStr, null, timeout, requestBody); + } + + @ApiOperation(value = "Push entity message with timeout and specified queue to the rule engine (handleRuleEngineRequest)", + notes = MSG_DESCRIPTION_PREFIX + + "Uses specified Entity Id as the Rule Engine message originator. " + + MSG_DESCRIPTION + + "If request sent for Device/Device Profile or Asset/Asset Profile entity, specified queue will be used instead of the queue selected in the device or asset profile. " + + "The platform expects the timeout value in milliseconds." + + "\n\n" + ControllerConstants.SECURITY_WRITE_CHECK) + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/{entityType}/{entityId}/{queueName}/{timeout}", method = RequestMethod.POST) + @ResponseBody + public DeferredResult handleRuleEngineRequest( + @Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) + @PathVariable("entityType") String entityType, + @Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true) + @PathVariable("entityId") String entityIdStr, + @Parameter(description = "Queue name to process the request in the rule engine", required = true) + @PathVariable("queueName") String queueName, + @Parameter(description = "Timeout to process the request in milliseconds", required = true) + @PathVariable("timeout") int timeout, + @Parameter(description = "A JSON value representing the message.", required = true) + @RequestBody String requestBody) throws ThingsboardException { + try { + SecurityUser currentUser = getCurrentUser(); + EntityId entityId; + if (StringUtils.isEmpty(entityType) || StringUtils.isEmpty(entityIdStr)) { + entityId = currentUser.getId(); + } else { + entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr); + } + //Check that this is a valid JSON + JacksonUtil.toJsonNode(requestBody); + final DeferredResult response = new DeferredResult<>(); + accessValidator.validate(currentUser, Operation.WRITE, entityId, new HttpValidationCallback(response, new FutureCallback>() { + @Override + public void onSuccess(@Nullable DeferredResult result) { + long expTime = System.currentTimeMillis() + timeout; + HashMap metaData = new HashMap<>(); + UUID requestId = UUID.randomUUID(); + metaData.put("serviceId", serviceInfoProvider.getServiceId()); + metaData.put("requestUUID", requestId.toString()); + metaData.put("expirationTime", Long.toString(expTime)); + TbMsg msg = TbMsg.newMsg(queueName, TbMsgType.REST_API_REQUEST, entityId, currentUser.getCustomerId(), new TbMsgMetaData(metaData), requestBody); + ruleEngineCallService.processRestApiCallToRuleEngine(currentUser.getTenantId(), requestId, msg, queueName != null, + reply -> reply(new LocalRequestMetaData(msg, currentUser, result), reply)); + } + + @Override + public void onFailure(Throwable e) { + ResponseEntity entity; + if (e instanceof ToErrorResponseEntity) { + entity = ((ToErrorResponseEntity) e).toErrorResponseEntity(); + } else { + entity = new ResponseEntity(HttpStatus.UNAUTHORIZED); + } + logRuleEngineCall(currentUser, entityId, requestBody, null, e); + response.setResult(entity); + } + })); + return response; + } catch (IllegalArgumentException iae) { + throw new ThingsboardException("Invalid request body", iae, ThingsboardErrorCode.BAD_REQUEST_PARAMS); + } + } + + private void reply(LocalRequestMetaData rpcRequest, TbMsg response) { + DeferredResult responseWriter = rpcRequest.responseWriter(); + if (response == null) { + logRuleEngineCall(rpcRequest, null, new TimeoutException("Processing timeout detected!")); + responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT)); + } else { + String responseData = response.getData(); + if (!StringUtils.isEmpty(responseData)) { + try { + logRuleEngineCall(rpcRequest, response, null); + responseWriter.setResult(new ResponseEntity<>(JacksonUtil.toJsonNode(responseData), HttpStatus.OK)); + } catch (IllegalArgumentException e) { + log.debug("Failed to decode device response: {}", responseData, e); + logRuleEngineCall(rpcRequest, response, e); + responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE)); + } + } else { + logRuleEngineCall(rpcRequest, response, null); + responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK)); + } + } + } + + private void logRuleEngineCall(LocalRequestMetaData rpcRequest, TbMsg response, Throwable e) { + logRuleEngineCall(rpcRequest.user(), rpcRequest.request().getOriginator(), rpcRequest.request().getData(), response, e); + } + + private void logRuleEngineCall(SecurityUser user, EntityId entityId, String request, TbMsg response, Throwable e) { + auditLogService.logEntityAction( + user.getTenantId(), + user.getCustomerId(), + user.getId(), + user.getName(), + entityId, + null, + ActionType.REST_API_RULE_ENGINE_CALL, + BaseController.toException(e), + request, + response != null ? response.getData() : ""); + } + + private record LocalRequestMetaData(TbMsg request, SecurityUser user, DeferredResult responseWriter) {} +} diff --git a/application/src/main/java/org/thingsboard/server/controller/TbUrlConstants.java b/application/src/main/java/org/thingsboard/server/controller/TbUrlConstants.java index 99d94522ac..74575fd9d5 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TbUrlConstants.java +++ b/application/src/main/java/org/thingsboard/server/controller/TbUrlConstants.java @@ -22,4 +22,5 @@ public class TbUrlConstants { public static final String TELEMETRY_URL_PREFIX = "/api/plugins/telemetry"; public static final String RPC_V1_URL_PREFIX = "/api/plugins/rpc"; public static final String RPC_V2_URL_PREFIX = "/api/rpc"; + public static final String RULE_ENGINE_URL_PREFIX = "/api/rule-engine/"; } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index cfc592a206..70a39f0851 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -191,6 +191,14 @@ public class DefaultTbClusterService implements TbClusterService { toCoreNfs.incrementAndGet(); } + @Override + public void pushNotificationToCore(String targetServiceId, TransportProtos.RestApiCallResponseMsgProto responseMsgProto, TbQueueCallback callback) { + TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_CORE, targetServiceId); + ToCoreNotificationMsg msg = ToCoreNotificationMsg.newBuilder().setRestApiCallResponseMsg(responseMsgProto).build(); + producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback); + toCoreNfs.incrementAndGet(); + } + @Override public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) { log.trace("PUSHING msg: {} to:{}", msg, tpi); @@ -200,6 +208,11 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) { + pushMsgToRuleEngine(tenantId, entityId, tbMsg, false, callback); + } + + @Override + public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, boolean useQueueFromTbMsg, TbQueueCallback callback) { if (tenantId == null || tenantId.isNullUid()) { if (entityId.getEntityType().equals(EntityType.TENANT)) { tenantId = TenantId.fromUUID(entityId.getId()); @@ -209,9 +222,8 @@ public class DefaultTbClusterService implements TbClusterService { } } else { HasRuleEngineProfile ruleEngineProfile = getRuleEngineProfileForEntityOrElseNull(tenantId, entityId, tbMsg); - tbMsg = transformMsg(tbMsg, ruleEngineProfile); + tbMsg = transformMsg(tbMsg, ruleEngineProfile, useQueueFromTbMsg); } - ruleEngineProducerService.sendToRuleEngine(producerProvider.getRuleEngineMsgProducer(), tenantId, tbMsg, callback); toRuleEngineMsgs.incrementAndGet(); } @@ -255,10 +267,10 @@ public class DefaultTbClusterService implements TbClusterService { return null; } - private TbMsg transformMsg(TbMsg tbMsg, HasRuleEngineProfile ruleEngineProfile) { + private TbMsg transformMsg(TbMsg tbMsg, HasRuleEngineProfile ruleEngineProfile, boolean useQueueFromTbMsg) { if (ruleEngineProfile != null) { RuleChainId targetRuleChainId = ruleEngineProfile.getDefaultRuleChainId(); - String targetQueueName = ruleEngineProfile.getDefaultQueueName(); + String targetQueueName = useQueueFromTbMsg ? tbMsg.getQueueName() : ruleEngineProfile.getDefaultQueueName(); boolean isRuleChainTransform = targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId()); boolean isQueueTransform = targetQueueName != null && !targetQueueName.equals(tbMsg.getQueueName()); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index ae8ea8f7f4..1dc42218fb 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -96,6 +96,7 @@ import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.resource.TbImageService; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; +import org.thingsboard.server.service.ruleengine.RuleEngineCallService; import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService; import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.subscription.SubscriptionManagerService; @@ -149,6 +150,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService, CoreQueueConfig> mainConsumer; @@ -177,7 +179,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> requests = new ConcurrentHashMap<>(); + + public DefaultRuleEngineCallService(TbClusterService clusterService) { + this.clusterService = clusterService; + } + + @PostConstruct + public void initExecutor() { + executor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("re-rest-callback")); + } + + @PreDestroy + public void shutdownExecutor() { + if (executor != null) { + executor.shutdownNow(); + } + } + + @Override + public void processRestApiCallToRuleEngine(TenantId tenantId, UUID requestId, TbMsg request, boolean useQueueFromTbMsg, Consumer responseConsumer) { + log.trace("[{}] Processing REST API call to rule engine: [{}] for entity: [{}]", tenantId, requestId, request.getOriginator()); + requests.put(requestId, responseConsumer); + sendRequestToRuleEngine(tenantId, request, useQueueFromTbMsg); + scheduleTimeout(request, requestId, requests); + } + + @Override + public void onQueueMsg(TransportProtos.RestApiCallResponseMsgProto restApiCallResponseMsg, TbCallback callback) { + UUID requestId = new UUID(restApiCallResponseMsg.getRequestIdMSB(), restApiCallResponseMsg.getRequestIdLSB()); + Consumer consumer = requests.remove(requestId); + if (consumer != null) { + consumer.accept(TbMsg.fromBytes(null, restApiCallResponseMsg.getResponse().toByteArray(), TbMsgCallback.EMPTY)); + } else { + log.trace("[{}] Unknown or stale rest api call response received", requestId); + } + callback.onSuccess(); + } + + private void sendRequestToRuleEngine(TenantId tenantId, TbMsg msg, boolean useQueueFromTbMsg) { + clusterService.pushMsgToRuleEngine(tenantId, msg.getOriginator(), msg, useQueueFromTbMsg, null); + } + + private void scheduleTimeout(TbMsg request, UUID requestId, ConcurrentMap> requestsMap) { + long expirationTime = Long.parseLong(request.getMetaData().getValue("expirationTime")); + long timeout = Math.max(0, expirationTime - System.currentTimeMillis()); + log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId); + executor.schedule(() -> { + Consumer consumer = requestsMap.remove(requestId); + if (consumer != null) { + log.trace("[{}] request timeout detected: [{}]", this.hashCode(), requestId); + consumer.accept(null); + } + }, timeout, TimeUnit.MILLISECONDS); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/ruleengine/RuleEngineCallService.java b/application/src/main/java/org/thingsboard/server/service/ruleengine/RuleEngineCallService.java new file mode 100644 index 0000000000..cdaa90398a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ruleengine/RuleEngineCallService.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ruleengine; + +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.UUID; +import java.util.function.Consumer; + +public interface RuleEngineCallService { + + void processRestApiCallToRuleEngine(TenantId tenantId, UUID requestId, TbMsg request, boolean useQueueFromTbMsg, Consumer responseConsumer); + + void onQueueMsg(TransportProtos.RestApiCallResponseMsgProto restApiCallResponseMsg, TbCallback callback); +} diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java index 35c2ea65fa..d549407c44 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java @@ -386,6 +386,15 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { actionType, cntTime, extractMatcherAdditionalInfo(additionalInfo)); } + protected void testLogEntityActionError(EntityId originatorId, TenantId tenantId, + CustomerId customerId, UserId userId, String userName, + ActionType actionType, Exception exception, Object... additionalInfo) { + ArgumentMatcher matcherError = argument -> argument.getMessage().contains(exception.getMessage()) + & argument.getClass().equals(exception.getClass()); + testLogEntityActionErrorAdditionalInfo(Objects::isNull, originatorId, tenantId, customerId, userId, userName, + actionType, 1, matcherError, extractMatcherAdditionalInfo(additionalInfo)); + } + private void testLogEntityActionAdditionalInfo(ArgumentMatcher matcherEntity, ArgumentMatcher matcherOriginatorId, TenantId tenantId, ArgumentMatcher matcherCustomerId, ArgumentMatcher matcherUserId, String userName, ActionType actionType, @@ -529,8 +538,9 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { Mockito.argThat(matcherEntity), Mockito.eq(actionType), Mockito.argThat(matcherError), - Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(0))), - Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(1)))); + Mockito.argThat(matcherAdditionalInfos.get(0)), + Mockito.argThat(matcherAdditionalInfos.get(1))); + break; case 3: Mockito.verify(auditLogService, times(cntTime)) .logEntityAction(Mockito.eq(tenantId), @@ -541,9 +551,9 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { Mockito.argThat(matcherEntity), Mockito.eq(actionType), Mockito.argThat(matcherError), - Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(0))), - Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(1))), - Mockito.argThat(Mockito.eq(matcherAdditionalInfos.get(2)))); + Mockito.argThat(matcherAdditionalInfos.get(0)), + Mockito.argThat(matcherAdditionalInfos.get(1)), + Mockito.argThat(matcherAdditionalInfos.get(2))); break; default: Mockito.verify(auditLogService, times(cntTime)) diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index 8304d0dd1e..b7cc42c7ac 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -212,6 +212,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { protected UserId differentCustomerUserId; protected UserId differentTenantCustomerUserId; + protected UserId currentUserId; @SuppressWarnings("rawtypes") private HttpMessageConverter mappingJackson2HttpMessageConverter; @@ -567,6 +568,8 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { Claims claims = jwsClaims.getPayload(); String subject = claims.getSubject(); Assert.assertEquals(username, subject); + String userId = claims.get("userId", String.class); + this.currentUserId = UserId.fromString(userId); } protected void resetTokens() throws Exception { @@ -633,6 +636,11 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { return doPost("/api/device?accessToken=" + accessToken, device, Device.class); } + protected Device assignDeviceToCustomer(DeviceId deviceId, CustomerId customerId) { + String deviceIdStr = String.valueOf(deviceId.getId()); + return doPost("/api/customer/" + customerId.getId() + "/device/" + deviceIdStr, Device.class); + } + protected MqttDeviceProfileTransportConfiguration createMqttDeviceProfileTransportConfiguration(TransportPayloadTypeConfiguration transportPayloadTypeConfiguration, boolean sendAckOnValidationException) { MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration = new MqttDeviceProfileTransportConfiguration(); mqttDeviceProfileTransportConfiguration.setDeviceTelemetryTopic(MqttTopics.DEVICE_TELEMETRY_TOPIC); @@ -803,6 +811,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { return readResponse(doPost(urlTemplate, content, params).andExpect(resultMatcher), responseType); } + protected R doPostAsyncWithTypedResponse(String urlTemplate, T content, TypeReference responseType, ResultMatcher resultMatcher, String... params) throws Exception { + return readResponse(doPostAsync(urlTemplate, content, DEFAULT_TIMEOUT, params).andExpect(resultMatcher), responseType); + } + protected R doPostAsync(String urlTemplate, T content, Class responseClass, ResultMatcher resultMatcher, String... params) throws Exception { return readResponse(doPostAsync(urlTemplate, content, DEFAULT_TIMEOUT, params).andExpect(resultMatcher), responseClass); } diff --git a/application/src/test/java/org/thingsboard/server/controller/RuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/RuleEngineControllerTest.java new file mode 100644 index 0000000000..9c9322dc38 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/controller/RuleEngineControllerTest.java @@ -0,0 +1,249 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.controller; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.audit.ActionType; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.service.ruleengine.RuleEngineCallService; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +@DaoSqlTest +public class RuleEngineControllerTest extends AbstractControllerTest { + + private final String REQUEST_BODY = "{\"request\":\"download\"}"; + private final String RESPONSE_BODY = "{\"response\":\"downloadOk\"}"; + + @SpyBean + private RuleEngineCallService ruleEngineCallService; + + @Test + public void testHandleRuleEngineRequestWithMsgOriginatorUser() throws Exception { + loginSysAdmin(); + TbMsg responseMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, currentUserId, TbMsgMetaData.EMPTY, RESPONSE_BODY); + mockRestApiCallToRuleEngine(responseMsg); + + JsonNode apiResponse = doPostAsyncWithTypedResponse("/api/rule-engine/", REQUEST_BODY, new TypeReference<>() { + }, status().isOk()); + + assertThat(JacksonUtil.toString(apiResponse)).isEqualTo(RESPONSE_BODY); + ArgumentCaptor requestMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(TenantId.SYS_TENANT_ID), any(UUID.class), requestMsgCaptor.capture(), eq(false), any(Consumer.class)); + TbMsg requestMsgCaptorValue = requestMsgCaptor.getValue(); + assertThat(requestMsgCaptorValue.getData()).isEqualTo(REQUEST_BODY); + assertThat(requestMsgCaptorValue.getType()).isEqualTo(TbMsgType.REST_API_REQUEST.name()); + assertThat(requestMsgCaptorValue.getOriginator()).isEqualTo(currentUserId); + assertThat(requestMsgCaptorValue.getCustomerId()).isNull(); + checkMetadataProperties(requestMsgCaptorValue.getMetaData()); + testLogEntityAction(null, currentUserId, TenantId.SYS_TENANT_ID, new CustomerId(EntityId.NULL_UUID), currentUserId, + SYS_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, RESPONSE_BODY); + } + + @Test + public void testHandleRuleEngineRequestWithMsgOriginatorDevice() throws Exception { + loginTenantAdmin(); + Device device = createDevice("Test", "123"); + DeviceId deviceId = device.getId(); + TbMsg responseMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, RESPONSE_BODY); + mockRestApiCallToRuleEngine(responseMsg); + + JsonNode apiResponse = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId(), REQUEST_BODY, new TypeReference<>() { + }, status().isOk()); + + assertThat(JacksonUtil.toString(apiResponse)).isEqualTo(RESPONSE_BODY); + ArgumentCaptor requestMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(UUID.class), requestMsgCaptor.capture(), eq(false), any(Consumer.class)); + TbMsg requestMsgCaptorValue = requestMsgCaptor.getValue(); + assertThat(requestMsgCaptorValue.getData()).isEqualTo(REQUEST_BODY); + assertThat(requestMsgCaptorValue.getType()).isEqualTo(TbMsgType.REST_API_REQUEST.name()); + assertThat(requestMsgCaptorValue.getOriginator()).isEqualTo(deviceId); + assertThat(requestMsgCaptorValue.getCustomerId()).isNull(); + checkMetadataProperties(requestMsgCaptorValue.getMetaData()); + testLogEntityAction(null, deviceId, tenantId, new CustomerId(EntityId.NULL_UUID), tenantAdminUserId, + TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, RESPONSE_BODY); + } + + @Test + public void testHandleRuleEngineRequestWithMsgOriginatorDeviceAndSpecifiedTimeout() throws Exception { + loginTenantAdmin(); + Device device = createDevice("Test", "123"); + DeviceId deviceId = device.getId(); + TbMsg responseMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, RESPONSE_BODY); + mockRestApiCallToRuleEngine(responseMsg); + + JsonNode apiResponse = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId() + "/15000", REQUEST_BODY, new TypeReference<>() { + }, status().isOk()); + + assertThat(JacksonUtil.toString(apiResponse)).isEqualTo(RESPONSE_BODY); + ArgumentCaptor requestMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(UUID.class), requestMsgCaptor.capture(), eq(false), any(Consumer.class)); + TbMsg requestMsgCaptorValue = requestMsgCaptor.getValue(); + assertThat(requestMsgCaptorValue.getData()).isEqualTo(REQUEST_BODY); + assertThat(requestMsgCaptorValue.getType()).isEqualTo(TbMsgType.REST_API_REQUEST.name()); + assertThat(requestMsgCaptorValue.getOriginator()).isEqualTo(deviceId); + assertThat(requestMsgCaptorValue.getCustomerId()).isNull(); + checkMetadataProperties(requestMsgCaptorValue.getMetaData()); + testLogEntityAction(null, deviceId, tenantId, new CustomerId(EntityId.NULL_UUID), tenantAdminUserId, + TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, RESPONSE_BODY); + } + + @Test + public void testHandleRuleEngineRequestWithMsgOriginatorDeviceAndResponseIsNull() throws Exception { + loginTenantAdmin(); + Device device = createDevice("Test", "123"); + DeviceId deviceId = device.getId(); + mockRestApiCallToRuleEngine(null); + + doPostAsync("/api/rule-engine/DEVICE/" + deviceId.getId() + "/15000", REQUEST_BODY, String.class, status().isRequestTimeout()); + + ArgumentCaptor requestMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(UUID.class), requestMsgCaptor.capture(), eq(false), any(Consumer.class)); + TbMsg requestMsgCaptorValue = requestMsgCaptor.getValue(); + assertThat(requestMsgCaptorValue.getData()).isEqualTo(REQUEST_BODY); + assertThat(requestMsgCaptorValue.getType()).isEqualTo(TbMsgType.REST_API_REQUEST.name()); + assertThat(requestMsgCaptorValue.getOriginator()).isEqualTo(deviceId); + assertThat(requestMsgCaptorValue.getCustomerId()).isNull(); + checkMetadataProperties(requestMsgCaptorValue.getMetaData()); + Exception exception = new TimeoutException("Processing timeout detected!"); + testLogEntityActionError(deviceId, tenantId, new CustomerId(EntityId.NULL_UUID), tenantAdminUserId, + TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, exception, REQUEST_BODY, ""); + } + + @Test + public void testHandleRuleEngineRequestWithMsgOriginatorDeviceAndSpecifiedQueue() throws Exception { + loginTenantAdmin(); + Device device = createDevice("Test", "123"); + DeviceId deviceId = device.getId(); + TbMsg responseMsg = TbMsg.newMsg(DataConstants.HP_QUEUE_NAME, TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, RESPONSE_BODY); + mockRestApiCallToRuleEngine(responseMsg); + + JsonNode apiResponse = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId() + "/HighPriority/1000", REQUEST_BODY, new TypeReference<>() { + }, status().isOk()); + + assertThat(JacksonUtil.toString(apiResponse)).isEqualTo(RESPONSE_BODY); + ArgumentCaptor requestMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(UUID.class), requestMsgCaptor.capture(), eq(true), any(Consumer.class)); + TbMsg requestMsgCaptorValue = requestMsgCaptor.getValue(); + assertThat(requestMsgCaptorValue.getData()).isEqualTo(REQUEST_BODY); + assertThat(requestMsgCaptorValue.getType()).isEqualTo(TbMsgType.REST_API_REQUEST.name()); + assertThat(requestMsgCaptorValue.getQueueName()).isEqualTo(DataConstants.HP_QUEUE_NAME); + assertThat(requestMsgCaptorValue.getOriginator()).isEqualTo(deviceId); + assertThat(requestMsgCaptorValue.getCustomerId()).isNull(); + checkMetadataProperties(requestMsgCaptorValue.getMetaData()); + testLogEntityAction(null, deviceId, tenantId, new CustomerId(EntityId.NULL_UUID), tenantAdminUserId, + TENANT_ADMIN_EMAIL, ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, RESPONSE_BODY); + } + + @Test + public void testHandleRuleEngineRequestWithInvalidRequestBody() throws Exception { + loginSysAdmin(); + + doPost("/api/rule-engine/", (Object) "@") + .andExpect(status().isBadRequest()) + .andExpect(statusReason(containsString("Invalid request body"))); + + verifyNoInteractions(ruleEngineCallService); + } + + @Test + public void testHandleRuleEngineRequestWithAuthorityCustomerUser() throws Exception { + loginTenantAdmin(); + Device device = createDevice("Test", "123"); + DeviceId deviceId = device.getId(); + assignDeviceToCustomer(deviceId, customerId); + loginCustomerUser(); + + TbMsg responseMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, customerId, TbMsgMetaData.EMPTY, RESPONSE_BODY); + mockRestApiCallToRuleEngine(responseMsg); + + JsonNode apiResponse = doPostAsyncWithTypedResponse("/api/rule-engine/DEVICE/" + deviceId.getId(), REQUEST_BODY, new TypeReference<>() { + }, status().isOk()); + + assertThat(JacksonUtil.toString(apiResponse)).isEqualTo(RESPONSE_BODY); + ArgumentCaptor requestMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineCallService).processRestApiCallToRuleEngine(eq(tenantId), any(UUID.class), requestMsgCaptor.capture(), eq(false), any(Consumer.class)); + TbMsg requestMsgCaptorValue = requestMsgCaptor.getValue(); + assertThat(requestMsgCaptorValue.getData()).isEqualTo(REQUEST_BODY); + assertThat(requestMsgCaptorValue.getType()).isEqualTo(TbMsgType.REST_API_REQUEST.name()); + assertThat(requestMsgCaptorValue.getOriginator()).isEqualTo(deviceId); + assertThat(requestMsgCaptorValue.getCustomerId()).isEqualTo(customerId); + checkMetadataProperties(requestMsgCaptorValue.getMetaData()); + testLogEntityAction(null, deviceId, tenantId, customerId, customerUserId, CUSTOMER_USER_EMAIL, + ActionType.REST_API_RULE_ENGINE_CALL, 1, REQUEST_BODY, RESPONSE_BODY); + } + + @Test + public void testHandleRuleEngineRequestWithoutPermission() throws Exception { + loginTenantAdmin(); + Device device = createDevice("test", "123"); + loginCustomerUser(); + + doPostAsync("/api/rule-engine/DEVICE/" + device.getId().getId(), (Object) REQUEST_BODY, -1L) + .andExpect(status().isForbidden()) + .andExpect(content().string("You don't have permission to perform this operation!")); + + verifyNoInteractions(ruleEngineCallService); + } + + @Test + public void testHandleRuleEngineRequestUnauthorized() throws Exception { + doPost("/api/rule-engine/", (Object) REQUEST_BODY) + .andExpect(status().isUnauthorized()) + .andExpect(statusReason(containsString("Authentication failed"))); + } + + private void mockRestApiCallToRuleEngine(TbMsg responseMsg) { + doAnswer(invocation -> { + Consumer consumer = invocation.getArgument(4); + consumer.accept(responseMsg); + return null; + }).when(ruleEngineCallService).processRestApiCallToRuleEngine(any(TenantId.class), any(UUID.class), any(TbMsg.class), anyBoolean(), any(Consumer.class)); + } + + public void checkMetadataProperties(TbMsgMetaData metaData) { + Map data = metaData.getData(); + assertThat(data).containsKeys("serviceId", "requestUUID", "expirationTime"); + } +} diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java index 78cca2de62..59cca779f5 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java @@ -19,6 +19,7 @@ import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.test.context.ContextConfiguration; @@ -27,6 +28,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; @@ -37,10 +39,14 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbRuleEngineProducerService; import org.thingsboard.server.queue.discovery.PartitionService; @@ -53,13 +59,16 @@ import org.thingsboard.server.service.profile.TbDeviceProfileCache; import java.util.List; import java.util.UUID; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @Slf4j @@ -249,6 +258,99 @@ public class DefaultTbClusterServiceTest { .send(eq(topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, monolith2)), any(TbProtoQueueMsg.class), isNull()); } + @Test + public void testPushNotificationToCoreWithRestApiCallResponseMsgProto() { + TopicPartitionInfo tpi = mock(TopicPartitionInfo.class); + TbQueueCallback callbackMock = mock(TbQueueCallback.class); + TbQueueProducer> tbCoreQueueProducer = mock(TbQueueProducer.class); + + doReturn(tpi).when(topicService).getNotificationsTopic(any(ServiceType.class), any(String.class)); + when(producerProvider.getTbCoreNotificationsMsgProducer()).thenReturn(tbCoreQueueProducer); + TransportProtos.RestApiCallResponseMsgProto responseMsgProto = TransportProtos.RestApiCallResponseMsgProto.getDefaultInstance(); + TransportProtos.ToCoreNotificationMsg toCoreNotificationMsg = TransportProtos.ToCoreNotificationMsg.newBuilder().setRestApiCallResponseMsg(responseMsgProto).build(); + + clusterService.pushNotificationToCore(CORE, responseMsgProto, callbackMock); + + verify(topicService).getNotificationsTopic(ServiceType.TB_CORE, CORE); + verify(producerProvider).getTbCoreNotificationsMsgProducer(); + ArgumentCaptor> protoQueueMsgArgumentCaptor = ArgumentCaptor.forClass(TbProtoQueueMsg.class); + verify(tbCoreQueueProducer).send(eq(tpi), protoQueueMsgArgumentCaptor.capture(), eq(callbackMock)); + TbProtoQueueMsg protoQueueMsgArgumentCaptorValue = protoQueueMsgArgumentCaptor.getValue(); + assertThat(protoQueueMsgArgumentCaptorValue.getKey()).isNotNull(); + assertThat(protoQueueMsgArgumentCaptorValue.getValue()).isEqualTo(toCoreNotificationMsg); + assertThat(protoQueueMsgArgumentCaptorValue.getHeaders().getData()).isEqualTo(new DefaultTbQueueMsgHeaders().getData()); + } + + @Test + public void testPushMsgToRuleEngineWithTenantIdIsNullUuidAndEntityIsTenantUseQueueFromMsgIsTrue() { + TbQueueProducer> tbREQueueProducer = mock(TbQueueProducer.class); + TbQueueCallback callback = mock(TbQueueCallback.class); + + TenantId tenantId = TenantId.fromUUID(UUID.fromString("3c8bd350-1239-4a3b-b9c3-4dd76f8e20f1")); + TbMsg requestMsg = TbMsg.newMsg(DataConstants.HP_QUEUE_NAME, TbMsgType.REST_API_REQUEST, tenantId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + + when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer); + + clusterService.pushMsgToRuleEngine(TenantId.SYS_TENANT_ID, tenantId, requestMsg, true, callback); + + verify(producerProvider).getRuleEngineMsgProducer(); + verify(ruleEngineProducerService).sendToRuleEngine(tbREQueueProducer, tenantId, requestMsg, callback); + } + + @Test + public void testPushMsgToRuleEngineWithTenantIdIsNullUuidAndEntityIsDevice() { + TenantId tenantId = TenantId.SYS_TENANT_ID; + DeviceId deviceId = new DeviceId(UUID.fromString("aa6d112d-2914-4a22-a9e3-bee33edbdb14")); + TbMsg requestMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + TbQueueCallback callback = mock(TbQueueCallback.class); + + clusterService.pushMsgToRuleEngine(tenantId, deviceId, requestMsg, false, callback); + + verifyNoMoreInteractions(partitionService, producerProvider); + } + + @Test + public void testPushMsgToRuleEngineWithTenantIdIsNotNullUuidUseQueueFromMsgIsTrue() { + TbQueueProducer> tbREQueueProducer = mock(TbQueueProducer.class); + TbQueueCallback callback = mock(TbQueueCallback.class); + + TenantId tenantId = TenantId.fromUUID(UUID.fromString("3c8bd350-1239-4a3b-b9c3-4dd76f8e20f1")); + DeviceId deviceId = new DeviceId(UUID.fromString("adbb9d41-3367-40fd-9e74-7dd7cc5d30cf")); + DeviceProfile deviceProfile = new DeviceProfile(new DeviceProfileId(UUID.fromString("552f5d6d-0b2b-43e1-a7d2-a51cb2a96927"))); + TbMsg requestMsg = TbMsg.newMsg(DataConstants.HP_QUEUE_NAME, TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + + when(deviceProfileCache.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile); + when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer); + + clusterService.pushMsgToRuleEngine(tenantId, deviceId, requestMsg, true, callback); + + verify(producerProvider).getRuleEngineMsgProducer(); + verify(ruleEngineProducerService).sendToRuleEngine(tbREQueueProducer, tenantId, requestMsg, callback); + } + + @Test + public void testPushMsgToRuleEngineUseQueueFromMsgIsFalse() { + TbQueueProducer> tbREQueueProducer = mock(TbQueueProducer.class); + TbQueueCallback callback = mock(TbQueueCallback.class); + + TenantId tenantId = TenantId.fromUUID(UUID.fromString("5377c8d0-26e5-4d81-84c6-4344043973c8")); + DeviceId deviceId = new DeviceId(UUID.fromString("016c2abb-f46f-49f9-a83d-4d28b803cfe6")); + DeviceProfile deviceProfile = new DeviceProfile(new DeviceProfileId(UUID.fromString("dc5766e2-1a32-4022-859b-743050097ab7"))); + deviceProfile.setDefaultQueueName(DataConstants.MAIN_QUEUE_NAME); + TbMsg requestMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + + when(deviceProfileCache.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile); + when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer); + + clusterService.pushMsgToRuleEngine(tenantId, deviceId, requestMsg, false, callback); + + verify(producerProvider).getRuleEngineMsgProducer(); + TbMsg expectedMsg = TbMsg.transformMsgQueueName(requestMsg, DataConstants.MAIN_QUEUE_NAME); + ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineProducerService).sendToRuleEngine(eq(tbREQueueProducer), eq(tenantId), actualMsg.capture(), eq(callback)); + assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); + } + protected Queue createTestQueue() { TenantId tenantId = TenantId.SYS_TENANT_ID; Queue queue = new Queue(new QueueId(UUID.randomUUID())); diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java index 621d4c5a91..ca6e6728ff 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.service.ruleengine.RuleEngineCallService; import org.thingsboard.server.service.state.DeviceStateService; import java.util.UUID; @@ -48,6 +49,8 @@ public class DefaultTbCoreConsumerServiceTest { private DeviceStateService stateServiceMock; @Mock private TbCoreConsumerStats statsMock; + @Mock + private RuleEngineCallService ruleEngineCallServiceMock; @Mock private TbCallback tbCallbackMock; @@ -529,4 +532,17 @@ public class DefaultTbCoreConsumerServiceTest { then(statsMock).should(never()).log(inactivityMsg); } + @Test + public void givenRestApiCallResponseMsgProto_whenForwardToRuleEngineCallService_thenCallOnQueueMsg() { + // GIVEN + ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "ruleEngineCallService", ruleEngineCallServiceMock); + var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.getDefaultInstance(); + doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToRuleEngineCallService(restApiCallResponseMsgProto, tbCallbackMock); + + // WHEN + defaultTbCoreConsumerServiceMock.forwardToRuleEngineCallService(restApiCallResponseMsgProto, tbCallbackMock); + + // THEN + then(ruleEngineCallServiceMock).should().onQueueMsg(restApiCallResponseMsgProto, tbCallbackMock); + } } diff --git a/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java b/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java new file mode 100644 index 0000000000..0ed53b9561 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.rpc; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.gen.transport.TransportProtos; +import java.util.UUID; + +import static org.mockito.BDDMockito.then; + +@ExtendWith(MockitoExtension.class) +class DefaultTbRuleEngineRpcServiceTest { + + @Mock + private TbClusterService tbClusterServiceMock; + + @InjectMocks + private DefaultTbRuleEngineRpcService tbRuleEngineRpcService; + + @Test + public void givenTbMsg_whenSendRestApiCallReply_thenPushNotificationToCore() { + // GIVEN + String serviceId = "tb-core-0"; + UUID requestId = UUID.fromString("f64a20df-eb1e-46a3-ba6f-0b3ae053ee0a"); + DeviceId deviceId = new DeviceId(UUID.fromString("1d9f771a-7cdc-4ac7-838c-ba193d05a012")); + TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.newBuilder() + .setRequestIdMSB(requestId.getMostSignificantBits()) + .setRequestIdLSB(requestId.getLeastSignificantBits()) + .setResponse(TbMsg.toByteString(msg)) + .build(); + + // WHEN + tbRuleEngineRpcService.sendRestApiCallReply(serviceId, requestId, msg); + + // THEN + then(tbClusterServiceMock).should().pushNotificationToCore(serviceId, restApiCallResponseMsgProto, null); + } +} diff --git a/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java b/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java new file mode 100644 index 0000000000..282563574f --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java @@ -0,0 +1,141 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ruleengine; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.HashMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +public class DefaultRuleEngineCallServiceTest { + + private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("d7210c7f-a152-4e91-8186-19ae85499a6b")); + + private final ConcurrentMap> requests = new ConcurrentHashMap<>(); + + @Mock + private TbClusterService tbClusterServiceMock; + + private DefaultRuleEngineCallService ruleEngineCallService; + private ScheduledExecutorService executor; + + @BeforeEach + void setUp() { + executor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("re-rest-callback")); + ruleEngineCallService = new DefaultRuleEngineCallService(tbClusterServiceMock); + ReflectionTestUtils.setField(ruleEngineCallService, "executor", executor); + ReflectionTestUtils.setField(ruleEngineCallService, "requests", requests); + } + + @AfterEach + void tearDown() { + requests.clear(); + if (executor != null) { + executor.shutdownNow(); + } + } + + @Test + void givenRequest_whenProcessRestApiCallToRuleEngine_thenPushMsgToRuleEngineAndCheckRemovedDueTimeout() { + long timeout = 1L; + long expTime = System.currentTimeMillis() + timeout; + HashMap metaData = new HashMap<>(); + UUID requestId = UUID.randomUUID(); + metaData.put("serviceId", "core"); + metaData.put("requestUUID", requestId.toString()); + metaData.put("expirationTime", Long.toString(expTime)); + TbMsg msg = TbMsg.newMsg(DataConstants.MAIN_QUEUE_NAME, TbMsgType.REST_API_REQUEST, TENANT_ID, new TbMsgMetaData(metaData), "{\"key\":\"value\"}"); + + Consumer anyConsumer = TbMsg::getData; + doAnswer(invocation -> { + //check the presence of request in the map after pushMsgToRuleEngine() + assertThat(requests.size()).isEqualTo(1); + assertThat(requests.get(requestId)).isEqualTo(anyConsumer); + return null; + }).when(tbClusterServiceMock).pushMsgToRuleEngine(any(), any(), any(), anyBoolean(), any()); + ruleEngineCallService.processRestApiCallToRuleEngine(TENANT_ID, requestId, msg, true, anyConsumer); + + verify(tbClusterServiceMock).pushMsgToRuleEngine(TENANT_ID, TENANT_ID, msg, true, null); + //check map is empty after scheduleTimeout() + Awaitility.await("Await until request was deleted from map due to timeout") + .pollDelay(25, TimeUnit.MILLISECONDS) + .atMost(10, TimeUnit.SECONDS) + .until(requests::isEmpty); + } + + @Test + void givenResponse_whenOnQueue_thenAcceptTbMsgResponse() { + long timeout = 10000L; + long expTime = System.currentTimeMillis() + timeout; + HashMap metaData = new HashMap<>(); + UUID requestId = UUID.randomUUID(); + metaData.put("serviceId", "core"); + metaData.put("requestUUID", requestId.toString()); + metaData.put("expirationTime", Long.toString(expTime)); + TbMsg msg = TbMsg.newMsg(DataConstants.MAIN_QUEUE_NAME, TbMsgType.REST_API_REQUEST, TENANT_ID, new TbMsgMetaData(metaData), "{\"key\":\"value\"}"); + + Consumer anyConsumer = TbMsg::getData; + doAnswer(invocation -> { + //check the presence of request in the map after pushMsgToRuleEngine() + assertThat(requests.size()).isEqualTo(1); + assertThat(requests.get(requestId)).isEqualTo(anyConsumer); + ruleEngineCallService.onQueueMsg(getResponse(requestId, msg), TbCallback.EMPTY); + //check map is empty after onQueueMsg() + assertThat(requests.size()).isEqualTo(0); + return null; + }).when(tbClusterServiceMock).pushMsgToRuleEngine(any(), any(), any(), anyBoolean(), any()); + ruleEngineCallService.processRestApiCallToRuleEngine(TENANT_ID, requestId, msg, true, anyConsumer); + + verify(tbClusterServiceMock).pushMsgToRuleEngine(TENANT_ID, TENANT_ID, msg, true, null); + } + + private TransportProtos.RestApiCallResponseMsgProto getResponse(UUID requestId, TbMsg msg) { + return TransportProtos.RestApiCallResponseMsgProto.newBuilder() + .setResponse(TbMsg.toByteString(msg)) + .setRequestIdMSB(requestId.getMostSignificantBits()) + .setRequestIdLSB(requestId.getLeastSignificantBits()) + .build(); + } +} diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index 83e9e792bf..ec37a5a864 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -38,6 +38,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.RestApiCallResponseMsgProto; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueClusterService; @@ -57,10 +58,14 @@ public interface TbClusterService extends TbQueueClusterService { void pushNotificationToCore(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback); + void pushNotificationToCore(String targetServiceId, RestApiCallResponseMsgProto msg, TbQueueCallback callback); + void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback); void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, TbQueueCallback callback); + void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, boolean useQueueFromTbMsg, TbQueueCallback callback); + void pushNotificationToRuleEngine(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback); void pushNotificationToTransport(String targetServiceId, ToTransportMsg response, TbQueueCallback callback); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java index d3c0889d6b..db56acb8b2 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java @@ -40,6 +40,7 @@ public enum ActionType { RELATION_ADD_OR_UPDATE(false, TbMsgType.RELATION_ADD_OR_UPDATE), RELATION_DELETED(false, TbMsgType.RELATION_DELETED), RELATIONS_DELETED(false, TbMsgType.RELATIONS_DELETED), + REST_API_RULE_ENGINE_CALL(false, null), // log call to rule engine from REST API ALARM_ACK(false, TbMsgType.ALARM_ACK), ALARM_CLEAR(false, TbMsgType.ALARM_CLEAR), ALARM_DELETE(false, TbMsgType.ALARM_DELETE), diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java b/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java index 763f245a09..9aa1994ab6 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java @@ -67,6 +67,7 @@ public enum TbMsgType { PROVISION_SUCCESS, PROVISION_FAILURE, SEND_EMAIL, + REST_API_REQUEST("REST API request"), // tellSelfOnly types GENERATOR_NODE_SELF_MSG(null, true), diff --git a/common/data/src/test/java/org/thingsboard/server/common/data/audit/ActionTypeTest.java b/common/data/src/test/java/org/thingsboard/server/common/data/audit/ActionTypeTest.java index f746f47e19..6a5ed54896 100644 --- a/common/data/src/test/java/org/thingsboard/server/common/data/audit/ActionTypeTest.java +++ b/common/data/src/test/java/org/thingsboard/server/common/data/audit/ActionTypeTest.java @@ -28,6 +28,7 @@ import static org.thingsboard.server.common.data.audit.ActionType.DELETED_COMMEN import static org.thingsboard.server.common.data.audit.ActionType.LOCKOUT; import static org.thingsboard.server.common.data.audit.ActionType.LOGIN; import static org.thingsboard.server.common.data.audit.ActionType.LOGOUT; +import static org.thingsboard.server.common.data.audit.ActionType.REST_API_RULE_ENGINE_CALL; import static org.thingsboard.server.common.data.audit.ActionType.RPC_CALL; import static org.thingsboard.server.common.data.audit.ActionType.SMS_SENT; import static org.thingsboard.server.common.data.audit.ActionType.SUSPENDED; @@ -45,7 +46,8 @@ class ActionTypeTest { LOGOUT, LOCKOUT, DELETED_COMMENT, - SMS_SENT + SMS_SENT, + REST_API_RULE_ENGINE_CALL ); // backward-compatibility tests diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 50a6140d8b..6a8fb543f6 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -101,6 +101,12 @@ message SessionInfoProto { bool isGateway = 18; } +message RestApiCallResponseMsgProto { + int64 requestIdMSB = 1; + int64 requestIdLSB = 2; + bytes response = 5; +} + enum SessionEvent { OPEN = 0; CLOSED = 1; @@ -1485,6 +1491,7 @@ message ToCoreNotificationMsg { ToEdgeSyncRequestMsgProto toEdgeSyncRequest = 11; FromEdgeSyncResponseMsgProto fromEdgeSyncResponse = 12; ResourceCacheInvalidateMsg resourceCacheInvalidateMsg = 13; + RestApiCallResponseMsgProto restApiCallResponseMsg = 50; } /* Messages that are handled by ThingsBoard RuleEngine Service */ diff --git a/pom.xml b/pom.xml index 2d6b8208a9..3bf3c4f7d2 100755 --- a/pom.xml +++ b/pom.xml @@ -1999,6 +1999,11 @@ + + com.amazonaws + aws-java-sdk-lambda + ${aws.sdk.version} + com.google.cloud google-cloud-pubsub diff --git a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java index 8783770be3..c116eddc74 100644 --- a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java +++ b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java @@ -3840,6 +3840,51 @@ public class RestClient implements Closeable { } } + public JsonNode handleRuleEngineRequest(JsonNode requestBody) { + return restTemplate.exchange( + baseURL + "/api/rule-engine", + HttpMethod.POST, + new HttpEntity<>(requestBody), + new ParameterizedTypeReference() { + }).getBody(); + } + + public JsonNode handleRuleEngineRequest(EntityId entityId, JsonNode requestBody) { + return restTemplate.exchange( + baseURL + "/api/rule-engine/{entityType}/{entityId}", + HttpMethod.POST, + new HttpEntity<>(requestBody), + new ParameterizedTypeReference() { + }, + entityId.getEntityType(), + entityId.getId()).getBody(); + } + + public JsonNode handleRuleEngineRequest(EntityId entityId, int timeout, JsonNode requestBody) { + return restTemplate.exchange( + baseURL + "/api/rule-engine/{entityType}/{entityId}/{timeout}", + HttpMethod.POST, + new HttpEntity<>(requestBody), + new ParameterizedTypeReference() { + }, + entityId.getEntityType(), + entityId.getId(), + timeout).getBody(); + } + + public JsonNode handleRuleEngineRequest(EntityId entityId, String queueName, int timeout, JsonNode requestBody) { + return restTemplate.exchange( + baseURL + "/api/rule-engine/{entityType}/{entityId}/{queueName}/{timeout}", + HttpMethod.POST, + new HttpEntity<>(requestBody), + new ParameterizedTypeReference() { + }, + entityId.getEntityType(), + entityId.getId(), + queueName, + timeout).getBody(); + } + private String getTimeUrlParams(TimePageLink pageLink) { String urlParams = getUrlParams(pageLink); if (pageLink.getStartTime() != null) { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java index 55f83f823b..99ab76b69d 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.api; import org.thingsboard.server.common.data.id.RpcId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.msg.TbMsg; import java.util.UUID; import java.util.function.Consumer; @@ -31,5 +32,7 @@ public interface RuleEngineRpcService { void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest request, Consumer consumer); + void sendRestApiCallReply(String serviceId, UUID requestId, TbMsg msg); + Rpc findRpcById(TenantId tenantId, RpcId id); } diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml index 5ac1421cdb..71e6ff997d 100644 --- a/rule-engine/rule-engine-components/pom.xml +++ b/rule-engine/rule-engine-components/pom.xml @@ -100,6 +100,10 @@ com.amazonaws aws-java-sdk-sns + + com.amazonaws + aws-java-sdk-lambda + com.google.cloud google-cloud-pubsub diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNode.java new file mode 100644 index 0000000000..2512125ff1 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNode.java @@ -0,0 +1,151 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.aws.lambda; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.handlers.AsyncHandler; +import com.amazonaws.services.lambda.AWSLambdaAsync; +import com.amazonaws.services.lambda.AWSLambdaAsyncClientBuilder; +import com.amazonaws.services.lambda.model.InvokeRequest; +import com.amazonaws.services.lambda.model.InvokeResult; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +@Slf4j +@RuleNode( + type = ComponentType.EXTERNAL, + name = "aws lambda", + configClazz = TbAwsLambdaNodeConfiguration.class, + nodeDescription = "Publish message to the AWS Lambda", + nodeDetails = "Publishes messages to AWS Lambda, a service that lets you run code " + + "without provisioning or managing servers. " + + "It sends messages using a RequestResponse invocation type. " + + "The node uses a pre-configured client and specified function to run.

" + + "Output connections: Success, Failure.", + uiResources = {"static/rulenode/rulenode-core-config.js"}, + configDirective = "tbExternalNodeLambdaConfig", + iconUrl = "data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAyNCAyNCIgd2lkdGg9IjQ4IiBoZWlnaHQ9IjQ4Ij48cGF0aCBkPSJNMTMuMjMgMTAuNTZWMTBjLTEuOTQgMC0zLjk5LjM5LTMuOTkgMi42NyAwIDEuMTYuNjEgMS45NSAxLjYzIDEuOTUuNzYgMCAxLjQzLS40NyAxLjg2LTEuMjIuNTItLjkzLjUtMS44LjUtMi44NG0yLjcgNi41M2MtLjE4LjE2LS40My4xNy0uNjMuMDYtLjg5LS43NC0xLjA1LTEuMDgtMS41NC0xLjc5LTEuNDcgMS41LTIuNTEgMS45NS00LjQyIDEuOTUtMi4yNSAwLTQuMDEtMS4zOS00LjAxLTQuMTcgMC0yLjE4IDEuMTctMy42NCAyLjg2LTQuMzggMS40Ni0uNjQgMy40OS0uNzYgNS4wNC0uOTNWNy41YzAtLjY2LjA1LTEuNDEtLjMzLTEuOTYtLjMyLS40OS0uOTUtLjctMS41LS43LTEuMDIgMC0xLjkzLjUzLTIuMTUgMS42MS0uMDUuMjQtLjI1LjQ4LS40Ny40OWwtMi42LS4yOGMtLjIyLS4wNS0uNDYtLjIyLS40LS41Ni42LTMuMTUgMy40NS00LjEgNi00LjEgMS4zIDAgMyAuMzUgNC4wMyAxLjMzQzE3LjExIDQuNTUgMTcgNi4xOCAxNyA3Ljk1djQuMTdjMCAxLjI1LjUgMS44MSAxIDIuNDguMTcuMjUuMjEuNTQgMCAuNzFsLTIuMDYgMS43OGgtLjAxIj48L3BhdGg+PHBhdGggZD0iTTIwLjE2IDE5LjU0QzE4IDIxLjE0IDE0LjgyIDIyIDEyLjEgMjJjLTMuODEgMC03LjI1LTEuNDEtOS44NS0zLjc2LS4yLS4xOC0uMDItLjQzLjI1LS4yOSAyLjc4IDEuNjMgNi4yNSAyLjYxIDkuODMgMi42MSAyLjQxIDAgNS4wNy0uNSA3LjUxLTEuNTMuMzctLjE2LjY2LjI0LjMyLjUxIj48L3BhdGg+PHBhdGggZD0iTTIxLjA3IDE4LjVjLS4yOC0uMzYtMS44NS0uMTctMi41Ny0uMDgtLjE5LjAyLS4yMi0uMTYtLjAzLS4zIDEuMjQtLjg4IDMuMjktLjYyIDMuNTMtLjMzLjI0LjMtLjA3IDIuMzUtMS4yNCAzLjMyLS4xOC4xNi0uMzUuMDctLjI2LS4xMS4yNi0uNjcuODUtMi4xNC41Ny0yLjV6Ij48L3BhdGg+PC9zdmc+" +) +public class TbAwsLambdaNode extends TbAbstractExternalNode { + + private TbAwsLambdaNodeConfiguration config; + private AWSLambdaAsync client; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + config = TbNodeUtils.convert(configuration, TbAwsLambdaNodeConfiguration.class); + if (StringUtils.isBlank(config.getFunctionName())) { + throw new TbNodeException("Function name must be set!", true); + } + try { + AWSCredentials awsCredentials = new BasicAWSCredentials(config.getAccessKey(), config.getSecretKey()); + client = AWSLambdaAsyncClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) + .withRegion(config.getRegion()) + .withClientConfiguration(new ClientConfiguration() + .withConnectionTimeout((int) TimeUnit.SECONDS.toMillis(config.getConnectionTimeout())) + .withRequestTimeout((int) TimeUnit.SECONDS.toMillis(config.getRequestTimeout()))) + .build(); + } catch (Exception e) { + throw new TbNodeException(e); + } + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + var tbMsg = ackIfNeeded(ctx, msg); + String functionName = TbNodeUtils.processPattern(config.getFunctionName(), tbMsg); + String qualifier = StringUtils.isBlank(config.getQualifier()) ? + TbAwsLambdaNodeConfiguration.DEFAULT_QUALIFIER : + TbNodeUtils.processPattern(config.getQualifier(), tbMsg); + InvokeRequest request = toRequest(tbMsg.getData(), functionName, qualifier); + client.invokeAsync(request, new AsyncHandler<>() { + @Override + public void onError(Exception e) { + tellFailure(ctx, tbMsg, e); + } + + @Override + public void onSuccess(InvokeRequest request, InvokeResult invokeResult) { + try { + if (config.isTellFailureIfFuncThrowsExc() && invokeResult.getFunctionError() != null) { + throw new RuntimeException(getPayload(invokeResult)); + } + tellSuccess(ctx, getResponseMsg(tbMsg, invokeResult)); + } catch (Exception e) { + tellFailure(ctx, processException(tbMsg, invokeResult, e), e); + } + } + }); + } + + private InvokeRequest toRequest(String requestBody, String functionName, String qualifier) { + return new InvokeRequest() + .withFunctionName(functionName) + .withPayload(requestBody) + .withQualifier(qualifier); + } + + private String getPayload(InvokeResult invokeResult) { + ByteBuffer buf = invokeResult.getPayload(); + if (buf == null) { + throw new RuntimeException("Payload from result of AWS Lambda function execution is null."); + } + byte[] responseBytes = new byte[buf.remaining()]; + buf.get(responseBytes); + return new String(responseBytes); + } + + private TbMsg getResponseMsg(TbMsg originalMsg, InvokeResult invokeResult) { + TbMsgMetaData metaData = originalMsg.getMetaData().copy(); + metaData.putValue("requestId", invokeResult.getSdkResponseMetadata().getRequestId()); + String data = getPayload(invokeResult); + return TbMsg.transformMsg(originalMsg, metaData, data); + } + + private TbMsg processException(TbMsg origMsg, InvokeResult invokeResult, Throwable t) { + TbMsgMetaData metaData = origMsg.getMetaData().copy(); + metaData.putValue("error", t.getClass() + ": " + t.getMessage()); + metaData.putValue("requestId", invokeResult.getSdkResponseMetadata().getRequestId()); + return TbMsg.transformMsgMetadata(origMsg, metaData); + } + + @Override + public void destroy() { + if (client != null) { + try { + client.shutdown(); + } catch (Exception e) { + log.error("Failed to shutdown Lambda client during destroy", e); + } + } + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNodeConfiguration.java new file mode 100644 index 0000000000..c7c66599d5 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNodeConfiguration.java @@ -0,0 +1,46 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.aws.lambda; + +import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; + +@Data +public class TbAwsLambdaNodeConfiguration implements NodeConfiguration { + + public static final String DEFAULT_QUALIFIER = "$LATEST"; + + private String accessKey; + private String secretKey; + private String region; + private String functionName; + private String qualifier; + private int connectionTimeout; + private int requestTimeout; + private boolean tellFailureIfFuncThrowsExc; + + @Override + public TbAwsLambdaNodeConfiguration defaultConfiguration() { + TbAwsLambdaNodeConfiguration configuration = new TbAwsLambdaNodeConfiguration(); + configuration.setRegion("us-east-1"); + configuration.setQualifier(DEFAULT_QUALIFIER); + configuration.setConnectionTimeout(10); + configuration.setRequestTimeout(5); + configuration.setTellFailureIfFuncThrowsExc(false); + return configuration; + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNode.java new file mode 100644 index 0000000000..84441fb213 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNode.java @@ -0,0 +1,66 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.rest; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; + +import java.util.UUID; + +@Slf4j +@RuleNode( + type = ComponentType.ACTION, + name = "rest call reply", + configClazz = TbSendRestApiCallReplyNodeConfiguration.class, + nodeDescription = "Sends reply to REST API call to rule engine", + nodeDetails = "Expects messages with any message type. Forwards incoming message as a reply to REST API call sent to rule engine.", + uiResources = {"static/rulenode/rulenode-core-config.js"}, + configDirective = "tbActionNodeSendRestApiCallReplyConfig", + icon = "call_merge" +) +public class TbSendRestApiCallReplyNode implements TbNode { + + private TbSendRestApiCallReplyNodeConfiguration config; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbSendRestApiCallReplyNodeConfiguration.class); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + String serviceIdStr = msg.getMetaData().getValue(config.getServiceIdMetaDataAttribute()); + String requestIdStr = msg.getMetaData().getValue(config.getRequestIdMetaDataAttribute()); + if (StringUtils.isEmpty(requestIdStr)) { + ctx.tellFailure(msg, new RuntimeException("Request id is not present in the metadata!")); + } else if (StringUtils.isEmpty(serviceIdStr)) { + ctx.tellFailure(msg, new RuntimeException("Service id is not present in the metadata!")); + } else if (StringUtils.isEmpty(msg.getData())) { + ctx.tellFailure(msg, new RuntimeException("Request body is empty!")); + } else { + ctx.getRpcService().sendRestApiCallReply(serviceIdStr, UUID.fromString(requestIdStr), msg); + ctx.tellSuccess(msg); + } + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeConfiguration.java new file mode 100644 index 0000000000..010a6ccb15 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeConfiguration.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.rest; + +import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.server.common.data.StringUtils; + +@Data +public class TbSendRestApiCallReplyNodeConfiguration implements NodeConfiguration { + public static final String SERVICE_ID = "serviceId"; + public static final String REQUEST_UUID = "requestUUID"; + + private String serviceIdMetaDataAttribute; + private String requestIdMetaDataAttribute; + + @Override + public TbSendRestApiCallReplyNodeConfiguration defaultConfiguration() { + TbSendRestApiCallReplyNodeConfiguration configuration = new TbSendRestApiCallReplyNodeConfiguration(); + configuration.setRequestIdMetaDataAttribute(REQUEST_UUID); + configuration.setServiceIdMetaDataAttribute(SERVICE_ID); + return configuration; + } + + public String getServiceIdMetaDataAttribute() { + return !StringUtils.isEmpty(serviceIdMetaDataAttribute) ? serviceIdMetaDataAttribute : SERVICE_ID; + } + + public String getRequestIdMetaDataAttribute() { + return !StringUtils.isEmpty(requestIdMetaDataAttribute) ? requestIdMetaDataAttribute : REQUEST_UUID; + } +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNodeTest.java new file mode 100644 index 0000000000..4ee954991c --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/lambda/TbAwsLambdaNodeTest.java @@ -0,0 +1,308 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.aws.lambda; + +import com.amazonaws.ResponseMetadata; +import com.amazonaws.handlers.AsyncHandler; +import com.amazonaws.services.lambda.AWSLambdaAsync; +import com.amazonaws.services.lambda.model.AWSLambdaException; +import com.amazonaws.services.lambda.model.InvokeRequest; +import com.amazonaws.services.lambda.model.InvokeResult; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.NullAndEmptySource; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.thingsboard.rule.engine.aws.lambda.TbAwsLambdaNodeConfiguration.DEFAULT_QUALIFIER; + +@ExtendWith(MockitoExtension.class) +public class TbAwsLambdaNodeTest { + + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("ddb88645-7379-4a08-a51c-e84a0b4b3d88")); + + private TbAwsLambdaNode node; + private TbAwsLambdaNodeConfiguration config; + + @Mock + private TbContext ctx; + @Mock + private AWSLambdaAsync clientMock; + + @BeforeEach + void setUp() { + node = new TbAwsLambdaNode(); + config = new TbAwsLambdaNodeConfiguration().defaultConfiguration(); + } + + @Test + public void verifyDefaultConfig() { + assertThat(config.getAccessKey()).isNull(); + assertThat(config.getSecretKey()).isNull(); + assertThat(config.getRegion()).isEqualTo(("us-east-1")); + assertThat(config.getFunctionName()).isNull(); + assertThat(config.getQualifier()).isEqualTo(DEFAULT_QUALIFIER); + assertThat(config.getConnectionTimeout()).isEqualTo(10); + assertThat(config.getRequestTimeout()).isEqualTo(5); + assertThat(config.isTellFailureIfFuncThrowsExc()).isFalse(); + } + + @ParameterizedTest + @NullAndEmptySource + @ValueSource(strings = " ") + public void givenInvalidFunctionName_whenInit_thenThrowsException(String funcName) { + config.setFunctionName(funcName); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + assertThatThrownBy(() -> node.init(ctx, configuration)) + .isInstanceOf(TbNodeException.class) + .hasMessage("Function name must be set!"); + } + + @ParameterizedTest + @MethodSource + public void givenRequest_whenOnMsg_thenTellSuccess(String data, TbMsgMetaData metadata, String functionName, String qualifier, String expectedQualifier) { + init(); + config.setFunctionName(functionName); + config.setQualifier(qualifier); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metadata, data); + + InvokeRequest request = createInvokeRequest(msg); + String requestIdStr = "a124af57-e7c3-4ebb-83bf-b09ff86eaa23"; + String funcResponsePayload = "{\"statusCode\":200}"; + + when(clientMock.invokeAsync(any(), any())).then(invocation -> { + InvokeResult result = new InvokeResult(); + result.setSdkResponseMetadata(new ResponseMetadata(Map.of(ResponseMetadata.AWS_REQUEST_ID, requestIdStr))); + result.setPayload(ByteBuffer.wrap(funcResponsePayload.getBytes())); + AsyncHandler asyncHandler = invocation.getArgument(1); + asyncHandler.onSuccess(request, result); + return null; + }); + + node.onMsg(ctx, msg); + + ArgumentCaptor invokeRequestCaptor = ArgumentCaptor.forClass(InvokeRequest.class); + ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); + + verify(clientMock).invokeAsync(invokeRequestCaptor.capture(), any()); + verify(ctx).tellSuccess(msgCaptor.capture()); + + assertThat(invokeRequestCaptor.getValue().getQualifier()).isEqualTo(expectedQualifier); + TbMsgMetaData resultMsgMetadata = metadata.copy(); + resultMsgMetadata.putValue("requestId", requestIdStr); + TbMsg resultedMsg = TbMsg.transformMsg(msg, resultMsgMetadata, funcResponsePayload); + assertThat(msgCaptor.getValue()).usingRecursiveComparison() + .ignoringFields("ctx") + .isEqualTo(resultedMsg); + } + + private static Stream givenRequest_whenOnMsg_thenTellSuccess() { + return Stream.of( + Arguments.of(TbMsg.EMPTY_JSON_OBJECT, TbMsgMetaData.EMPTY, "functionA", "qualifierA", "qualifierA"), + Arguments.of(TbMsg.EMPTY_JSON_OBJECT, TbMsgMetaData.EMPTY, "functionA", null, DEFAULT_QUALIFIER), + Arguments.of("{\"funcNameMsgPattern\": \"functionB\", \"qualifierMsgPattern\": \"qualifierB\"}", + TbMsgMetaData.EMPTY, "$[funcNameMsgPattern]", "$[qualifierMsgPattern]", "qualifierB"), + Arguments.of(TbMsg.EMPTY_JSON_OBJECT, + new TbMsgMetaData( + Map.of( + "funcNameMdPattern", "functionC", + "qualifierMdPattern", "qualifierC") + ), "${funcNameMdPattern}", "${qualifierMdPattern}", "qualifierC") + ); + } + + @Test + public void givenExceptionWasThrownInsideFunctionAndTellFailureIfFuncThrowsExcIsTrue_whenOnMsg_thenTellFailure() { + init(); + config.setTellFailureIfFuncThrowsExc(true); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY); + InvokeRequest request = createInvokeRequest(msg); + String requestIdStr = "a124af57-e7c3-4ebb-83bf-b09ff86eaa23"; + String errorMsg = "Unhandled exception from function"; + + when(clientMock.invokeAsync(any(), any())).then(invocation -> { + InvokeResult result = new InvokeResult(); + result.setPayload(ByteBuffer.wrap(errorMsg.getBytes(StandardCharsets.UTF_8))); + result.setFunctionError(errorMsg); + result.setSdkResponseMetadata(new ResponseMetadata(Map.of(ResponseMetadata.AWS_REQUEST_ID, requestIdStr))); + AsyncHandler asyncHandler = invocation.getArgument(1); + asyncHandler.onSuccess(request, result); + return null; + }); + + node.onMsg(ctx, msg); + + ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); + ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); + + verify(clientMock).invokeAsync(eq(request), any()); + verify(ctx).tellFailure(msgCaptor.capture(), throwableCaptor.capture()); + + var metadata = Map.of("error", RuntimeException.class + ": " + errorMsg, "requestId", requestIdStr); + TbMsg resultedMsg = TbMsg.transformMsgMetadata(msg, new TbMsgMetaData(metadata)); + + assertThat(msgCaptor.getValue()).usingRecursiveComparison() + .ignoringFields("ctx") + .isEqualTo(resultedMsg); + assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg); + } + + @Test + public void givenExceptionWasThrownInsideFunctionAndTellFailureIfFuncThrowsExcIsFalse_whenOnMsg_thenTellSuccess() { + init(); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + InvokeRequest request = createInvokeRequest(msg); + String requestIdStr = "e83dfbc4-68d5-441c-8ee9-289959a30d3b"; + String payload = "{\"errorMessage\":\"Something went wrong\",\"errorType\":\"Exception\",\"requestId\":\"" + requestIdStr + "\"}"; + + when(clientMock.invokeAsync(any(), any())).then(invocation -> { + InvokeResult result = new InvokeResult(); + result.setSdkResponseMetadata(new ResponseMetadata(Map.of("AWS_REQUEST_ID", requestIdStr))); + result.setPayload(ByteBuffer.wrap(payload.getBytes())); + AsyncHandler asyncHandler = invocation.getArgument(1); + asyncHandler.onSuccess(request, result); + return null; + }); + + node.onMsg(ctx, msg); + + ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); + + verify(clientMock).invokeAsync(eq(request), any()); + verify(ctx).tellSuccess(msgCaptor.capture()); + + Map metadata = Map.of("requestId", requestIdStr); + TbMsg resultedMsg = TbMsg.transformMsg(msg, new TbMsgMetaData(metadata), payload); + + assertThat(msgCaptor.getValue()).usingRecursiveComparison() + .ignoringFields("ctx") + .isEqualTo(resultedMsg); + } + + @Test + public void givenPayloadFromResultIsNull_whenOnMsg_thenTellFailure() { + init(); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + InvokeRequest request = createInvokeRequest(msg); + String requestIdStr = "12bbb074-e2fc-4381-8f28-d4bd235103d5"; + String errorMsg = "Payload from result of AWS Lambda function execution is null."; + + when(clientMock.invokeAsync(any(), any())).then(invocation -> { + InvokeResult result = new InvokeResult(); + result.setSdkResponseMetadata(new ResponseMetadata(Map.of(ResponseMetadata.AWS_REQUEST_ID, requestIdStr))); + AsyncHandler asyncHandler = invocation.getArgument(1); + asyncHandler.onSuccess(request, result); + return null; + }); + + node.onMsg(ctx, msg); + + verify(clientMock).invokeAsync(eq(request), any()); + + ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); + ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); + + verify(ctx).tellFailure(msgCaptor.capture(), throwableCaptor.capture()); + + var metadata = Map.of("error", RuntimeException.class + ": " + errorMsg, "requestId", requestIdStr); + TbMsg resultedMsg = TbMsg.transformMsgMetadata(msg, new TbMsgMetaData(metadata)); + + assertThat(msgCaptor.getValue()).usingRecursiveComparison() + .ignoringFields("ctx") + .isEqualTo(resultedMsg); + assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg); + } + + @Test + public void givenExceptionWasThrownOnAWS_whenOnMsg_thenTellFailure() { + init(); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + InvokeRequest request = createInvokeRequest(msg); + + String errorMsg = "Simulated error"; + when(clientMock.invokeAsync(any(), any())).then(invocation -> { + AsyncHandler asyncHandler = invocation.getArgument(1); + asyncHandler.onError(new AWSLambdaException(errorMsg)); + return null; + }); + + node.onMsg(ctx, msg); + + verify(clientMock).invokeAsync(eq(request), any()); + ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellFailure(eq(msg), throwableCaptor.capture()); + assertThat(throwableCaptor.getValue()).isInstanceOf(AWSLambdaException.class).hasMessageStartingWith(errorMsg); + } + + private void init() { + config.setAccessKey("accessKey"); + config.setSecretKey("secretKey"); + config.setFunctionName("new-function"); + ReflectionTestUtils.setField(node, "client", clientMock); + ReflectionTestUtils.setField(node, "config", config); + } + + private InvokeRequest createInvokeRequest(TbMsg msg) { + return new InvokeRequest() + .withFunctionName(getFunctionName(msg)) + .withPayload(msg.getData()) + .withQualifier(getQualifier(msg)); + } + + private String getQualifier(TbMsg msg) { + return StringUtils.isBlank(config.getQualifier()) ? + TbAwsLambdaNodeConfiguration.DEFAULT_QUALIFIER : + TbNodeUtils.processPattern(config.getQualifier(), msg); + } + + private String getFunctionName(TbMsg msg) { + return TbNodeUtils.processPattern(config.getFunctionName(), msg); + } + +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeTest.java new file mode 100644 index 0000000000..c603e111e2 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbSendRestApiCallReplyNodeTest.java @@ -0,0 +1,133 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.rest; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.RuleEngineRpcService; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.util.Map; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TbSendRestApiCallReplyNodeTest { + + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("212445ad-9852-4bfd-819d-6b01ab6ee6b6")); + + private TbSendRestApiCallReplyNode node; + private TbSendRestApiCallReplyNodeConfiguration config; + + @Mock + private TbContext ctxMock; + @Mock + private RuleEngineRpcService rpcServiceMock; + + @BeforeEach + public void setUp() throws TbNodeException { + node = new TbSendRestApiCallReplyNode(); + config = new TbSendRestApiCallReplyNodeConfiguration().defaultConfiguration(); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctxMock, configuration); + } + + @Test + public void givenDefaultConfig_whenInit_thenDoesNotThrowException() { + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + assertThatNoException().isThrownBy(() -> node.init(ctxMock, configuration)); + } + + @ParameterizedTest + @MethodSource + public void givenValidRestApiRequest_whenOnMsg_thenTellSuccess(String requestIdAttribute, String serviceIdAttribute) throws TbNodeException { + config.setRequestIdMetaDataAttribute(requestIdAttribute); + config.setServiceIdMetaDataAttribute(serviceIdAttribute); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctxMock, configuration); + when(ctxMock.getRpcService()).thenReturn(rpcServiceMock); + String requestUUIDStr = "80b7883b-7ec6-4872-9dd3-b2afd5660fa6"; + String serviceIdStr = "tb-core-0"; + String data = """ + { + "temperature": 23, + } + """; + Map metadata = Map.of( + requestIdAttribute, requestUUIDStr, + serviceIdAttribute, serviceIdStr); + TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, DEVICE_ID, new TbMsgMetaData(metadata), data); + + node.onMsg(ctxMock, msg); + + UUID requestUUID = UUID.fromString(requestUUIDStr); + verify(rpcServiceMock).sendRestApiCallReply(serviceIdStr, requestUUID, msg); + verify(ctxMock).tellSuccess(msg); + } + + private static Stream givenValidRestApiRequest_whenOnMsg_thenTellSuccess() { + return Stream.of( + Arguments.of("requestId", "service"), + Arguments.of("requestUUID", "serviceId"), + Arguments.of("some_custom_request_id_field", "some_custom_service_id_field") + ); + } + + @ParameterizedTest + @MethodSource + public void givenInvalidRequest_whenOnMsg_thenTellFailure(TbMsgMetaData metaData, String data, String errorMsg) { + TbMsg msg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, DEVICE_ID, metaData, data); + + node.onMsg(ctxMock, msg); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctxMock).tellFailure(eq(msg), captor.capture()); + Throwable throwable = captor.getValue(); + assertThat(throwable).isInstanceOf(RuntimeException.class).hasMessage(errorMsg); + } + + private static Stream givenInvalidRequest_whenOnMsg_thenTellFailure() { + return Stream.of( + Arguments.of(TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING, "Request id is not present in the metadata!"), + Arguments.of(new TbMsgMetaData(Map.of("requestUUID", "e1dd3985-efad-45a0-b0d2-0ff5dff2ccac")), + TbMsg.EMPTY_STRING, "Service id is not present in the metadata!"), + Arguments.of(new TbMsgMetaData(Map.of("serviceId", "tb-core-0")), + TbMsg.EMPTY_STRING, "Request id is not present in the metadata!"), + Arguments.of(new TbMsgMetaData(Map.of("requestUUID", "e1dd3985-efad-45a0-b0d2-0ff5dff2ccac", "serviceId", "tb-core-0")), + TbMsg.EMPTY_STRING, "Request body is empty!") + ); + } +} diff --git a/ui-ngx/src/assets/help/en_US/rulenode/node-templatization-doc.md b/ui-ngx/src/assets/help/en_US/rulenode/node-templatization-doc.md new file mode 100644 index 0000000000..9e21a0bcb5 --- /dev/null +++ b/ui-ngx/src/assets/help/en_US/rulenode/node-templatization-doc.md @@ -0,0 +1,83 @@ +# Rule nodes fields templatization + +Templatization is the process of using predefined templates to dynamically insert or substitute values into text. +These templates serve as placeholders for variables that can be filled in later with actual data. + +In the context of rule engine, templates are used to extract data from incoming messages during runtime. +This is particularly helpful in the rule node configuration, where templatization allows for dynamic configuration by replacing static values in the configuration fields with real-time values from the incoming messages. +This enables more flexible and automated handling of data, making it easier to perform conditional operations based on varying inputs. + +## Syntax + +Templates start with a dollar sign (`$`), followed by brackets with a key name inside. +Square brackets (`[]`) are used for message keys, while curly brackets (`{}`) are used for message metadata keys. +For example: +- `$[messageKey]` - will extract value of `messageKey` from incoming message. +- `${metadataKey}` - will extract value of `metadataKey` from incoming message metadata. + +In the example above, `messageKey` and `metadataKey` represent any key name that may exist within the message or its metadata. + +## Example + +Let's review an example. First JSON is message, second is message metadata: + +```json +{ + "temperature": 26.5, + "humidity": 75.2, + "soilMoisture": 28.9, + "windSpeed": 26.2, + "location": "riverside" +} +``` +```json +{ + "deviceType": "weather_sensor", + "deviceName": "weather1", + "ts": "1685379440000" +} +``` + +Assume, we detected an unusually high wind speed and want to send this telemetry reading to some external REST API. +Every reading needs to be associated with specific device and location - this information is available only in real-time. +We can use templates extract necessary data and to construct URL for sending data: + +`example-base-url.com/report-reading?location=$[location]&deviceName=${deviceName}` + +This template will be resolved to: + +`example-base-url.com/report-reading?location=riverside&deviceName=weather1` + +Templates are ideal for scenarios where the specific values aren't known at the time of configuration but will become available at runtime. + +## Notes + +- Templates can be combined with regular text. For example: "Fuel tanks are filled to `$[fuelLevel]`%". +- You can access nested keys in JSON object using dot notation: `$[object.key]`. +- If specified key is missing or value associated with that key is an object or an array, then template string will be returned unchanged. + +To illustrate written above let's review an example. Here's content of a message: +```json +{ + "number": 123.45, + "string": "text", + "boolean": true, + "array": [1, 2, 3], + "object": { + "property": "propertyValue" + }, + "null": null +} +``` +Here's a table with comparison between templates and extracted values: + +| **Template** | **Extracted value** | +|--------------------|---------------------| +| $[number] | 123.45 | +| $[string] | text | +| $[boolean] | true | +| $[array] | $[array] | +| $[object] | $[object] | +| $[object.property] | propertyValue | +| $[null] | null | +| $[doesNotExist] | $[doesNotExist] |