|
|
@ -26,6 +26,7 @@ import org.apache.kafka.clients.admin.TopicDescription; |
|
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata; |
|
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata; |
|
|
import org.apache.kafka.common.TopicPartition; |
|
|
import org.apache.kafka.common.TopicPartition; |
|
|
import org.apache.kafka.common.errors.TopicExistsException; |
|
|
import org.apache.kafka.common.errors.TopicExistsException; |
|
|
|
|
|
import org.thingsboard.server.queue.TbEdgeQueueAdmin; |
|
|
import org.thingsboard.server.queue.TbQueueAdmin; |
|
|
import org.thingsboard.server.queue.TbQueueAdmin; |
|
|
import org.thingsboard.server.queue.util.PropertyUtils; |
|
|
import org.thingsboard.server.queue.util.PropertyUtils; |
|
|
|
|
|
|
|
|
@ -43,7 +44,7 @@ import java.util.stream.Collectors; |
|
|
* Created by ashvayka on 24.09.18. |
|
|
* Created by ashvayka on 24.09.18. |
|
|
*/ |
|
|
*/ |
|
|
@Slf4j |
|
|
@Slf4j |
|
|
public class TbKafkaAdmin implements TbQueueAdmin { |
|
|
public class TbKafkaAdmin implements TbQueueAdmin, TbEdgeQueueAdmin { |
|
|
|
|
|
|
|
|
private final TbKafkaSettings settings; |
|
|
private final TbKafkaSettings settings; |
|
|
private final Map<String, String> topicConfigs; |
|
|
private final Map<String, String> topicConfigs; |
|
|
@ -149,17 +150,38 @@ public class TbKafkaAdmin implements TbQueueAdmin { |
|
|
* */ |
|
|
* */ |
|
|
public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) { |
|
|
public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) { |
|
|
try { |
|
|
try { |
|
|
syncOffsetsUnsafe(fatGroupId, newGroupId, partitionId); |
|
|
log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId); |
|
|
|
|
|
if (partitionId == null) { |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
|
|
|
syncOffsetsUnsafe(fatGroupId, newGroupId, "." + partitionId); |
|
|
} catch (Exception e) { |
|
|
} catch (Exception e) { |
|
|
log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e); |
|
|
log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
void syncOffsetsUnsafe(String fatGroupId, String newGroupId, Integer partitionId) throws ExecutionException, InterruptedException, TimeoutException { |
|
|
/** |
|
|
log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId); |
|
|
* Sync edge notifications offsets from a fat group to a single group per edge |
|
|
if (partitionId == null) { |
|
|
* */ |
|
|
return; |
|
|
public void syncEdgeNotificationsOffsets(String fatGroupId, String newGroupId) { |
|
|
|
|
|
try { |
|
|
|
|
|
log.info("syncEdgeNotificationsOffsets [{}][{}]", fatGroupId, newGroupId); |
|
|
|
|
|
syncOffsetsUnsafe(fatGroupId, newGroupId, newGroupId); |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
log.warn("Failed to syncEdgeNotificationsOffsets from {} to {}", fatGroupId, newGroupId, e); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
public void deleteConsumerGroup(String consumerGroupId) { |
|
|
|
|
|
try { |
|
|
|
|
|
settings.getAdminClient().deleteConsumerGroups(Collections.singletonList(consumerGroupId)); |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
log.warn("Failed to delete consumer group {}", consumerGroupId, e); |
|
|
} |
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void syncOffsetsUnsafe(String fatGroupId, String newGroupId, String topicSuffix) throws ExecutionException, InterruptedException, TimeoutException { |
|
|
Map<TopicPartition, OffsetAndMetadata> oldOffsets = getConsumerGroupOffsets(fatGroupId); |
|
|
Map<TopicPartition, OffsetAndMetadata> oldOffsets = getConsumerGroupOffsets(fatGroupId); |
|
|
if (oldOffsets.isEmpty()) { |
|
|
if (oldOffsets.isEmpty()) { |
|
|
return; |
|
|
return; |
|
|
@ -167,7 +189,7 @@ public class TbKafkaAdmin implements TbQueueAdmin { |
|
|
|
|
|
|
|
|
for (var consumerOffset : oldOffsets.entrySet()) { |
|
|
for (var consumerOffset : oldOffsets.entrySet()) { |
|
|
var tp = consumerOffset.getKey(); |
|
|
var tp = consumerOffset.getKey(); |
|
|
if (!tp.topic().endsWith("." + partitionId)) { |
|
|
if (!tp.topic().endsWith(topicSuffix)) { |
|
|
continue; |
|
|
continue; |
|
|
} |
|
|
} |
|
|
var om = consumerOffset.getValue(); |
|
|
var om = consumerOffset.getValue(); |
|
|
|