|
|
|
@ -43,6 +43,8 @@ import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; |
|
|
|
import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService; |
|
|
|
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; |
|
|
|
|
|
|
|
import java.util.concurrent.CountDownLatch; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.atomic.AtomicInteger; |
|
|
|
|
|
|
|
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToString; |
|
|
|
@ -61,6 +63,8 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta |
|
|
|
|
|
|
|
@Value("${queue.calculated_fields.poll_interval:25}") |
|
|
|
private long pollInterval; |
|
|
|
@Value("${queue.calculated_fields.pack_processing_timeout:60000}") |
|
|
|
private long packProcessingTimeout; |
|
|
|
|
|
|
|
private TbKafkaProducerTemplate<TbProtoQueueMsg<CalculatedFieldStateProto>> stateProducer; |
|
|
|
|
|
|
|
@ -74,21 +78,39 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta |
|
|
|
.topic(partitionService.getTopic(queueKey)) |
|
|
|
.pollInterval(pollInterval) |
|
|
|
.msgPackProcessor((msgs, consumer, consumerKey, config) -> { |
|
|
|
CountDownLatch completionLatch = new CountDownLatch(msgs.size()); |
|
|
|
for (TbProtoQueueMsg<CalculatedFieldStateProto> msg : msgs) { |
|
|
|
TbCallback callback = new TbCallback() { |
|
|
|
@Override |
|
|
|
public void onSuccess() { |
|
|
|
int processedMsgCount = counter.incrementAndGet(); |
|
|
|
if (processedMsgCount % 10000 == 0) { |
|
|
|
log.info("Processed {} CF state messages", processedMsgCount); |
|
|
|
} |
|
|
|
completionLatch.countDown(); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailure(Throwable t) { |
|
|
|
log.error("Failed to process CF state message: {}", msg, t); |
|
|
|
completionLatch.countDown(); |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
try { |
|
|
|
if (msg.getValue() != null) { |
|
|
|
processRestoredState(msg.getValue()); |
|
|
|
processRestoredState(msg.getValue(), callback); |
|
|
|
} else { |
|
|
|
processRestoredState(getStateId(msg.getHeaders()), null); |
|
|
|
processRestoredState(getStateId(msg.getHeaders()), null, callback); |
|
|
|
} |
|
|
|
} catch (Throwable t) { |
|
|
|
log.error("Failed to process state message: {}", msg, t); |
|
|
|
callback.onFailure(t); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
int processedMsgCount = counter.incrementAndGet(); |
|
|
|
if (processedMsgCount % 10000 == 0) { |
|
|
|
log.info("Processed {} calculated field state msgs", processedMsgCount); |
|
|
|
} |
|
|
|
boolean success = completionLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS); |
|
|
|
if (!success) { |
|
|
|
log.error("Timeout to process CF state messages pack of size {}", msgs.size()); |
|
|
|
} |
|
|
|
}) |
|
|
|
.consumerCreator((queueConfig, tpi) -> queueFactory.createCalculatedFieldStateConsumer()) |
|
|
|
|