8 changed files with 739 additions and 1 deletions
@ -0,0 +1,22 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.deduplication; |
|||
|
|||
public enum DeduplicationId { |
|||
|
|||
ORIGINATOR, TENANT, CUSTOMER |
|||
|
|||
} |
|||
@ -0,0 +1,22 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.deduplication; |
|||
|
|||
public enum DeduplicationStrategy { |
|||
|
|||
FIRST, LAST, ALL |
|||
|
|||
} |
|||
@ -0,0 +1,237 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.deduplication; |
|||
|
|||
import com.fasterxml.jackson.databind.node.ArrayNode; |
|||
import com.fasterxml.jackson.databind.node.ObjectNode; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.data.util.Pair; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.rule.engine.api.RuleNode; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.TbNode; |
|||
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|||
import org.thingsboard.rule.engine.api.TbNodeException; |
|||
import org.thingsboard.rule.engine.api.TbRelationTypes; |
|||
import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.plugin.ComponentType; |
|||
import org.thingsboard.server.common.data.util.TbPair; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Comparator; |
|||
import java.util.HashMap; |
|||
import java.util.Iterator; |
|||
import java.util.LinkedList; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.Optional; |
|||
import java.util.concurrent.ExecutionException; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
@RuleNode( |
|||
type = ComponentType.ACTION, |
|||
name = "deduplication", |
|||
configClazz = TbMsgDeduplicationNodeConfiguration.class, |
|||
nodeDescription = "Deduplicate messages for a configurable period based on a specified deduplication strategy.", |
|||
nodeDetails = "Rule node allows you to select one of the following strategy to deduplicate messages: <br></br>" + |
|||
"<b>FIRST</b> - return first message that arrived during deduplication period.<br></br>" + |
|||
"<b>LAST</b> - return last message that arrived during deduplication period.<br></br>" + |
|||
"<b>ALL</b> - return all messages as a single JSON array message. Where each element represents object with <b>msg</b> and <b>metadata</b> inner properties.<br></br>", |
|||
icon = "content_copy", |
|||
uiResources = {"static/rulenode/rulenode-core-config.js"}, |
|||
configDirective = "tbActionNodeMsgDeduplicationConfig" |
|||
) |
|||
@Slf4j |
|||
public class TbMsgDeduplicationNode implements TbNode { |
|||
|
|||
private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeduplicationNodeMsg"; |
|||
private static final int TB_MSG_DEDUPLICATION_TIMEOUT = 5000; |
|||
public static final int TB_MSG_DEDUPLICATION_RETRY_DELAY = 10; |
|||
|
|||
private TbMsgDeduplicationNodeConfiguration config; |
|||
|
|||
private final Map<EntityId, List<TbMsg>> deduplicationMap; |
|||
private long deduplicationInterval; |
|||
private long lastScheduledTs; |
|||
private DeduplicationId deduplicationId; |
|||
|
|||
public TbMsgDeduplicationNode() { |
|||
this.deduplicationMap = new HashMap<>(); |
|||
} |
|||
|
|||
@Override |
|||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { |
|||
this.config = TbNodeUtils.convert(configuration, TbMsgDeduplicationNodeConfiguration.class); |
|||
this.deduplicationInterval = TimeUnit.SECONDS.toMillis(config.getInterval()); |
|||
this.deduplicationId = config.getId(); |
|||
scheduleTickMsg(ctx); |
|||
} |
|||
|
|||
@Override |
|||
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { |
|||
if (TB_MSG_DEDUPLICATION_TIMEOUT_MSG.equals(msg.getType())) { |
|||
try { |
|||
processDeduplication(ctx); |
|||
} finally { |
|||
scheduleTickMsg(ctx); |
|||
} |
|||
} else { |
|||
processOnRegularMsg(ctx, msg); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void destroy() { |
|||
deduplicationMap.clear(); |
|||
} |
|||
|
|||
private void processOnRegularMsg(TbContext ctx, TbMsg msg) { |
|||
EntityId id = getDeduplicationId(ctx, msg); |
|||
List<TbMsg> deduplicationMsgs = deduplicationMap.computeIfAbsent(id, k -> new LinkedList<>()); |
|||
if (deduplicationMsgs.size() < config.getMaxPendingMsgs()) { |
|||
log.trace("[{}][{}] Adding msg: [{}][{}] to the pending msgs map ...", ctx.getSelfId(), id, msg.getId(), msg.getMetaDataTs()); |
|||
deduplicationMsgs.add(msg); |
|||
ctx.ack(msg); |
|||
} else { |
|||
log.trace("[{}] Max limit of pending messages reached for deduplication id: [{}]", ctx.getSelfId(), id); |
|||
ctx.tellFailure(msg, new RuntimeException("[" + ctx.getSelfId() + "] Max limit of pending messages reached for deduplication id: [" + id + "]")); |
|||
} |
|||
} |
|||
|
|||
private EntityId getDeduplicationId(TbContext ctx, TbMsg msg) { |
|||
switch (deduplicationId) { |
|||
case ORIGINATOR: |
|||
return msg.getOriginator(); |
|||
case TENANT: |
|||
return ctx.getTenantId(); |
|||
case CUSTOMER: |
|||
return msg.getCustomerId(); |
|||
default: |
|||
throw new IllegalStateException("Unsupported deduplication id: " + deduplicationId); |
|||
} |
|||
} |
|||
|
|||
private void processDeduplication(TbContext ctx) { |
|||
if (deduplicationMap.isEmpty()) { |
|||
return; |
|||
} |
|||
List<TbMsg> deduplicationResults = new ArrayList<>(); |
|||
long deduplicationTimeoutMs = System.currentTimeMillis(); |
|||
deduplicationMap.forEach((entityId, tbMsgs) -> { |
|||
if (tbMsgs.isEmpty()) { |
|||
return; |
|||
} |
|||
Optional<TbPair<Long, Long>> packBoundsOpt = findValidPack(tbMsgs, deduplicationTimeoutMs); |
|||
while (packBoundsOpt.isPresent()) { |
|||
TbPair<Long, Long> packBounds = packBoundsOpt.get(); |
|||
if (DeduplicationStrategy.ALL.equals(config.getStrategy())) { |
|||
List<TbMsg> pack = new ArrayList<>(); |
|||
for (Iterator<TbMsg> iterator = tbMsgs.iterator(); iterator.hasNext(); ) { |
|||
TbMsg msg = iterator.next(); |
|||
long msgTs = msg.getMetaDataTs(); |
|||
if (msgTs >= packBounds.getFirst() && msgTs < packBounds.getSecond()) { |
|||
pack.add(msg); |
|||
iterator.remove(); |
|||
} |
|||
} |
|||
deduplicationResults.add(TbMsg.newMsg( |
|||
config.getQueueName(), |
|||
config.getOutMsgType(), |
|||
entityId, |
|||
getMetadata(), |
|||
getMergedData(pack))); |
|||
} else { |
|||
TbMsg resultMsg = null; |
|||
boolean searchMin = DeduplicationStrategy.FIRST.equals(config.getStrategy()); |
|||
for (Iterator<TbMsg> iterator = tbMsgs.iterator(); iterator.hasNext(); ) { |
|||
TbMsg msg = iterator.next(); |
|||
long msgTs = msg.getMetaDataTs(); |
|||
if (msgTs >= packBounds.getFirst() && msgTs < packBounds.getSecond()) { |
|||
iterator.remove(); |
|||
if (resultMsg == null |
|||
|| (searchMin && msg.getMetaDataTs() < resultMsg.getMetaDataTs()) |
|||
|| (!searchMin && msg.getMetaDataTs() > resultMsg.getMetaDataTs())) { |
|||
resultMsg = msg; |
|||
} |
|||
} |
|||
} |
|||
deduplicationResults.add(resultMsg); |
|||
} |
|||
packBoundsOpt = findValidPack(tbMsgs, deduplicationTimeoutMs); |
|||
} |
|||
}); |
|||
deduplicationResults.forEach(outMsg -> enqueueForTellNextWithRetry(ctx, outMsg, 0)); |
|||
} |
|||
|
|||
private Optional<TbPair<Long, Long>> findValidPack(List<TbMsg> msgs, long deduplicationTimeoutMs) { |
|||
Optional<TbMsg> min = msgs.stream().min(Comparator.comparing(TbMsg::getMetaDataTs)); |
|||
return min.map(minTsMsg -> { |
|||
long packStartTs = minTsMsg.getMetaDataTs(); |
|||
long packEndTs = packStartTs + deduplicationInterval; |
|||
if (packEndTs <= deduplicationTimeoutMs) { |
|||
return new TbPair<>(packStartTs, packEndTs); |
|||
} |
|||
return null; |
|||
}); |
|||
} |
|||
|
|||
private void enqueueForTellNextWithRetry(TbContext ctx, TbMsg msg, int retryAttempt) { |
|||
if (config.getMaxRetries() > retryAttempt) { |
|||
ctx.enqueueForTellNext(msg, TbRelationTypes.SUCCESS, |
|||
() -> { |
|||
log.trace("[{}][{}][{}] Successfully enqueue deduplication result message!", ctx.getSelfId(), msg.getOriginator(), retryAttempt); |
|||
}, |
|||
throwable -> { |
|||
log.trace("[{}][{}][{}] Failed to enqueue deduplication output message due to: ", ctx.getSelfId(), msg.getOriginator(), retryAttempt, throwable); |
|||
ctx.schedule(() -> { |
|||
enqueueForTellNextWithRetry(ctx, msg, retryAttempt + 1); |
|||
}, TB_MSG_DEDUPLICATION_RETRY_DELAY, TimeUnit.SECONDS); |
|||
}); |
|||
} |
|||
} |
|||
|
|||
private void scheduleTickMsg(TbContext ctx) { |
|||
long curTs = System.currentTimeMillis(); |
|||
if (lastScheduledTs == 0L) { |
|||
lastScheduledTs = curTs; |
|||
} |
|||
lastScheduledTs += TB_MSG_DEDUPLICATION_TIMEOUT; |
|||
long curDelay = Math.max(0L, (lastScheduledTs - curTs)); |
|||
TbMsg tickMsg = ctx.newMsg(null, TB_MSG_DEDUPLICATION_TIMEOUT_MSG, ctx.getSelfId(), new TbMsgMetaData(), ""); |
|||
ctx.tellSelf(tickMsg, curDelay); |
|||
} |
|||
|
|||
private String getMergedData(List<TbMsg> msgs) { |
|||
ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode(); |
|||
msgs.forEach(msg -> { |
|||
ObjectNode msgNode = JacksonUtil.newObjectNode(); |
|||
msgNode.set("msg", JacksonUtil.toJsonNode(msg.getData())); |
|||
msgNode.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData())); |
|||
mergedData.add(msgNode); |
|||
}); |
|||
return JacksonUtil.toString(mergedData); |
|||
} |
|||
|
|||
private TbMsgMetaData getMetadata() { |
|||
TbMsgMetaData metaData = new TbMsgMetaData(); |
|||
metaData.putValue("ts", String.valueOf(System.currentTimeMillis())); |
|||
return metaData; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,46 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.deduplication; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.rule.engine.api.NodeConfiguration; |
|||
|
|||
@Data |
|||
public class TbMsgDeduplicationNodeConfiguration implements NodeConfiguration<TbMsgDeduplicationNodeConfiguration> { |
|||
|
|||
private int interval; |
|||
private DeduplicationId id; |
|||
private DeduplicationStrategy strategy; |
|||
|
|||
// Advanced settings:
|
|||
private int maxPendingMsgs; |
|||
private int maxRetries; |
|||
|
|||
// only for DeduplicationStrategy.ALL:
|
|||
private String outMsgType; |
|||
private String queueName; |
|||
|
|||
@Override |
|||
public TbMsgDeduplicationNodeConfiguration defaultConfiguration() { |
|||
TbMsgDeduplicationNodeConfiguration configuration = new TbMsgDeduplicationNodeConfiguration(); |
|||
configuration.setInterval(60); |
|||
configuration.setId(DeduplicationId.ORIGINATOR); |
|||
configuration.setStrategy(DeduplicationStrategy.FIRST); |
|||
configuration.setMaxPendingMsgs(100); |
|||
configuration.setMaxRetries(3); |
|||
return configuration; |
|||
} |
|||
} |
|||
@ -0,0 +1,402 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.action; |
|||
|
|||
import com.fasterxml.jackson.databind.node.ArrayNode; |
|||
import com.fasterxml.jackson.databind.node.ObjectNode; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.jupiter.api.AfterEach; |
|||
import org.junit.jupiter.api.Assertions; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.mockito.ArgumentCaptor; |
|||
import org.mockito.ArgumentMatchers; |
|||
import org.mockito.stubbing.Answer; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|||
import org.thingsboard.rule.engine.api.TbNodeException; |
|||
import org.thingsboard.rule.engine.api.TbRelationTypes; |
|||
import org.thingsboard.rule.engine.deduplication.DeduplicationStrategy; |
|||
import org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNode; |
|||
import org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNodeConfiguration; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.RuleNodeId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
import org.thingsboard.server.common.msg.session.SessionMsgType; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Random; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.CountDownLatch; |
|||
import java.util.concurrent.ExecutionException; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
import java.util.concurrent.atomic.AtomicLong; |
|||
import java.util.function.Consumer; |
|||
|
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.ArgumentMatchers.isNull; |
|||
import static org.mockito.ArgumentMatchers.nullable; |
|||
import static org.mockito.Mockito.doAnswer; |
|||
import static org.mockito.Mockito.mock; |
|||
import static org.mockito.Mockito.spy; |
|||
import static org.mockito.Mockito.times; |
|||
import static org.mockito.Mockito.verify; |
|||
import static org.mockito.Mockito.when; |
|||
|
|||
@Slf4j |
|||
public class TbMsgDeduplicationNodeTest { |
|||
|
|||
private static final String MAIN_QUEUE_NAME = "Main"; |
|||
private static final String HIGH_PRIORITY_QUEUE_NAME = "HighPriority"; |
|||
private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeduplicationNodeMsg"; |
|||
|
|||
private TbContext ctx; |
|||
|
|||
private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("de-duplication-node-test"); |
|||
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(factory); |
|||
private final int deduplicationInterval = 1; |
|||
|
|||
private TenantId tenantId; |
|||
|
|||
private TbMsgDeduplicationNode node; |
|||
private TbMsgDeduplicationNodeConfiguration config; |
|||
private TbNodeConfiguration nodeConfiguration; |
|||
|
|||
private CountDownLatch awaitTellSelfLatch; |
|||
|
|||
@BeforeEach |
|||
public void init() throws TbNodeException { |
|||
ctx = mock(TbContext.class); |
|||
|
|||
tenantId = TenantId.fromUUID(UUID.randomUUID()); |
|||
RuleNodeId ruleNodeId = new RuleNodeId(UUID.randomUUID()); |
|||
|
|||
when(ctx.getSelfId()).thenReturn(ruleNodeId); |
|||
when(ctx.getTenantId()).thenReturn(tenantId); |
|||
|
|||
doAnswer((Answer<TbMsg>) invocationOnMock -> { |
|||
String type = (String) (invocationOnMock.getArguments())[1]; |
|||
EntityId originator = (EntityId) (invocationOnMock.getArguments())[2]; |
|||
TbMsgMetaData metaData = (TbMsgMetaData) (invocationOnMock.getArguments())[3]; |
|||
String data = (String) (invocationOnMock.getArguments())[4]; |
|||
return TbMsg.newMsg(type, originator, metaData.copy(), data); |
|||
}).when(ctx).newMsg(isNull(), eq(TB_MSG_DEDUPLICATION_TIMEOUT_MSG), nullable(EntityId.class), any(TbMsgMetaData.class), any(String.class)); |
|||
node = spy(new TbMsgDeduplicationNode()); |
|||
config = new TbMsgDeduplicationNodeConfiguration().defaultConfiguration(); |
|||
} |
|||
|
|||
private void invokeTellSelf(int maxNumberOfInvocation) { |
|||
invokeTellSelf(maxNumberOfInvocation, false, 0); |
|||
} |
|||
|
|||
private void invokeTellSelf(int maxNumberOfInvocation, boolean delayScheduleTimeout, int delayMultiplier) { |
|||
AtomicLong scheduleTimeout = new AtomicLong(deduplicationInterval); |
|||
AtomicInteger scheduleCount = new AtomicInteger(0); |
|||
doAnswer((Answer<Void>) invocationOnMock -> { |
|||
scheduleCount.getAndIncrement(); |
|||
if (scheduleCount.get() <= maxNumberOfInvocation) { |
|||
TbMsg msg = (TbMsg) (invocationOnMock.getArguments())[0]; |
|||
executorService.schedule(() -> { |
|||
try { |
|||
node.onMsg(ctx, msg); |
|||
awaitTellSelfLatch.countDown(); |
|||
} catch (ExecutionException | InterruptedException | TbNodeException e) { |
|||
log.error("Failed to execute tellSelf method call due to: ", e); |
|||
} |
|||
}, scheduleTimeout.get(), TimeUnit.SECONDS); |
|||
if (delayScheduleTimeout) { |
|||
scheduleTimeout.set(scheduleTimeout.get() * delayMultiplier); |
|||
} |
|||
} |
|||
|
|||
return null; |
|||
}).when(ctx).tellSelf(ArgumentMatchers.any(TbMsg.class), ArgumentMatchers.anyLong()); |
|||
} |
|||
|
|||
@AfterEach |
|||
public void destroy() { |
|||
executorService.shutdown(); |
|||
node.destroy(); |
|||
} |
|||
|
|||
@Test |
|||
public void given_100_messages_strategy_first_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException { |
|||
int wantedNumberOfTellSelfInvocation = 2; |
|||
int msgCount = 100; |
|||
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); |
|||
invokeTellSelf(wantedNumberOfTellSelfInvocation); |
|||
|
|||
config.setInterval(deduplicationInterval); |
|||
config.setMaxPendingMsgs(msgCount); |
|||
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); |
|||
node.init(ctx, nodeConfiguration); |
|||
|
|||
DeviceId deviceId = new DeviceId(UUID.randomUUID()); |
|||
long currentTimeMillis = System.currentTimeMillis(); |
|||
|
|||
List<TbMsg> inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500); |
|||
for (TbMsg msg : inputMsgs) { |
|||
node.onMsg(ctx, msg); |
|||
} |
|||
|
|||
TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(inputMsgs.size() - 1).getMetaDataTs() + 2); |
|||
node.onMsg(ctx, msgToReject); |
|||
|
|||
awaitTellSelfLatch.await(); |
|||
|
|||
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); |
|||
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class); |
|||
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class); |
|||
|
|||
verify(ctx, times(msgCount)).ack(any()); |
|||
verify(ctx, times(1)).tellFailure(eq(msgToReject), any()); |
|||
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any()); |
|||
verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); |
|||
Assertions.assertEquals(inputMsgs.get(0), newMsgCaptor.getValue()); |
|||
} |
|||
|
|||
@Test |
|||
public void given_100_messages_strategy_last_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException { |
|||
int wantedNumberOfTellSelfInvocation = 2; |
|||
int msgCount = 100; |
|||
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); |
|||
invokeTellSelf(wantedNumberOfTellSelfInvocation); |
|||
|
|||
config.setStrategy(DeduplicationStrategy.LAST); |
|||
config.setInterval(deduplicationInterval); |
|||
config.setMaxPendingMsgs(msgCount); |
|||
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); |
|||
node.init(ctx, nodeConfiguration); |
|||
|
|||
DeviceId deviceId = new DeviceId(UUID.randomUUID()); |
|||
long currentTimeMillis = System.currentTimeMillis(); |
|||
|
|||
List<TbMsg> inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500); |
|||
TbMsg msgWithLatestTs = getMsgWithLatestTs(inputMsgs); |
|||
|
|||
for (TbMsg msg : inputMsgs) { |
|||
node.onMsg(ctx, msg); |
|||
} |
|||
|
|||
TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(inputMsgs.size() - 1).getMetaDataTs() + 2); |
|||
node.onMsg(ctx, msgToReject); |
|||
|
|||
awaitTellSelfLatch.await(); |
|||
|
|||
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); |
|||
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class); |
|||
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class); |
|||
|
|||
verify(ctx, times(msgCount)).ack(any()); |
|||
verify(ctx, times(1)).tellFailure(eq(msgToReject), any()); |
|||
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any()); |
|||
verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); |
|||
Assertions.assertEquals(msgWithLatestTs, newMsgCaptor.getValue()); |
|||
} |
|||
|
|||
@Test |
|||
public void given_100_messages_strategy_all_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException { |
|||
int wantedNumberOfTellSelfInvocation = 2; |
|||
int msgCount = 100; |
|||
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); |
|||
invokeTellSelf(wantedNumberOfTellSelfInvocation); |
|||
|
|||
config.setInterval(deduplicationInterval); |
|||
config.setStrategy(DeduplicationStrategy.ALL); |
|||
config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()); |
|||
config.setQueueName(HIGH_PRIORITY_QUEUE_NAME); |
|||
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); |
|||
node.init(ctx, nodeConfiguration); |
|||
|
|||
DeviceId deviceId = new DeviceId(UUID.randomUUID()); |
|||
long currentTimeMillis = System.currentTimeMillis(); |
|||
|
|||
List<TbMsg> inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500); |
|||
for (TbMsg msg : inputMsgs) { |
|||
node.onMsg(ctx, msg); |
|||
} |
|||
|
|||
awaitTellSelfLatch.await(); |
|||
|
|||
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); |
|||
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class); |
|||
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class); |
|||
|
|||
verify(ctx, times(msgCount)).ack(any()); |
|||
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any()); |
|||
verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); |
|||
|
|||
Assertions.assertEquals(1, newMsgCaptor.getAllValues().size()); |
|||
TbMsg outMessage = newMsgCaptor.getAllValues().get(0); |
|||
Assertions.assertEquals(getMergedData(inputMsgs), outMessage.getData()); |
|||
Assertions.assertEquals(deviceId, outMessage.getOriginator()); |
|||
Assertions.assertEquals(config.getOutMsgType(), outMessage.getType()); |
|||
Assertions.assertEquals(config.getQueueName(), outMessage.getQueueName()); |
|||
} |
|||
|
|||
@Test |
|||
public void given_100_messages_strategy_all_then_verifyOutput_2_packs() throws TbNodeException, ExecutionException, InterruptedException { |
|||
int wantedNumberOfTellSelfInvocation = 2; |
|||
int msgCount = 100; |
|||
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); |
|||
invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3); |
|||
|
|||
config.setInterval(deduplicationInterval); |
|||
config.setStrategy(DeduplicationStrategy.ALL); |
|||
config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()); |
|||
config.setQueueName(HIGH_PRIORITY_QUEUE_NAME); |
|||
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); |
|||
node.init(ctx, nodeConfiguration); |
|||
|
|||
DeviceId deviceId = new DeviceId(UUID.randomUUID()); |
|||
long currentTimeMillis = System.currentTimeMillis(); |
|||
|
|||
List<TbMsg> firstMsgPack = getTbMsgs(deviceId, msgCount / 2, currentTimeMillis, 500); |
|||
for (TbMsg msg : firstMsgPack) { |
|||
node.onMsg(ctx, msg); |
|||
} |
|||
long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval); |
|||
|
|||
List<TbMsg> secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500); |
|||
for (TbMsg msg : secondMsgPack) { |
|||
node.onMsg(ctx, msg); |
|||
} |
|||
|
|||
awaitTellSelfLatch.await(); |
|||
|
|||
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); |
|||
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class); |
|||
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class); |
|||
|
|||
verify(ctx, times(msgCount)).ack(any()); |
|||
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any()); |
|||
verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); |
|||
|
|||
List<TbMsg> resultMsgs = newMsgCaptor.getAllValues(); |
|||
Assertions.assertEquals(2, resultMsgs.size()); |
|||
|
|||
TbMsg firstMsg = resultMsgs.get(0); |
|||
Assertions.assertEquals(getMergedData(firstMsgPack), firstMsg.getData()); |
|||
Assertions.assertEquals(deviceId, firstMsg.getOriginator()); |
|||
Assertions.assertEquals(config.getOutMsgType(), firstMsg.getType()); |
|||
Assertions.assertEquals(config.getQueueName(), firstMsg.getQueueName()); |
|||
|
|||
TbMsg secondMsg = resultMsgs.get(1); |
|||
Assertions.assertEquals(getMergedData(secondMsgPack), secondMsg.getData()); |
|||
Assertions.assertEquals(deviceId, secondMsg.getOriginator()); |
|||
Assertions.assertEquals(config.getOutMsgType(), secondMsg.getType()); |
|||
Assertions.assertEquals(config.getQueueName(), secondMsg.getQueueName()); |
|||
} |
|||
|
|||
@Test |
|||
public void given_100_messages_strategy_last_then_verifyOutput_2_packs() throws TbNodeException, ExecutionException, InterruptedException { |
|||
int wantedNumberOfTellSelfInvocation = 2; |
|||
int msgCount = 100; |
|||
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); |
|||
invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3); |
|||
|
|||
config.setInterval(deduplicationInterval); |
|||
config.setStrategy(DeduplicationStrategy.LAST); |
|||
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); |
|||
node.init(ctx, nodeConfiguration); |
|||
|
|||
DeviceId deviceId = new DeviceId(UUID.randomUUID()); |
|||
long currentTimeMillis = System.currentTimeMillis(); |
|||
|
|||
List<TbMsg> firstMsgPack = getTbMsgs(deviceId, msgCount / 2, currentTimeMillis, 500); |
|||
for (TbMsg msg : firstMsgPack) { |
|||
node.onMsg(ctx, msg); |
|||
} |
|||
long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval); |
|||
TbMsg msgWithLatestTsInFirstPack = getMsgWithLatestTs(firstMsgPack); |
|||
|
|||
List<TbMsg> secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500); |
|||
for (TbMsg msg : secondMsgPack) { |
|||
node.onMsg(ctx, msg); |
|||
} |
|||
TbMsg msgWithLatestTsInSecondPack = getMsgWithLatestTs(secondMsgPack); |
|||
|
|||
awaitTellSelfLatch.await(); |
|||
|
|||
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); |
|||
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class); |
|||
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class); |
|||
|
|||
verify(ctx, times(msgCount)).ack(any()); |
|||
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any()); |
|||
verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); |
|||
|
|||
List<TbMsg> resultMsgs = newMsgCaptor.getAllValues(); |
|||
Assertions.assertEquals(2, resultMsgs.size()); |
|||
Assertions.assertTrue(resultMsgs.contains(msgWithLatestTsInFirstPack)); |
|||
Assertions.assertTrue(resultMsgs.contains(msgWithLatestTsInSecondPack)); |
|||
} |
|||
|
|||
private TbMsg getMsgWithLatestTs(List<TbMsg> firstMsgPack) { |
|||
int indexOfLastMsgInArray = firstMsgPack.size() - 1; |
|||
int indexToSetMaxTs = new Random().nextInt(indexOfLastMsgInArray) + 1; |
|||
TbMsg currentMaxTsMsg = firstMsgPack.get(indexOfLastMsgInArray); |
|||
TbMsg newLastMsgOfArray = firstMsgPack.get(indexToSetMaxTs); |
|||
firstMsgPack.set(indexOfLastMsgInArray, newLastMsgOfArray); |
|||
firstMsgPack.set(indexToSetMaxTs, currentMaxTsMsg); |
|||
return currentMaxTsMsg; |
|||
} |
|||
|
|||
private List<TbMsg> getTbMsgs(DeviceId deviceId, int msgCount, long currentTimeMillis, int initTsStep) { |
|||
List<TbMsg> inputMsgs = new ArrayList<>(); |
|||
var ts = currentTimeMillis + initTsStep; |
|||
for (int i = 0; i < msgCount; i++) { |
|||
inputMsgs.add(createMsg(deviceId, ts)); |
|||
ts += 2; |
|||
} |
|||
return inputMsgs; |
|||
} |
|||
|
|||
private TbMsg createMsg(DeviceId deviceId, long ts) { |
|||
ObjectNode dataNode = JacksonUtil.newObjectNode(); |
|||
dataNode.put("deviceId", deviceId.getId().toString()); |
|||
TbMsgMetaData metaData = new TbMsgMetaData(); |
|||
metaData.putValue("ts", String.valueOf(ts)); |
|||
return TbMsg.newMsg( |
|||
MAIN_QUEUE_NAME, |
|||
SessionMsgType.POST_TELEMETRY_REQUEST.name(), |
|||
deviceId, |
|||
metaData, |
|||
JacksonUtil.toString(dataNode)); |
|||
} |
|||
|
|||
private String getMergedData(List<TbMsg> msgs) { |
|||
ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode(); |
|||
msgs.forEach(msg -> { |
|||
ObjectNode msgNode = JacksonUtil.newObjectNode(); |
|||
msgNode.set("msg", JacksonUtil.toJsonNode(msg.getData())); |
|||
msgNode.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData())); |
|||
mergedData.add(msgNode); |
|||
}); |
|||
return JacksonUtil.toString(mergedData); |
|||
} |
|||
|
|||
} |
|||
Loading…
Reference in new issue