|
|
|
@ -35,8 +35,8 @@ import org.springframework.test.util.ReflectionTestUtils; |
|
|
|
import org.thingsboard.common.util.ListeningExecutor; |
|
|
|
import org.thingsboard.rule.engine.TestDbCallbackExecutor; |
|
|
|
import org.thingsboard.rule.engine.api.TbContext; |
|
|
|
import org.thingsboard.rule.engine.api.TbNodeException; |
|
|
|
import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
|
|
|
import org.thingsboard.rule.engine.aws.sqs.TbSqsNodeConfiguration.QueueType; |
|
|
|
import org.thingsboard.server.common.data.id.DeviceId; |
|
|
|
import org.thingsboard.server.common.data.msg.TbMsgType; |
|
|
|
import org.thingsboard.server.common.msg.TbMsg; |
|
|
|
@ -47,15 +47,14 @@ import java.util.HashMap; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.UUID; |
|
|
|
import java.util.concurrent.Callable; |
|
|
|
import java.util.concurrent.ExecutionException; |
|
|
|
import java.util.stream.Stream; |
|
|
|
|
|
|
|
import static org.assertj.core.api.AssertionsForClassTypes.assertThat; |
|
|
|
import static org.assertj.core.api.Assertions.assertThat; |
|
|
|
import static org.mockito.ArgumentMatchers.any; |
|
|
|
import static org.mockito.BDDMockito.given; |
|
|
|
import static org.mockito.BDDMockito.then; |
|
|
|
import static org.mockito.Mockito.mock; |
|
|
|
import static org.mockito.Mockito.verify; |
|
|
|
import static org.mockito.Mockito.verifyNoMoreInteractions; |
|
|
|
import static org.mockito.Mockito.when; |
|
|
|
|
|
|
|
@ExtendWith(MockitoExtension.class) |
|
|
|
class TbSqsNodeTest { |
|
|
|
@ -63,6 +62,9 @@ class TbSqsNodeTest { |
|
|
|
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("764de824-929f-4114-95ea-0ea0401ffa3d")); |
|
|
|
private final ListeningExecutor executor = new TestDbCallbackExecutor(); |
|
|
|
|
|
|
|
private final String messageId = "msgId-1d186a16-80c7-44b3-a245-a1fc835f20c7"; |
|
|
|
private final String requestId = "reqId-bef0799b-dde9-4aa0-855b-86bbafaeaf31"; |
|
|
|
|
|
|
|
private TbSqsNode node; |
|
|
|
private TbSqsNodeConfiguration config; |
|
|
|
|
|
|
|
@ -85,7 +87,7 @@ class TbSqsNodeTest { |
|
|
|
|
|
|
|
@Test |
|
|
|
void verifyDefaultConfig() { |
|
|
|
assertThat(config.getQueueType()).isEqualTo(TbSqsNodeConfiguration.QueueType.STANDARD); |
|
|
|
assertThat(config.getQueueType()).isEqualTo(QueueType.STANDARD); |
|
|
|
assertThat(config.getQueueUrlPattern()).isEqualTo("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue-name"); |
|
|
|
assertThat(config.getDelaySeconds()).isEqualTo(0); |
|
|
|
assertThat(config.getMessageAttributes()).isEqualTo(Collections.emptyMap()); |
|
|
|
@ -96,55 +98,102 @@ class TbSqsNodeTest { |
|
|
|
|
|
|
|
@ParameterizedTest |
|
|
|
@MethodSource |
|
|
|
void givenQueueUrlAndMsgAttributesPatternsAndQueueTypes_whenOnMsg_thenTellSuccess(String queueUrl, |
|
|
|
TbMsgMetaData metaData, |
|
|
|
String data, |
|
|
|
Map<String, String> attributes, |
|
|
|
TbSqsNodeConfiguration.QueueType queueType) { |
|
|
|
config.setAccessKeyId("accessKeyId"); |
|
|
|
config.setSecretAccessKey("secretAccessKey"); |
|
|
|
void givenQueueUrlPatternsAndQueueTypeIsFifo_whenOnMsg_thenVerifyRequest(String queueUrl, TbMsgMetaData metaData, String data) { |
|
|
|
config.setQueueType(QueueType.FIFO); |
|
|
|
config.setQueueUrlPattern(queueUrl); |
|
|
|
config.setQueueType(queueType); |
|
|
|
config.setMessageAttributes(attributes); |
|
|
|
|
|
|
|
String messageId = "msgId-1d186a16-80c7-44b3-a245-a1fc835f20c7"; |
|
|
|
String requestId = "reqId-bef0799b-dde9-4aa0-855b-86bbafaeaf31"; |
|
|
|
String messageBodyMd5 = "msgBodyMd5-55fb8ba2-2b71-4673-a82a-969756764761"; |
|
|
|
String messageAttributesMd5 = "msgAttrMd5-e3ba3eef-52ae-436a-bec1-0c2c2252d1f1"; |
|
|
|
String sequenceNumber = "seqNum-bb5ddce0-cf4e-4295-b015-524bdb6a332f"; |
|
|
|
mockSendingMsgRequest(); |
|
|
|
|
|
|
|
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); |
|
|
|
node.onMsg(ctxMock, msg); |
|
|
|
|
|
|
|
SendMessageRequest sendMsgRequest = new SendMessageRequest() |
|
|
|
.withQueueUrl(TbNodeUtils.processPattern(queueUrl, msg)) |
|
|
|
.withMessageBody(data) |
|
|
|
.withMessageDeduplicationId(msg.getId().toString()) |
|
|
|
.withMessageGroupId(DEVICE_ID.toString()); |
|
|
|
then(sqsClientMock).should().sendMessage(sendMsgRequest); |
|
|
|
} |
|
|
|
|
|
|
|
private static Stream<Arguments> givenQueueUrlPatternsAndQueueTypeIsFifo_whenOnMsg_thenVerifyRequest() { |
|
|
|
return Stream.of( |
|
|
|
Arguments.of( |
|
|
|
"https://sqs.us-east-1.amazonaws.com/123456789012/new-queue-name", |
|
|
|
TbMsgMetaData.EMPTY, |
|
|
|
TbMsg.EMPTY_JSON_OBJECT), |
|
|
|
Arguments.of( |
|
|
|
"https://sqs.us-east-1.amazonaws.com/123456789012/$[msgQueueName]", |
|
|
|
TbMsgMetaData.EMPTY, |
|
|
|
"{\"msgQueueName\":\"msg-queue-name\"}"), |
|
|
|
Arguments.of( |
|
|
|
"https://sqs.us-east-1.amazonaws.com/123456789012/${mdQueueName}", |
|
|
|
new TbMsgMetaData(Map.of("mdQueueName", "md-queue-name")), |
|
|
|
TbMsg.EMPTY_JSON_OBJECT) |
|
|
|
); |
|
|
|
} |
|
|
|
|
|
|
|
when(ctxMock.getExternalCallExecutor()).thenReturn(executor); |
|
|
|
when(sqsClientMock.sendMessage(any(SendMessageRequest.class))).thenReturn(sendMessageResultMock); |
|
|
|
when(sendMessageResultMock.getMessageId()).thenReturn(messageId); |
|
|
|
when(sendMessageResultMock.getSdkResponseMetadata()).thenReturn(responseMetadataMock); |
|
|
|
when(responseMetadataMock.getRequestId()).thenReturn(requestId); |
|
|
|
when(sendMessageResultMock.getMD5OfMessageBody()).thenReturn(messageBodyMd5); |
|
|
|
when(sendMessageResultMock.getMD5OfMessageAttributes()).thenReturn(messageAttributesMd5); |
|
|
|
when(sendMessageResultMock.getSequenceNumber()).thenReturn(sequenceNumber); |
|
|
|
@ParameterizedTest |
|
|
|
@MethodSource |
|
|
|
void givenMsgAttributesPatternsAndQueueTypeIsStandard_whenOnMsg_thenVerifyRequest(TbMsgMetaData metaData, String data, |
|
|
|
Map<String, String> attributes) { |
|
|
|
config.setMessageAttributes(attributes); |
|
|
|
|
|
|
|
mockSendingMsgRequest(); |
|
|
|
|
|
|
|
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); |
|
|
|
node.onMsg(ctxMock, msg); |
|
|
|
|
|
|
|
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); |
|
|
|
this.config.getMessageAttributes().forEach((k,v) -> { |
|
|
|
this.config.getMessageAttributes().forEach((k, v) -> { |
|
|
|
String name = TbNodeUtils.processPattern(k, msg); |
|
|
|
String val = TbNodeUtils.processPattern(v, msg); |
|
|
|
messageAttributes.put(name, new MessageAttributeValue().withDataType("String").withStringValue(val)); |
|
|
|
}); |
|
|
|
SendMessageRequest sendMsgRequest = new SendMessageRequest() |
|
|
|
.withQueueUrl(TbNodeUtils.processPattern(queueUrl, msg)) |
|
|
|
SendMessageRequest sendMsgRequest = new SendMessageRequest() |
|
|
|
.withQueueUrl(config.getQueueUrlPattern()) |
|
|
|
.withMessageBody(data) |
|
|
|
.withMessageAttributes(messageAttributes); |
|
|
|
if (queueType == TbSqsNodeConfiguration.QueueType.STANDARD) { |
|
|
|
sendMsgRequest.setDelaySeconds(0); |
|
|
|
} else { |
|
|
|
sendMsgRequest.withMessageDeduplicationId(msg.getId().toString()); |
|
|
|
sendMsgRequest.withMessageGroupId(DEVICE_ID.toString()); |
|
|
|
} |
|
|
|
verify(sqsClientMock).sendMessage(sendMsgRequest); |
|
|
|
.withMessageAttributes(messageAttributes) |
|
|
|
.withDelaySeconds(config.getDelaySeconds()); |
|
|
|
then(sqsClientMock).should().sendMessage(sendMsgRequest); |
|
|
|
} |
|
|
|
|
|
|
|
private static Stream<Arguments> givenMsgAttributesPatternsAndQueueTypeIsStandard_whenOnMsg_thenVerifyRequest() { |
|
|
|
return Stream.of( |
|
|
|
Arguments.of(TbMsgMetaData.EMPTY, |
|
|
|
TbMsg.EMPTY_JSON_OBJECT, |
|
|
|
Map.of("attributeName", "attributeValue")), |
|
|
|
Arguments.of(TbMsgMetaData.EMPTY, |
|
|
|
"{\"msgAttrNamePattern\":\"msgAttrName\",\"msgAttrValuePattern\":\"msgAttrValue\"}", |
|
|
|
Map.of("$[msgAttrNamePattern]", "$[msgAttrValuePattern]")), |
|
|
|
Arguments.of(new TbMsgMetaData(Map.of("mdAttrNamePattern", "mdAttrName", "mdAttrValuePattern", "mdAttrValue")), |
|
|
|
TbMsg.EMPTY_JSON_OBJECT, |
|
|
|
Map.of("${mdAttrNamePattern}", "${mdAttrValuePattern}")) |
|
|
|
); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
void givenMsgResultContainsBodyAndAttributesAndNumber_whenOnMsg_thenTellSuccess() { |
|
|
|
String messageBodyMd5 = "msgBodyMd5-55fb8ba2-2b71-4673-a82a-969756764761"; |
|
|
|
String messageAttributesMd5 = "msgAttrMd5-e3ba3eef-52ae-436a-bec1-0c2c2252d1f1"; |
|
|
|
String sequenceNumber = "seqNum-bb5ddce0-cf4e-4295-b015-524bdb6a332f"; |
|
|
|
|
|
|
|
mockSendingMsgRequest(); |
|
|
|
given(sendMessageResultMock.getMD5OfMessageBody()).willReturn(messageBodyMd5); |
|
|
|
given(sendMessageResultMock.getMD5OfMessageAttributes()).willReturn(messageAttributesMd5); |
|
|
|
given(sendMessageResultMock.getSequenceNumber()).willReturn(sequenceNumber); |
|
|
|
|
|
|
|
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); |
|
|
|
node.onMsg(ctxMock, msg); |
|
|
|
|
|
|
|
SendMessageRequest sendMsgRequest = new SendMessageRequest() |
|
|
|
.withQueueUrl(TbNodeUtils.processPattern(config.getQueueUrlPattern(), msg)) |
|
|
|
.withMessageBody(msg.getData()) |
|
|
|
.withDelaySeconds(config.getDelaySeconds()); |
|
|
|
then(sqsClientMock).should().sendMessage(sendMsgRequest); |
|
|
|
ArgumentCaptor<TbMsg> msgArgumentCaptor = ArgumentCaptor.forClass(TbMsg.class); |
|
|
|
verify(ctxMock).tellSuccess(msgArgumentCaptor.capture()); |
|
|
|
assertThat(msgArgumentCaptor.getValue().getMetaData().getData()) |
|
|
|
then(ctxMock).should().tellSuccess(msgArgumentCaptor.capture()); |
|
|
|
Map<String, String> metadata = msgArgumentCaptor.getValue().getMetaData().getData(); |
|
|
|
assertThat(metadata) |
|
|
|
.hasFieldOrPropertyWithValue("messageId", messageId) |
|
|
|
.hasFieldOrPropertyWithValue("requestId", requestId) |
|
|
|
.hasFieldOrPropertyWithValue("messageBodyMd5", messageBodyMd5) |
|
|
|
@ -153,47 +202,33 @@ class TbSqsNodeTest { |
|
|
|
verifyNoMoreInteractions(ctxMock, sqsClientMock, sendMessageResultMock, responseMetadataMock); |
|
|
|
} |
|
|
|
|
|
|
|
private static Stream<Arguments> givenQueueUrlAndMsgAttributesPatternsAndQueueTypes_whenOnMsg_thenTellSuccess() { |
|
|
|
return Stream.of( |
|
|
|
Arguments.of( |
|
|
|
"https://sqs.us-east-1.amazonaws.com/123456789012/new-queue-name", |
|
|
|
TbMsgMetaData.EMPTY, |
|
|
|
TbMsg.EMPTY_JSON_OBJECT, |
|
|
|
Map.of("attributeName", "attributeValue"), |
|
|
|
TbSqsNodeConfiguration.QueueType.STANDARD), |
|
|
|
Arguments.of( |
|
|
|
"https://sqs.us-east-1.amazonaws.com/123456789012/$[msgQueueName]", |
|
|
|
TbMsgMetaData.EMPTY, |
|
|
|
"{\"msgQueueName\":\"msg-queue-name\",\"msgAttrNamePattern\":\"msgAttrName\",\"msgAttrValuePattern\":\"msgAttrValue\"}", |
|
|
|
Map.of("$[msgAttrNamePattern]", "$[msgAttrValuePattern]"), |
|
|
|
TbSqsNodeConfiguration.QueueType.FIFO), |
|
|
|
Arguments.of("https://sqs.us-east-1.amazonaws.com/123456789012/${mdQueueName}", |
|
|
|
new TbMsgMetaData(Map.of("mdQueueName", "md-queue-name", "mdAttrNamePattern", "mdAttrName", "mdAttrValuePattern", "mdAttrValue")), |
|
|
|
TbMsg.EMPTY_JSON_OBJECT, |
|
|
|
Map.of("${mdAttrNamePattern}", "${mdAttrValuePattern}"), |
|
|
|
TbSqsNodeConfiguration.QueueType.STANDARD) |
|
|
|
); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
void givenErrorOccursDuringProcessingRequest_whenOnMsg_thenTellFailure() throws TbNodeException, ExecutionException, InterruptedException { |
|
|
|
void givenErrorOccursDuringProcessingRequest_whenOnMsg_thenTellFailure() { |
|
|
|
ListeningExecutor listeningExecutor = mock(ListeningExecutor.class); |
|
|
|
when(ctxMock.getExternalCallExecutor()).thenReturn(listeningExecutor); |
|
|
|
given(ctxMock.getExternalCallExecutor()).willReturn(listeningExecutor); |
|
|
|
String errorMsg = "Something went wrong"; |
|
|
|
|
|
|
|
ListenableFuture<TbMsg> failedFuture = Futures.immediateFailedFuture(new RuntimeException(errorMsg)); |
|
|
|
when(listeningExecutor.executeAsync(any(Callable.class))).thenReturn(failedFuture); |
|
|
|
given(listeningExecutor.executeAsync(any(Callable.class))).willReturn(failedFuture); |
|
|
|
|
|
|
|
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); |
|
|
|
node.onMsg(ctxMock, msg); |
|
|
|
|
|
|
|
ArgumentCaptor<TbMsg> msgArgumentCaptor = ArgumentCaptor.forClass(TbMsg.class); |
|
|
|
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class); |
|
|
|
verify(ctxMock).tellFailure(msgArgumentCaptor.capture(), throwableCaptor.capture()); |
|
|
|
then(ctxMock).should().tellFailure(msgArgumentCaptor.capture(), throwableCaptor.capture()); |
|
|
|
assertThat(msgArgumentCaptor.getValue().getMetaData().getData()) |
|
|
|
.hasFieldOrPropertyWithValue("error", RuntimeException.class + ": " + errorMsg); |
|
|
|
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg); |
|
|
|
verifyNoMoreInteractions(ctxMock, sqsClientMock); |
|
|
|
} |
|
|
|
|
|
|
|
private void mockSendingMsgRequest() { |
|
|
|
given(ctxMock.getExternalCallExecutor()).willReturn(executor); |
|
|
|
given(sqsClientMock.sendMessage(any(SendMessageRequest.class))).willReturn(sendMessageResultMock); |
|
|
|
given(sendMessageResultMock.getMessageId()).willReturn(messageId); |
|
|
|
given(sendMessageResultMock.getSdkResponseMetadata()).willReturn(responseMetadataMock); |
|
|
|
given(responseMetadataMock.getRequestId()).willReturn(requestId); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|