From 704064615a42617e61e10a468cbabf31c10f7260 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 5 May 2026 11:55:26 +0300 Subject: [PATCH 1/4] feat(js-executor): support lz4 compression for Kafka producer --- msa/js-executor/config/default.yml | 2 +- msa/js-executor/package.json | 1 + msa/js-executor/queue/kafkaTemplate.ts | 17 +++- msa/js-executor/yarn.lock | 105 +++++++++++++++++++++++++ 4 files changed, 123 insertions(+), 2 deletions(-) diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index be49ece5a6..5939b0e29a 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -34,7 +34,7 @@ kafka: partitions_consumed_concurrently: "1" # (EXPERIMENTAL) increase this value if you are planning to handle more than one partition (scale up, scale down) - this will decrease the latency requestTimeout: "30000" # The default value in kafkajs is: 30000 connectionTimeout: "1000" # The default value in kafkajs is: 1000 - compression: "none" # gzip or uncompressed + compression: "none" # gzip, lz4 or none topic_properties: "retention.ms:604800000;segment.bytes:52428800;retention.bytes:104857600;partitions:100;min.insync.replicas:1" use_confluent_cloud: false client_id: "kafkajs" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json index b096a3ae99..dfa50f6765 100644 --- a/msa/js-executor/package.json +++ b/msa/js-executor/package.json @@ -17,6 +17,7 @@ "express": "^5.1.0", "js-yaml": "^4.1.1", "kafkajs": "^2.2.4", + "@2l/kafkajs-lz4": "^1.3.2", "long": "^5.3.2", "uuid-parse": "^1.1.0", "winston": "^3.17.0", diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 4527e9e1ce..5a340b2cfd 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -21,6 +21,7 @@ import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor' import { IQueue } from './queue.models'; import { Admin, + CompressionCodecs, CompressionTypes, Consumer, Kafka, @@ -30,11 +31,14 @@ import { Producer, TopicMessages } from 'kafkajs'; +import LZ4Codec from '@2l/kafkajs-lz4'; import { isNotEmptyStr } from '../api/utils'; import { KeyObject } from 'tls'; import process, { exit, kill } from 'process'; +CompressionCodecs[CompressionTypes.LZ4] = new LZ4Codec().codec; + export class KafkaTemplate implements IQueue { private logger = _logger(`kafkaTemplate`); @@ -46,7 +50,18 @@ export class KafkaTemplate implements IQueue { private linger = Number(config.get('kafka.linger_ms')); private requestTimeout = Number(config.get('kafka.requestTimeout')); private connectionTimeout = Number(config.get('kafka.connectionTimeout')); - private compressionType = (config.get('kafka.compression') === "gzip") ? CompressionTypes.GZIP : CompressionTypes.None; + private compressionType = KafkaTemplate.resolveCompressionType(config.get('kafka.compression')); + + private static resolveCompressionType(compression: string): CompressionTypes { + switch (compression) { + case 'gzip': + return CompressionTypes.GZIP; + case 'lz4': + return CompressionTypes.LZ4; + default: + return CompressionTypes.None; + } + } private partitionsConsumedConcurrently = Number(config.get('kafka.partitions_consumed_concurrently')); private kafkaClient: Kafka; diff --git a/msa/js-executor/yarn.lock b/msa/js-executor/yarn.lock index 7e2fe58a13..5e3330e0d7 100644 --- a/msa/js-executor/yarn.lock +++ b/msa/js-executor/yarn.lock @@ -2,6 +2,78 @@ # yarn lockfile v1 +"@2l/kafkajs-lz4@^1.3.2": + version "1.3.2" + resolved "https://registry.yarnpkg.com/@2l/kafkajs-lz4/-/kafkajs-lz4-1.3.2.tgz#25259f693a709816ef4eb62514c1ce1bab80502f" + integrity sha512-yq5dx4CbL2sofWXKuadyty3ZKRcZyqe5iiuuOWjZgtIed9MBZ2Wqe5hQN18+a5XVfR4SFQGVISb/a0oTjdPjwQ== + dependencies: + lz4-napi "^2.8.0" + +"@antoniomuso/lz4-napi-android-arm-eabi@2.9.0": + version "2.9.0" + resolved "https://registry.yarnpkg.com/@antoniomuso/lz4-napi-android-arm-eabi/-/lz4-napi-android-arm-eabi-2.9.0.tgz#5ea67847f0a761c9aec22a31212d0e429ca01fb2" + integrity sha512-aeT/9SoWq7rnmzssWuCKUPaxVt3fzE9q+xq/ZHbnUSmrm8/EhLOACMvQeCOnL0IZsmPh8EpuwIE1TZyM9iQPRA== + +"@antoniomuso/lz4-napi-android-arm64@2.9.0": + version "2.9.0" + resolved "https://registry.yarnpkg.com/@antoniomuso/lz4-napi-android-arm64/-/lz4-napi-android-arm64-2.9.0.tgz#64cb0aac70267bb071bba2d8e382301e058811ca" + integrity sha512-ibQ0qiEvmljXAM97IgOZfh+PeiSQ0Rqf2HErJlZPVm2v4GVJoB67v21v1TUydqNNV5L8bwufVoZ90nheL8X9ZA== + +"@antoniomuso/lz4-napi-darwin-arm64@2.9.0": + version "2.9.0" + resolved "https://registry.yarnpkg.com/@antoniomuso/lz4-napi-darwin-arm64/-/lz4-napi-darwin-arm64-2.9.0.tgz#669e48b165af11cec20581acc2caa0d1c0f84472" + integrity sha512-1su4K1MWa4bcWoZlHajv+luGmFDV1JwIsvjtDF+0HhUveSDPP+8A4Z34zOZidURIr08Sl7M7ViPth6ZQ9SqnAA== + +"@antoniomuso/lz4-napi-darwin-x64@2.9.0": + version "2.9.0" + resolved "https://registry.yarnpkg.com/@antoniomuso/lz4-napi-darwin-x64/-/lz4-napi-darwin-x64-2.9.0.tgz#a00ddf021772e26bd01b3ca5e3025f11d1667edf" + integrity sha512-8Lnbm2MkdJtiJ/nbcRS9zRyGp3G0sG6D+Y/x1vTP8nZs3/f8tBwYNsjxCQyyXNNyHcYWwVGbk68onP/pyDljOA== + +"@antoniomuso/lz4-napi-freebsd-x64@2.9.0": + version "2.9.0" + resolved "https://registry.yarnpkg.com/@antoniomuso/lz4-napi-freebsd-x64/-/lz4-napi-freebsd-x64-2.9.0.tgz#fdc2b292489bd6d4e44eb7bb059fb9bd4b860f14" + integrity sha512-k04EMVOjntKDPrdR4Tf8WyNseuk9PTtSGw8WHyp4CTjoR1s+YJxtp9SMnThe5o2q0TATwk8WGYb/Howrp5OMxw== + +"@antoniomuso/lz4-napi-linux-arm-gnueabihf@2.9.0": + version "2.9.0" + resolved "https://registry.yarnpkg.com/@antoniomuso/lz4-napi-linux-arm-gnueabihf/-/lz4-napi-linux-arm-gnueabihf-2.9.0.tgz#9fcdf93b3ca5aa24469e4bf11dd7100df8c71975" + integrity sha512-H92F8zPZmgy2r8IhCWh3qIBfLp2BQ5cp18RoDXhtGFWwkh+5gVWrZp11IVznrsdgB0QeW0VR7dAMMHg3WLOPfA== + +"@antoniomuso/lz4-napi-linux-arm64-gnu@2.9.0": + version "2.9.0" + resolved "https://registry.yarnpkg.com/@antoniomuso/lz4-napi-linux-arm64-gnu/-/lz4-napi-linux-arm64-gnu-2.9.0.tgz#9d22e075a9cb60c3cb415428138618ae4a79f3c5" + integrity sha512-25crh0qs/3Rj3fMI8ulYD0DoaKsidUhMBki2aeO69ZK+F8bmQ/e2++FlgJ6f3EgMP5CNxJtnZXKhPOraQWjwAw== + +"@antoniomuso/lz4-napi-linux-arm64-musl@2.9.0": + version "2.9.0" + resolved "https://registry.yarnpkg.com/@antoniomuso/lz4-napi-linux-arm64-musl/-/lz4-napi-linux-arm64-musl-2.9.0.tgz#d957bce3accd49199bb6de522f5163efa2068fe4" + integrity sha512-eJtHp38zuLaYI0/cOV/BKcNQiXUBo4GPx53FTf0Y307yUjLsn48LNeN0vD28Ct9YrbUae3bQvMD5AD86She0ww== + +"@antoniomuso/lz4-napi-linux-x64-gnu@2.9.0": + version "2.9.0" + resolved "https://registry.yarnpkg.com/@antoniomuso/lz4-napi-linux-x64-gnu/-/lz4-napi-linux-x64-gnu-2.9.0.tgz#c86567d5aa23863059afbee829fb9f45895d3869" + integrity sha512-mDjS4dyjRKaZQcAP71SphkYH5r3kufB30ih/VETVu/br2toCfBk6Zr1xhL1r+L7FaVAFzF62B7h30CiqrN0Awg== + +"@antoniomuso/lz4-napi-linux-x64-musl@2.9.0": + version "2.9.0" + resolved "https://registry.yarnpkg.com/@antoniomuso/lz4-napi-linux-x64-musl/-/lz4-napi-linux-x64-musl-2.9.0.tgz#7d02612dec7d6247645aa321871b15247da0ce7d" + integrity sha512-pvU7Z7qjkjn17NkddBtBQ7C2iRqjtZ7WJ3Jqrjtj4XxolY3Q0HaYMvWjkWhzb9AKGZbj5y+EHYtbVoZJ2TSQhQ== + +"@antoniomuso/lz4-napi-win32-arm64-msvc@2.9.0": + version "2.9.0" + resolved "https://registry.yarnpkg.com/@antoniomuso/lz4-napi-win32-arm64-msvc/-/lz4-napi-win32-arm64-msvc-2.9.0.tgz#c901bfec718303ed86867129ecc0371eaa883517" + integrity sha512-aioLlbpJl0QPEXLXhh2bzyitc3T7Jot3f1ap6WdKiRa+CIjMHXw1nxJXy07MLXif10r+qVZr86ic8dvwErgqEQ== + +"@antoniomuso/lz4-napi-win32-ia32-msvc@2.9.0": + version "2.9.0" + resolved "https://registry.yarnpkg.com/@antoniomuso/lz4-napi-win32-ia32-msvc/-/lz4-napi-win32-ia32-msvc-2.9.0.tgz#11e48b18b7923c250735e0130998e5968ae91130" + integrity sha512-VaF4XMTdYb59TsPsiqnWwsNaWKHhgxF33z5p4zg4n0tp20eWozl76hn8B+aXthSs40W0W1N97QhxxV4oXGd8cg== + +"@antoniomuso/lz4-napi-win32-x64-msvc@2.9.0": + version "2.9.0" + resolved "https://registry.yarnpkg.com/@antoniomuso/lz4-napi-win32-x64-msvc/-/lz4-napi-win32-x64-msvc-2.9.0.tgz#20d9f71a638f3277cd0b7662e966f90e53d98af8" + integrity sha512-wfA8ShO3eGLxJ1LDwXJo87XL2D4NkMJV1pfHPvLZpD0MWb9u8VfgS+gKK5YhT7XKjzVdeIna9jgFdn2HBnZBxA== + "@babel/generator@^7.23.0": version "7.28.3" resolved "https://registry.yarnpkg.com/@babel/generator/-/generator-7.28.3.tgz#9626c1741c650cbac39121694a0f2d7451b8ef3e" @@ -100,6 +172,18 @@ "@jridgewell/resolve-uri" "^3.1.0" "@jridgewell/sourcemap-codec" "^1.4.14" +"@napi-rs/triples@^1.2.0": + version "1.2.0" + resolved "https://registry.yarnpkg.com/@napi-rs/triples/-/triples-1.2.0.tgz#bcd9c936acb93890e7015818e0181f3db421aafa" + integrity sha512-HAPjR3bnCsdXBsATpDIP5WCrw0JcACwhhrwIAQhiR46n+jm+a2F8kBsfseAuWtSyQ+H3Yebt2k43B5dy+04yMA== + +"@node-rs/helper@^1.3.3": + version "1.6.0" + resolved "https://registry.yarnpkg.com/@node-rs/helper/-/helper-1.6.0.tgz#83e5381de6e898d0b8c92178bb8d897d619e3a3a" + integrity sha512-2OTh/tokcLA1qom1zuCJm2gQzaZljCCbtX1YCrwRVd/toz7KxaDRFeLTAPwhs8m9hWgzrBn5rShRm6IaZofCPw== + dependencies: + "@napi-rs/triples" "^1.2.0" + "@tsconfig/node10@^1.0.7": version "1.0.11" resolved "https://registry.yarnpkg.com/@tsconfig/node10/-/node10-1.0.11.tgz#6ee46400685f130e278128c7b38b7e031ff5b2f2" @@ -998,6 +1082,27 @@ long@^5.3.2: resolved "https://registry.yarnpkg.com/long/-/long-5.3.2.tgz#1d84463095999262d7d7b7f8bfd4a8cc55167f83" integrity sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA== +lz4-napi@^2.8.0: + version "2.9.0" + resolved "https://registry.yarnpkg.com/lz4-napi/-/lz4-napi-2.9.0.tgz#4a700974a1154f82b3c0e1b7030fe63140051e7d" + integrity sha512-ZOWqxBMIK5768aD20tYn5B6Pp9WPM9UG/LHk8neG9p0gC1DtjdzhTtlkxhAjvTRpmJvMtnnqLKlT+COlqAt9cQ== + dependencies: + "@node-rs/helper" "^1.3.3" + optionalDependencies: + "@antoniomuso/lz4-napi-android-arm-eabi" "2.9.0" + "@antoniomuso/lz4-napi-android-arm64" "2.9.0" + "@antoniomuso/lz4-napi-darwin-arm64" "2.9.0" + "@antoniomuso/lz4-napi-darwin-x64" "2.9.0" + "@antoniomuso/lz4-napi-freebsd-x64" "2.9.0" + "@antoniomuso/lz4-napi-linux-arm-gnueabihf" "2.9.0" + "@antoniomuso/lz4-napi-linux-arm64-gnu" "2.9.0" + "@antoniomuso/lz4-napi-linux-arm64-musl" "2.9.0" + "@antoniomuso/lz4-napi-linux-x64-gnu" "2.9.0" + "@antoniomuso/lz4-napi-linux-x64-musl" "2.9.0" + "@antoniomuso/lz4-napi-win32-arm64-msvc" "2.9.0" + "@antoniomuso/lz4-napi-win32-ia32-msvc" "2.9.0" + "@antoniomuso/lz4-napi-win32-x64-msvc" "2.9.0" + make-error@^1.1.1: version "1.3.6" resolved "https://registry.yarnpkg.com/make-error/-/make-error-1.3.6.tgz#2eb2e37ea9b67c4891f684a1394799af484cf7a2" From d7557339c21be678d93f0efe663c5117ee75df0b Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 6 May 2026 16:12:15 +0300 Subject: [PATCH 2/4] Added lz4 as compression option into yml files --- application/src/main/resources/thingsboard.yml | 4 ++-- edqs/src/main/resources/edqs.yml | 4 ++-- msa/js-executor/config/custom-environment-variables.yml | 2 +- msa/vc-executor/src/main/resources/tb-vc-executor.yml | 2 +- transport/coap/src/main/resources/tb-coap-transport.yml | 4 ++-- transport/http/src/main/resources/tb-http-transport.yml | 2 +- transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml | 4 ++-- transport/mqtt/src/main/resources/tb-mqtt-transport.yml | 4 ++-- transport/snmp/src/main/resources/tb-snmp-transport.yml | 4 ++-- 9 files changed, 15 insertions(+), 15 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 84bc3f7afc..2c24caa1f2 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1689,8 +1689,8 @@ queue: acks: "${TB_KAFKA_ACKS:all}" # Number of retries. Resend any record whose send fails with a potentially transient error retries: "${TB_KAFKA_RETRIES:1}" - # The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values none or gzip - compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip + # The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values: none, gzip or lz4 + compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none, gzip or lz4 # Default batch size. This setting gives the upper bound of the batch size to be sent batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" # This variable creates a small amount of artificial delay—that is, rather than immediately sending out a record diff --git a/edqs/src/main/resources/edqs.yml b/edqs/src/main/resources/edqs.yml index 010994dc88..4dc404e2ca 100644 --- a/edqs/src/main/resources/edqs.yml +++ b/edqs/src/main/resources/edqs.yml @@ -103,8 +103,8 @@ queue: acks: "${TB_KAFKA_ACKS:all}" # Number of retries. Resend any record whose send fails with a potentially transient error retries: "${TB_KAFKA_RETRIES:1}" - # The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values none or gzip - compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip + # The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values: none, gzip or lz4 + compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none, gzip or lz4 # Default batch size. This setting gives the upper bound of the batch size to be sent batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" # This variable creates a small amount of artificial delay—that is, rather than immediately sending out a record diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index 6a98361c72..9923fe4ed3 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -34,7 +34,7 @@ kafka: partitions_consumed_concurrently: "TB_KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY" # (EXPERIMENTAL) increase this value if you are planning to handle more than one partition (scale up, scale down) - this will decrease the latency requestTimeout: "TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS" connectionTimeout: "TB_KAFKA_CONNECTION_TIMEOUT_MS" - compression: "TB_QUEUE_KAFKA_COMPRESSION" # gzip or uncompressed + compression: "TB_QUEUE_KAFKA_COMPRESSION" # gzip, lz4 or none topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" use_confluent_cloud: "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD" client_id: "KAFKA_CLIENT_ID" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 8314985622..9ed3299606 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -73,7 +73,7 @@ queue: acks: "${TB_KAFKA_ACKS:all}" # Number of retries. Resend any record whose send fails with a potentially transient error retries: "${TB_KAFKA_RETRIES:1}" - compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip + compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none, gzip or lz4 # Default batch size. This setting gives the upper bound of the batch size to be sent batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" # This variable creates a small amount of artificial delay—that is, rather than immediately sending out a record diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 3c1ef94f09..1554b3fc24 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -285,8 +285,8 @@ queue: acks: "${TB_KAFKA_ACKS:all}" # Number of retries. Resend any record whose send fails with a potentially transient error retries: "${TB_KAFKA_RETRIES:1}" - # The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values none or gzip - compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip + # The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values: none, gzip or lz4 + compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none, gzip or lz4 # Default batch size. This setting gives the upper bound of the batch size to be sent batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" # This variable creates a small amount of artificial delay—that is, rather than immediately sending out a record diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 1b221d1fd9..c878887a01 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -235,7 +235,7 @@ queue: acks: "${TB_KAFKA_ACKS:all}" # Number of retries. Resend any record whose send fails with a potentially transient error retries: "${TB_KAFKA_RETRIES:1}" - compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip + compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none, gzip or lz4 # Default batch size. This setting gives the upper bound of the batch size to be sent batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" # This variable creates a small amount of artificial delay—that is, rather than immediately sending out a record diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 6140122062..51b1ad0a2b 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -335,8 +335,8 @@ queue: acks: "${TB_KAFKA_ACKS:all}" # Number of retries. Resend any record whose send fails with a potentially transient error retries: "${TB_KAFKA_RETRIES:1}" - # The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values none or gzip - compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip + # The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values: none, gzip or lz4 + compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none, gzip or lz4 # Default batch size. This setting gives the upper bound of the batch size to be sent batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" # This variable creates a small amount of artificial delay—that is, rather than immediately sending out a record diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index ac02fa396b..dfe35db29c 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -268,8 +268,8 @@ queue: acks: "${TB_KAFKA_ACKS:all}" # Number of retries. Resend any record whose send fails with a potentially transient error retries: "${TB_KAFKA_RETRIES:1}" - # The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values none or gzip - compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip + # The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values: none, gzip or lz4 + compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none, gzip or lz4 # Default batch size. This setting gives the upper bound of the batch size to be sent batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" # This variable creates a small amount of artificial delay—that is, rather than immediately sending out a record diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 656c01524d..d021030a91 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -207,8 +207,8 @@ queue: acks: "${TB_KAFKA_ACKS:all}" # Number of retries. Resend any record whose send fails with a potentially transient error retries: "${TB_KAFKA_RETRIES:1}" - # The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values none or gzip - compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip + # The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values: none, gzip or lz4 + compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none, gzip or lz4 # Default batch size. This setting gives the upper bound of the batch size to be sent batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" # This variable creates a small amount of artificial delay—that is, rather than immediately sending out a record From 947e75a56adf1edbed79d86ca0eae5fececa1751 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Thu, 7 May 2026 13:33:46 +0300 Subject: [PATCH 3/4] Code review changes --- msa/js-executor/package.json | 5 +++-- msa/js-executor/queue/kafkaTemplate.ts | 19 +++++++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json index dfa50f6765..7cbd99ea48 100644 --- a/msa/js-executor/package.json +++ b/msa/js-executor/package.json @@ -13,11 +13,11 @@ "build": "tsc" }, "dependencies": { + "@2l/kafkajs-lz4": "^1.3.2", "config": "^4.1.1", "express": "^5.1.0", "js-yaml": "^4.1.1", "kafkajs": "^2.2.4", - "@2l/kafkajs-lz4": "^1.3.2", "long": "^5.3.2", "uuid-parse": "^1.1.0", "winston": "^3.17.0", @@ -47,7 +47,8 @@ }, "pkg": { "assets": [ - "node_modules/config/**/*.*" + "node_modules/config/**/*.*", + "node_modules/@antoniomuso/lz4-napi-*/**/*.node" ] } } diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 5a340b2cfd..1114190e64 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -31,14 +31,11 @@ import { Producer, TopicMessages } from 'kafkajs'; -import LZ4Codec from '@2l/kafkajs-lz4'; import { isNotEmptyStr } from '../api/utils'; import { KeyObject } from 'tls'; import process, { exit, kill } from 'process'; -CompressionCodecs[CompressionTypes.LZ4] = new LZ4Codec().codec; - export class KafkaTemplate implements IQueue { private logger = _logger(`kafkaTemplate`); @@ -50,15 +47,25 @@ export class KafkaTemplate implements IQueue { private linger = Number(config.get('kafka.linger_ms')); private requestTimeout = Number(config.get('kafka.requestTimeout')); private connectionTimeout = Number(config.get('kafka.connectionTimeout')); - private compressionType = KafkaTemplate.resolveCompressionType(config.get('kafka.compression')); + private compressionType = this.resolveCompressionType(config.get('kafka.compression')); - private static resolveCompressionType(compression: string): CompressionTypes { + private resolveCompressionType(compression: string): CompressionTypes { switch (compression) { case 'gzip': return CompressionTypes.GZIP; - case 'lz4': + case 'lz4': { + // Load the LZ4 codec lazily so users who don't enable LZ4 don't take a hard + // dependency on the lz4-napi native binary (e.g. inside pkg-built executables). + const LZ4Codec = require('@2l/kafkajs-lz4').default; + CompressionCodecs[CompressionTypes.LZ4] = new LZ4Codec().codec; return CompressionTypes.LZ4; + } + case 'none': + return CompressionTypes.None; default: + if (isNotEmptyStr(compression)) { + this.logger.warn('Unknown kafka.compression value "%s"; falling back to no compression. Supported values: gzip, lz4, none.', compression); + } return CompressionTypes.None; } } From 9dcbb3d9f93032e6fff21ac69916605d7c72c9b2 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Thu, 7 May 2026 15:24:14 +0300 Subject: [PATCH 4/4] Fixed startup of js-executor for lz4 compression --- msa/js-executor/queue/kafkaTemplate.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 1114190e64..175abd80f4 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -56,7 +56,10 @@ export class KafkaTemplate implements IQueue { case 'lz4': { // Load the LZ4 codec lazily so users who don't enable LZ4 don't take a hard // dependency on the lz4-napi native binary (e.g. inside pkg-built executables). - const LZ4Codec = require('@2l/kafkajs-lz4').default; + // The package re-assigns module.exports = LZ4Codec, which wipes the __esModule + // flag and the .default property — so accept either shape. + const lz4Module = require('@2l/kafkajs-lz4'); + const LZ4Codec = lz4Module.default || lz4Module; CompressionCodecs[CompressionTypes.LZ4] = new LZ4Codec().codec; return CompressionTypes.LZ4; }