|
|
|
@ -39,19 +39,23 @@ import org.thingsboard.rule.engine.api.TbNodeException; |
|
|
|
import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
|
|
|
import org.thingsboard.server.common.data.id.DeviceId; |
|
|
|
import org.thingsboard.server.common.data.msg.TbMsgType; |
|
|
|
import org.thingsboard.server.common.data.msg.TbNodeConnectionType; |
|
|
|
import org.thingsboard.server.common.msg.TbMsg; |
|
|
|
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|
|
|
|
|
|
|
import java.io.IOException; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.UUID; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.stream.Stream; |
|
|
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThat; |
|
|
|
import static org.assertj.core.api.Assertions.assertThatNoException; |
|
|
|
import static org.assertj.core.api.Assertions.assertThatThrownBy; |
|
|
|
import static org.mockito.ArgumentMatchers.any; |
|
|
|
import static org.mockito.ArgumentMatchers.eq; |
|
|
|
import static org.mockito.BDDMockito.given; |
|
|
|
import static org.mockito.BDDMockito.never; |
|
|
|
import static org.mockito.BDDMockito.spy; |
|
|
|
import static org.mockito.BDDMockito.then; |
|
|
|
import static org.mockito.BDDMockito.willReturn; |
|
|
|
@ -103,9 +107,10 @@ class TbPubSubNodeTest { |
|
|
|
|
|
|
|
@ParameterizedTest |
|
|
|
@MethodSource |
|
|
|
public void givenMessageAttributesPatterns_whenOnMsg_thenTellSuccess( |
|
|
|
public void givenForceAckIsTrueAndMessageAttributesPatterns_whenOnMsg_thenEnqueueForTellNext( |
|
|
|
String attributeName, String attributeValue, TbMsgMetaData metaData, String data) { |
|
|
|
config.setMessageAttributes(Map.of(attributeName, attributeValue)); |
|
|
|
ReflectionTestUtils.setField(node, "forceAck", true); |
|
|
|
init(); |
|
|
|
|
|
|
|
String messageId = "2070443601311540"; |
|
|
|
@ -115,6 +120,7 @@ class TbPubSubNodeTest { |
|
|
|
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); |
|
|
|
node.onMsg(ctxMock, msg); |
|
|
|
|
|
|
|
then(ctxMock).should().ack(msg); |
|
|
|
PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder(); |
|
|
|
pubsubMessageBuilder.setData(ByteString.copyFromUtf8(msg.getData())); |
|
|
|
this.config.getMessageAttributes().forEach((k, v) -> { |
|
|
|
@ -124,7 +130,7 @@ class TbPubSubNodeTest { |
|
|
|
}); |
|
|
|
then(pubSubClientMock).should().publish(pubsubMessageBuilder.build()); |
|
|
|
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class); |
|
|
|
then(ctxMock).should().tellSuccess(actualMsg.capture()); |
|
|
|
then(ctxMock).should().enqueueForTellNext(actualMsg.capture(), eq(TbNodeConnectionType.SUCCESS)); |
|
|
|
metaData.putValue("messageId", messageId); |
|
|
|
TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); |
|
|
|
assertThat(actualMsg.getValue()) |
|
|
|
@ -133,7 +139,7 @@ class TbPubSubNodeTest { |
|
|
|
.isEqualTo(expectedMsg); |
|
|
|
} |
|
|
|
|
|
|
|
private static Stream<Arguments> givenMessageAttributesPatterns_whenOnMsg_thenTellSuccess() { |
|
|
|
private static Stream<Arguments> givenForceAckIsTrueAndMessageAttributesPatterns_whenOnMsg_thenEnqueueForTellNext() { |
|
|
|
return Stream.of( |
|
|
|
Arguments.of("attributeName", "attributeValue", new TbMsgMetaData(), TbMsg.EMPTY_JSON_OBJECT), |
|
|
|
Arguments.of("${mdAttrName}", "${mdAttrValue}", new TbMsgMetaData( |
|
|
|
@ -147,8 +153,9 @@ class TbPubSubNodeTest { |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenErrorOccursOnTheGCP_whenOnMsg_thenTellFailure() { |
|
|
|
public void givenForceAckIsFalseAndErrorOccursOnTheGCP_whenOnMsg_thenTellFailure() { |
|
|
|
init(); |
|
|
|
ReflectionTestUtils.setField(node, "forceAck", false); |
|
|
|
|
|
|
|
String errorMsg = "Something went wrong!"; |
|
|
|
ApiFuture<String> failedFuture = ApiFutures.immediateFailedFuture(new RuntimeException(errorMsg)); |
|
|
|
@ -159,6 +166,7 @@ class TbPubSubNodeTest { |
|
|
|
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, TbMsg.EMPTY_JSON_OBJECT); |
|
|
|
node.onMsg(ctxMock, msg); |
|
|
|
|
|
|
|
then(ctxMock).should(never()).ack(any()); |
|
|
|
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class); |
|
|
|
ArgumentCaptor<Throwable> actualError = ArgumentCaptor.forClass(Throwable.class); |
|
|
|
then(ctxMock).should().tellFailure(actualMsg.capture(), actualError.capture()); |
|
|
|
@ -171,6 +179,21 @@ class TbPubSubNodeTest { |
|
|
|
assertThat(actualError.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenPubSubClientIsNotNull_whenDestroy_thenShutDownAndAwaitTermination() throws InterruptedException { |
|
|
|
ReflectionTestUtils.setField(node, "pubSubClient", pubSubClientMock); |
|
|
|
node.destroy(); |
|
|
|
then(pubSubClientMock).should().shutdown(); |
|
|
|
then(pubSubClientMock).should().awaitTermination(1, TimeUnit.SECONDS); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenPubSubClientIsNull_whenDestroy_thenShutDownAndAwaitTermination() { |
|
|
|
ReflectionTestUtils.setField(node, "pubSubClient", null); |
|
|
|
node.destroy(); |
|
|
|
then(pubSubClientMock).shouldHaveNoInteractions(); |
|
|
|
} |
|
|
|
|
|
|
|
private void init() { |
|
|
|
ReflectionTestUtils.setField(node, "config", config); |
|
|
|
ReflectionTestUtils.setField(node, "pubSubClient", pubSubClientMock); |
|
|
|
|