Browse Source
Use Kafka to store and process Edge Events to improve processing throughputpull/12157/head
committed by
GitHub
101 changed files with 1872 additions and 1265 deletions
File diff suppressed because it is too large
@ -0,0 +1,55 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge.rpc; |
|||
|
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.edge.EdgeEvent; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.common.util.ProtoUtils; |
|||
import org.thingsboard.server.dao.edge.BaseEdgeEventService; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.discovery.TopicService; |
|||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
@Slf4j |
|||
@Service |
|||
@RequiredArgsConstructor |
|||
@ConditionalOnExpression("'${queue.type:null}'=='kafka'") |
|||
public class KafkaEdgeEventService extends BaseEdgeEventService { |
|||
|
|||
private final TopicService topicService; |
|||
private final TbQueueProducerProvider producerProvider; |
|||
|
|||
@Override |
|||
public ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent) { |
|||
validateEdgeEvent(edgeEvent); |
|||
|
|||
TopicPartitionInfo tpi = topicService.getEdgeEventNotificationsTopic(edgeEvent.getTenantId(), edgeEvent.getEdgeId()); |
|||
ToEdgeEventNotificationMsg msg = ToEdgeEventNotificationMsg.newBuilder().setEdgeEventMsg(ProtoUtils.toProto(edgeEvent)).build(); |
|||
producerProvider.getTbEdgeEventsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null); |
|||
|
|||
return Futures.immediateFuture(null); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,146 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge.rpc; |
|||
|
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import io.grpc.stub.StreamObserver; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.common.data.edge.EdgeEvent; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.util.ProtoUtils; |
|||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg; |
|||
import org.thingsboard.server.gen.edge.v1.ResponseMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg; |
|||
import org.thingsboard.server.queue.TbQueueConsumer; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; |
|||
import org.thingsboard.server.queue.discovery.TopicService; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaAdmin; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaSettings; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; |
|||
import org.thingsboard.server.queue.provider.TbCoreQueueFactory; |
|||
import org.thingsboard.server.service.edge.EdgeContextComponent; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
import java.util.function.BiConsumer; |
|||
|
|||
@Slf4j |
|||
public class KafkaEdgeGrpcSession extends EdgeGrpcSession { |
|||
|
|||
private final TopicService topicService; |
|||
private final TbCoreQueueFactory tbCoreQueueFactory; |
|||
|
|||
private final TbKafkaSettings kafkaSettings; |
|||
private final TbKafkaTopicConfigs kafkaTopicConfigs; |
|||
|
|||
private volatile boolean isHighPriorityProcessing; |
|||
|
|||
private QueueConsumerManager<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer; |
|||
|
|||
private ExecutorService consumerExecutor; |
|||
|
|||
public KafkaEdgeGrpcSession(EdgeContextComponent ctx, TopicService topicService, TbCoreQueueFactory tbCoreQueueFactory, |
|||
TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs kafkaTopicConfigs, StreamObserver<ResponseMsg> outputStream, |
|||
BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener, BiConsumer<Edge, UUID> sessionCloseListener, |
|||
ScheduledExecutorService sendDownlinkExecutorService, int maxInboundMessageSize, int maxHighPriorityQueueSizePerSession) { |
|||
super(ctx, outputStream, sessionOpenListener, sessionCloseListener, sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession); |
|||
this.topicService = topicService; |
|||
this.tbCoreQueueFactory = tbCoreQueueFactory; |
|||
this.kafkaSettings = kafkaSettings; |
|||
this.kafkaTopicConfigs = kafkaTopicConfigs; |
|||
} |
|||
|
|||
private void processMsgs(List<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer) { |
|||
log.trace("[{}][{}] starting processing edge events", tenantId, sessionId); |
|||
if (isConnected() && isSyncCompleted() && !isHighPriorityProcessing) { |
|||
List<EdgeEvent> edgeEvents = new ArrayList<>(); |
|||
for (TbProtoQueueMsg<ToEdgeEventNotificationMsg> msg : msgs) { |
|||
EdgeEvent edgeEvent = ProtoUtils.fromProto(msg.getValue().getEdgeEventMsg()); |
|||
edgeEvents.add(edgeEvent); |
|||
} |
|||
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents); |
|||
try { |
|||
boolean isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get(); |
|||
if (isInterrupted) { |
|||
log.debug("[{}][{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId(), sessionId); |
|||
} else { |
|||
consumer.commit(); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("[{}] Failed to process all downlink messages", sessionId, e); |
|||
} |
|||
} else { |
|||
try { |
|||
Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()); |
|||
} catch (InterruptedException interruptedException) { |
|||
log.trace("Failed to wait until the server has capacity to handle new requests", interruptedException); |
|||
} |
|||
log.trace("[{}][{}] edge is not connected or sync is not completed. Skipping iteration", tenantId, sessionId); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Boolean> migrateEdgeEvents() throws Exception { |
|||
return super.processEdgeEvents(); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Boolean> processEdgeEvents() { |
|||
if (consumer == null) { |
|||
this.consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-event-consumer")); |
|||
this.consumer = QueueConsumerManager.<TbProtoQueueMsg<ToEdgeEventNotificationMsg>>builder() |
|||
.name("TB Edge events") |
|||
.msgPackProcessor(this::processMsgs) |
|||
.pollInterval(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()) |
|||
.consumerCreator(() -> tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edge.getId())) |
|||
.consumerExecutor(consumerExecutor) |
|||
.threadPrefix("edge-events") |
|||
.build(); |
|||
consumer.subscribe(); |
|||
consumer.launch(); |
|||
} |
|||
return Futures.immediateFuture(Boolean.FALSE); |
|||
} |
|||
|
|||
@Override |
|||
public void processHighPriorityEvents() { |
|||
isHighPriorityProcessing = true; |
|||
super.processHighPriorityEvents(); |
|||
isHighPriorityProcessing = false; |
|||
} |
|||
|
|||
@Override |
|||
public void destroy() { |
|||
consumer.stop(); |
|||
consumerExecutor.shutdown(); |
|||
} |
|||
|
|||
@Override |
|||
public void cleanUp() { |
|||
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edge.getId()).getTopic(); |
|||
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); |
|||
kafkaAdmin.deleteTopic(topic); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,46 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge.rpc; |
|||
|
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import io.grpc.stub.StreamObserver; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.gen.edge.v1.ResponseMsg; |
|||
import org.thingsboard.server.service.edge.EdgeContextComponent; |
|||
|
|||
import java.util.UUID; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
import java.util.function.BiConsumer; |
|||
|
|||
@Slf4j |
|||
public class PostgresEdgeGrpcSession extends EdgeGrpcSession { |
|||
|
|||
PostgresEdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, |
|||
BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener, |
|||
BiConsumer<Edge, UUID> sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService, |
|||
int maxInboundMessageSize, int maxHighPriorityQueueSizePerSession) { |
|||
super(ctx, outputStream, sessionOpenListener, sessionCloseListener, sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Boolean> migrateEdgeEvents() { |
|||
return Futures.immediateFuture(Boolean.FALSE); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,160 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.ttl; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.scheduling.annotation.Scheduled; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.AttributeScope; |
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|||
import org.thingsboard.server.dao.attributes.AttributesService; |
|||
import org.thingsboard.server.dao.edge.EdgeService; |
|||
import org.thingsboard.server.dao.tenant.TenantService; |
|||
import org.thingsboard.server.queue.discovery.PartitionService; |
|||
import org.thingsboard.server.queue.discovery.TopicService; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaAdmin; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaSettings; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
|
|||
import java.time.Instant; |
|||
import java.util.ArrayList; |
|||
import java.util.Date; |
|||
import java.util.HashMap; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.Set; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_CONNECT_TIME; |
|||
|
|||
@Slf4j |
|||
@Service |
|||
@TbCoreComponent |
|||
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && ${edges.enabled:true} && ${sql.ttl.edge_events.edge_events_ttl:0} > 0") |
|||
public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { |
|||
|
|||
private static final String EDGE_EVENT_TOPIC_NAME = "tb_edge_event.notifications."; |
|||
|
|||
private final TopicService topicService; |
|||
private final TenantService tenantService; |
|||
private final EdgeService edgeService; |
|||
private final AttributesService attributesService; |
|||
private final TbKafkaAdmin kafkaAdmin; |
|||
|
|||
@Value("${sql.ttl.edge_events.edge_events_ttl:2628000}") |
|||
private long ttlSeconds; |
|||
|
|||
public KafkaEdgeTopicsCleanUpService(PartitionService partitionService, EdgeService edgeService, |
|||
TenantService tenantService, AttributesService attributesService, |
|||
TopicService topicService, TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs kafkaTopicConfigs) { |
|||
super(partitionService); |
|||
this.topicService = topicService; |
|||
this.tenantService = tenantService; |
|||
this.edgeService = edgeService; |
|||
this.attributesService = attributesService; |
|||
this.kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); |
|||
} |
|||
|
|||
@Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.edge_events.execution_interval_ms})}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}") |
|||
public void cleanUp() { |
|||
if (!isSystemTenantPartitionMine()) { |
|||
return; |
|||
} |
|||
|
|||
Set<String> topics = kafkaAdmin.getAllTopics(); |
|||
if (topics == null || topics.isEmpty()) { |
|||
return; |
|||
} |
|||
|
|||
String edgeTopicPrefix = topicService.buildTopicName(EDGE_EVENT_TOPIC_NAME); |
|||
List<String> matchingTopics = topics.stream().filter(topic -> topic.startsWith(edgeTopicPrefix)).toList(); |
|||
if (matchingTopics.isEmpty()) { |
|||
log.debug("No matching topics found with prefix [{}]. Skipping cleanup.", edgeTopicPrefix); |
|||
return; |
|||
} |
|||
|
|||
Map<TenantId, List<EdgeId>> tenantEdgeMap = extractTenantAndEdgeIds(matchingTopics, edgeTopicPrefix); |
|||
|
|||
long currentTimeMillis = System.currentTimeMillis(); |
|||
long ttlMillis = TimeUnit.SECONDS.toMillis(ttlSeconds); |
|||
|
|||
tenantEdgeMap.forEach((tenantId, edgeIds) -> processTenantCleanUp(tenantId, edgeIds, ttlMillis, currentTimeMillis)); |
|||
} |
|||
|
|||
private void processTenantCleanUp(TenantId tenantId, List<EdgeId> edgeIds, long ttlMillis, long currentTimeMillis) { |
|||
boolean tenantExists = tenantService.tenantExists(tenantId); |
|||
if (tenantExists) { |
|||
for (EdgeId edgeId : edgeIds) { |
|||
try { |
|||
attributesService.find(tenantId, edgeId, AttributeScope.SERVER_SCOPE, LAST_CONNECT_TIME).get() |
|||
.flatMap(AttributeKvEntry::getLongValue) |
|||
.filter(lastConnectTime -> isTopicExpired(lastConnectTime, ttlMillis, currentTimeMillis)) |
|||
.ifPresentOrElse(lastConnectTime -> { |
|||
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic(); |
|||
if (kafkaAdmin.isTopicEmpty(topic)) { |
|||
kafkaAdmin.deleteTopic(topic); |
|||
log.info("[{}] Removed outdated topic {} for edge {} older than {}", |
|||
tenantId, topic, edgeId, Date.from(Instant.ofEpochMilli(currentTimeMillis - ttlMillis))); |
|||
} |
|||
}, () -> { |
|||
Edge edge = edgeService.findEdgeById(tenantId, edgeId); |
|||
if (edge == null) { |
|||
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic(); |
|||
kafkaAdmin.deleteTopic(topic); |
|||
log.info("[{}] Removed topic {} for deleted edge {}", tenantId, topic, edgeId); |
|||
} |
|||
}); |
|||
} catch (Exception e) { |
|||
log.error("[{}] Failed to delete topic for edge {}", tenantId, edgeId, e); |
|||
} |
|||
} |
|||
} else { |
|||
for (EdgeId edgeId : edgeIds) { |
|||
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic(); |
|||
kafkaAdmin.deleteTopic(topic); |
|||
} |
|||
log.info("[{}] Removed topics for not existing tenant and edges {}", tenantId, edgeIds); |
|||
} |
|||
} |
|||
|
|||
private boolean isTopicExpired(long lastConnectTime, long ttlMillis, long currentTimeMillis) { |
|||
return lastConnectTime + ttlMillis < currentTimeMillis; |
|||
} |
|||
|
|||
private Map<TenantId, List<EdgeId>> extractTenantAndEdgeIds(List<String> topics, String prefix) { |
|||
Map<TenantId, List<EdgeId>> tenantEdgeMap = new HashMap<>(); |
|||
for (String topic : topics) { |
|||
try { |
|||
String remaining = topic.substring(prefix.length()); |
|||
String[] parts = remaining.split("\\."); |
|||
TenantId tenantId = new TenantId(UUID.fromString(parts[0])); |
|||
EdgeId edgeId = new EdgeId(UUID.fromString(parts[1])); |
|||
tenantEdgeMap.computeIfAbsent(tenantId, id -> new ArrayList<>()).add(edgeId); |
|||
} catch (Exception e) { |
|||
log.warn("Failed to extract TenantId and EdgeId from topic [{}]", topic, e); |
|||
} |
|||
} |
|||
return tenantEdgeMap; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,81 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.edge; |
|||
|
|||
import com.google.common.util.concurrent.FutureCallback; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import jakarta.annotation.PostConstruct; |
|||
import jakarta.annotation.PreDestroy; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.jetbrains.annotations.NotNull; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.context.ApplicationEventPublisher; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.server.common.data.edge.EdgeEvent; |
|||
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; |
|||
|
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
|
|||
@Slf4j |
|||
@Service |
|||
@RequiredArgsConstructor |
|||
@ConditionalOnExpression("'${queue.type:null}'!='kafka'") |
|||
public class PostgresEdgeEventService extends BaseEdgeEventService { |
|||
|
|||
private final EdgeEventDao edgeEventDao; |
|||
private final ApplicationEventPublisher eventPublisher; |
|||
|
|||
private ExecutorService edgeEventExecutor; |
|||
|
|||
@PostConstruct |
|||
public void initExecutor() { |
|||
edgeEventExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-event-service")); |
|||
} |
|||
|
|||
@PreDestroy |
|||
public void shutdownExecutor() { |
|||
if (edgeEventExecutor != null) { |
|||
edgeEventExecutor.shutdown(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent) { |
|||
validateEdgeEvent(edgeEvent); |
|||
ListenableFuture<Void> saveFuture = edgeEventDao.saveAsync(edgeEvent); |
|||
|
|||
Futures.addCallback(saveFuture, new FutureCallback<>() { |
|||
@Override |
|||
public void onSuccess(Void result) { |
|||
eventPublisher.publishEvent(SaveEntityEvent.builder() |
|||
.tenantId(edgeEvent.getTenantId()) |
|||
.entityId(edgeEvent.getEdgeId()) |
|||
.entity(edgeEvent) |
|||
.build()); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(@NotNull Throwable throwable) {} |
|||
}, edgeEventExecutor); |
|||
|
|||
return saveFuture; |
|||
} |
|||
|
|||
} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue