Browse Source

Consumer improvements

pull/2566/head
Andrii Shvaika 6 years ago
parent
commit
ea4f356376
  1. 3
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
  2. 6
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java

3
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java

@ -88,6 +88,9 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService {
while (!stopped) {
try {
List<TbProtoQueueMsg<ToCoreMsg>> msgs = consumer.poll(pollDuration);
if(msgs.isEmpty()){
continue;
}
ConcurrentMap<UUID, TbProtoQueueMsg<ToCoreMsg>> ackMap = msgs.stream().collect(
Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);

6
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java

@ -86,6 +86,9 @@ public class DefaultTbRuleEngineConsumerService implements TbRuleEngineConsumerS
while (!stopped) {
try {
List<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgs = consumer.poll(pollDuration);
if(msgs.isEmpty()){
continue;
}
ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> ackMap = msgs.stream().collect(
Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
@ -120,7 +123,8 @@ public class DefaultTbRuleEngineConsumerService implements TbRuleEngineConsumerS
}
//TODO 2.5
private void forwardToRuleEngineActor(TransportProtos.TransportToRuleEngineMsg toDeviceActorMsg, TbMsgCallback callback) {
private void forwardToRuleEngineActor(TransportProtos.TransportToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
log.info("Received RULE ENGINE msg: {}", toRuleEngineMsg);
// if (statsEnabled) {
// stats.log(toDeviceActorMsg);
// }

Loading…
Cancel
Save