|
|
|
@ -39,17 +39,22 @@ import org.mockito.runners.MockitoJUnitRunner; |
|
|
|
import org.thingsboard.server.queue.TbQueueAdmin; |
|
|
|
import org.thingsboard.server.queue.TbQueueConsumer; |
|
|
|
import org.thingsboard.server.queue.TbQueueMsg; |
|
|
|
import org.thingsboard.server.queue.TbQueueMsgHeaders; |
|
|
|
import org.thingsboard.server.queue.TbQueueProducer; |
|
|
|
|
|
|
|
import java.util.concurrent.CountDownLatch; |
|
|
|
import java.util.concurrent.ExecutorService; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
|
|
|
import static org.junit.Assert.*; |
|
|
|
import static org.junit.Assert.assertEquals; |
|
|
|
import static org.junit.Assert.assertFalse; |
|
|
|
import static org.junit.Assert.assertNotEquals; |
|
|
|
import static org.junit.Assert.assertTrue; |
|
|
|
import static org.mockito.BDDMockito.willAnswer; |
|
|
|
import static org.mockito.BDDMockito.willDoNothing; |
|
|
|
import static org.mockito.BDDMockito.willReturn; |
|
|
|
import static org.mockito.Matchers.any; |
|
|
|
import static org.mockito.Mockito.mock; |
|
|
|
import static org.mockito.Mockito.never; |
|
|
|
import static org.mockito.Mockito.spy; |
|
|
|
import static org.mockito.Mockito.times; |
|
|
|
@ -70,7 +75,7 @@ public class DefaultTbQueueRequestTemplateTest { |
|
|
|
ExecutorService executor; |
|
|
|
String topic = "js-responses-tb-node-0"; |
|
|
|
long maxRequestTimeout = 20; |
|
|
|
long maxPendingRequests = 10000; |
|
|
|
long maxPendingRequests = 32; |
|
|
|
long pollInterval = 25; |
|
|
|
|
|
|
|
DefaultTbQueueRequestTemplate inst; |
|
|
|
@ -78,9 +83,9 @@ public class DefaultTbQueueRequestTemplateTest { |
|
|
|
@Before |
|
|
|
public void setUp() throws Exception { |
|
|
|
willReturn(topic).given(responseTemplate).getTopic(); |
|
|
|
inst = new DefaultTbQueueRequestTemplate( |
|
|
|
inst = spy(new DefaultTbQueueRequestTemplate( |
|
|
|
queueAdmin, requestTemplate, responseTemplate, |
|
|
|
maxRequestTimeout, maxPendingRequests, pollInterval, executorMock); |
|
|
|
maxRequestTimeout, maxPendingRequests, pollInterval, executorMock)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
@ -103,7 +108,6 @@ public class DefaultTbQueueRequestTemplateTest { |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenExternalExecutor_whenInitStop_thenOK() { |
|
|
|
inst = spy(inst); |
|
|
|
willDoNothing().given(inst).mainLoop(); |
|
|
|
|
|
|
|
inst.init(); |
|
|
|
@ -123,7 +127,6 @@ public class DefaultTbQueueRequestTemplateTest { |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenMainLoop_whenLoopFewTimes_thenVerifyInvocationCount() throws InterruptedException { |
|
|
|
inst = spy(inst); |
|
|
|
executor = inst.createExecutor(); |
|
|
|
CountDownLatch latch = new CountDownLatch(5); |
|
|
|
willDoNothing().given(inst).sleep(); |
|
|
|
@ -146,4 +149,42 @@ public class DefaultTbQueueRequestTemplateTest { |
|
|
|
verify(inst, times(2)).sleep(); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenMessages_whenSend_thenOK() { |
|
|
|
willDoNothing().given(inst).sendToRequestTemplate(any(), any(), any(), any()); |
|
|
|
inst.init(); |
|
|
|
int msgCount = 10; |
|
|
|
for (int i = 0; i < msgCount; i++) { |
|
|
|
inst.send(getRequestMsgMock()); |
|
|
|
} |
|
|
|
assertEquals(msgCount, inst.pendingRequests.size()); |
|
|
|
verify(inst, times(msgCount)).sendToRequestTemplate(any(), any(), any(), any()); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenMessagesOverMaxPendingRequests_whenSend_thenImmediateFailedFutureForTheOfRequests() { |
|
|
|
willDoNothing().given(inst).sendToRequestTemplate(any(), any(), any(), any()); |
|
|
|
inst.init(); |
|
|
|
assertEquals(0, inst.tickSize); |
|
|
|
int msgOverflowCount = 10; |
|
|
|
for (int i = 0; i < inst.maxPendingRequests; i++) { |
|
|
|
assertFalse(inst.send(getRequestMsgMock()).isDone()); //SettableFuture future - pending only
|
|
|
|
} |
|
|
|
for (int i = 0; i < msgOverflowCount; i++) { |
|
|
|
assertTrue("max pending requests overflow", inst.send(getRequestMsgMock()).isDone()); //overflow, immediate failed future
|
|
|
|
} |
|
|
|
assertEquals(inst.maxPendingRequests, inst.pendingRequests.size()); |
|
|
|
verify(inst, times((int) inst.maxPendingRequests)).sendToRequestTemplate(any(), any(), any(), any()); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenNothing_whenFetchAndProcessResponsesWithTimeout_thenFail() { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
TbQueueMsg getRequestMsgMock() { |
|
|
|
TbQueueMsg requestMsg = mock(TbQueueMsg.class); |
|
|
|
willReturn(mock(TbQueueMsgHeaders.class)).given(requestMsg).getHeaders(); |
|
|
|
return requestMsg; |
|
|
|
} |
|
|
|
} |