diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 1a932160ad..b0b822b90e 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -583,6 +583,9 @@ queue: linger.ms: "${TB_KAFKA_LINGER_MS:1}" buffer.memory: "${TB_BUFFER_MEMORY:33554432}" replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" + max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}" + max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}" + fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" topic-properties: rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index 1b5619fd26..de94db804d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -54,6 +54,9 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue if (groupId != null) { props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); } + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, settings.getMaxPollRecords()); + props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, settings.getMaxPartitionFetchBytes()); + props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, settings.getFetchMaxBytes()); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index 66121cb215..659dd19bda 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -55,6 +55,18 @@ public class TbKafkaSettings { @Getter private short replicationFactor; + @Value("${queue.kafka.max_poll_records:8192}") + @Getter + private int maxPollRecords; + + @Value("${queue.kafka.max_partition_fetch_bytes:16777216}") + @Getter + private int maxPartitionFetchBytes; + + @Value("${queue.kafka.fetch_max_bytes:134217728}") + @Getter + private int fetchMaxBytes; + @Value("${kafka.other:#{null}}") private List other; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java index 7c140176b7..60895fa002 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java @@ -21,7 +21,6 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.mqtt.MqttClientConfig; import org.apache.commons.codec.binary.Base64; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.openssl.PEMDecryptorProvider; @@ -30,15 +29,26 @@ import org.bouncycastle.openssl.PEMKeyPair; import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; import org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder; import org.springframework.util.StringUtils; +import org.thingsboard.mqtt.MqttClientConfig; +import javax.crypto.Cipher; +import javax.crypto.EncryptedPrivateKeyInfo; +import javax.crypto.SecretKeyFactory; +import javax.crypto.spec.PBEKeySpec; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.TrustManagerFactory; import java.io.ByteArrayInputStream; -import java.security.*; +import java.security.AlgorithmParameters; +import java.security.Key; +import java.security.KeyFactory; +import java.security.KeyPair; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.Security; import java.security.cert.Certificate; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; -import java.security.interfaces.RSAPrivateKey; +import java.security.spec.KeySpec; import java.security.spec.PKCS8EncodedKeySpec; import java.util.Optional; @@ -138,16 +148,36 @@ public class CertPemClientCredentials implements MqttClientCredentials { } private PrivateKey readPrivateKeyFile(String fileContent) throws Exception { - RSAPrivateKey privateKey = null; + PrivateKey privateKey = null; if (fileContent != null && !fileContent.isEmpty()) { fileContent = fileContent.replaceAll(".*BEGIN.*PRIVATE KEY.*", "") .replaceAll(".*END.*PRIVATE KEY.*", "") .replaceAll("\\s", ""); byte[] decoded = Base64.decodeBase64(fileContent); KeyFactory keyFactory = KeyFactory.getInstance("RSA"); - privateKey = (RSAPrivateKey) keyFactory.generatePrivate(new PKCS8EncodedKeySpec(decoded)); + KeySpec keySpec = getKeySpec(decoded); + privateKey = keyFactory.generatePrivate(keySpec); } return privateKey; } + private KeySpec getKeySpec(byte[] encodedKey) throws Exception { + KeySpec keySpec; + if (password == null) { + keySpec = new PKCS8EncodedKeySpec(encodedKey); + } else { + PBEKeySpec pbeKeySpec = new PBEKeySpec(password.toCharArray()); + + EncryptedPrivateKeyInfo privateKeyInfo = new EncryptedPrivateKeyInfo(encodedKey); + String algorithmName = privateKeyInfo.getAlgName(); + Cipher cipher = Cipher.getInstance(algorithmName); + SecretKeyFactory secretKeyFactory = SecretKeyFactory.getInstance(algorithmName); + + Key pbeKey = secretKeyFactory.generateSecret(pbeKeySpec); + AlgorithmParameters algParams = privateKeyInfo.getAlgParameters(); + cipher.init(Cipher.DECRYPT_MODE, pbeKey, algParams); + keySpec = privateKeyInfo.getKeySpec(cipher); + } + return keySpec; + } }