@ -333,6 +333,30 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
assertFalse ( callback . isPubAckReceived ( ) ) ;
}
@Test
public void testPushTelemetryGatewayWithMalformedPayloadAndSendPubAckOnErrorEnabled ( ) throws Exception {
processBeforeTest ( "Test Post Telemetry device proto payload" , "Test Post Telemetry gateway proto payload" , TransportPayloadType . PROTOBUF , POST_DATA_TELEMETRY_TOPIC , null , true ) ;
CountDownLatch latch = new CountDownLatch ( 1 ) ;
MqttAsyncClient client = getMqttAsyncClient ( gatewayAccessToken ) ;
TestMqttPublishCallback callback = new TestMqttPublishCallback ( latch ) ;
client . setCallback ( callback ) ;
publishMqttMsg ( client , MALFORMED_PROTO_PAYLOAD . getBytes ( ) , POST_DATA_TELEMETRY_TOPIC ) ;
latch . await ( 3 , TimeUnit . SECONDS ) ;
assertTrue ( callback . isPubAckReceived ( ) ) ;
}
@Test
public void testPushTelemetryGatewayWithMalformedPayloadAndSendPubAckOnErrorDisabled ( ) throws Exception {
processBeforeTest ( "Test Post Telemetry device proto payload" , "Test Post Telemetry gateway proto payload" , TransportPayloadType . PROTOBUF , POST_DATA_TELEMETRY_TOPIC , null , false ) ;
CountDownLatch latch = new CountDownLatch ( 1 ) ;
MqttAsyncClient client = getMqttAsyncClient ( gatewayAccessToken ) ;
TestMqttPublishCallback callback = new TestMqttPublishCallback ( latch ) ;
client . setCallback ( callback ) ;
publishMqttMsg ( client , MALFORMED_PROTO_PAYLOAD . getBytes ( ) , POST_DATA_TELEMETRY_TOPIC ) ;
latch . await ( 3 , TimeUnit . SECONDS ) ;
assertFalse ( callback . isPubAckReceived ( ) ) ;
}
private DynamicSchema getDynamicSchema ( String deviceTelemetryProtoSchema ) {
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile . getProfileData ( ) . getTransportConfiguration ( ) ;
assertTrue ( transportConfiguration instanceof MqttDeviceProfileTransportConfiguration ) ;