diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index a51f500f82..a9ddaea2e4 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -125,7 +125,7 @@ class DefaultTbContext implements TbContext { @Override public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); enqueue(tpi, tbMsg, onFailure, onSuccess); } @@ -142,46 +142,54 @@ class DefaultTbContext implements TbContext { @Override public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, String relationType) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, Set relationTypes) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); } @Override public void enqueueForTellNext(TbMsg tbMsg, Set relationTypes, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); } @Override public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); } @Override public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set relationTypes, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); } + private TopicPartitionInfo resolvePartition(TbMsg tbMsg, String queueName) { + return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); + } + + private TopicPartitionInfo resolvePartition(TbMsg tbMsg) { + return resolvePartition(tbMsg, tbMsg.getQueueName()); + } + private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set relationTypes, String failureMessage, Runnable onSuccess, Consumer onFailure) { RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId(); RuleNodeId ruleNodeId = nodeCtx.getSelf().getId(); diff --git a/application/src/main/java/org/thingsboard/server/service/profile/DefaultTbDeviceProfileCache.java b/application/src/main/java/org/thingsboard/server/service/profile/DefaultTbDeviceProfileCache.java index 085bb26e8c..b0b47fe886 100644 --- a/application/src/main/java/org/thingsboard/server/service/profile/DefaultTbDeviceProfileCache.java +++ b/application/src/main/java/org/thingsboard/server/service/profile/DefaultTbDeviceProfileCache.java @@ -50,9 +50,9 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache { public DeviceProfile get(TenantId tenantId, DeviceProfileId deviceProfileId) { DeviceProfile profile = deviceProfilesMap.get(deviceProfileId); if (profile == null) { - deviceProfileFetchLock.lock(); profile = deviceProfilesMap.get(deviceProfileId); if (profile == null) { + deviceProfileFetchLock.lock(); try { profile = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId); if (profile != null) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 2499ef1114..51363d3c83 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -481,7 +481,7 @@ spring: database-platform: "${SPRING_JPA_DATABASE_PLATFORM:org.hibernate.dialect.PostgreSQLDialect}" datasource: driverClassName: "${SPRING_DRIVER_CLASS_NAME:org.postgresql.Driver}" - url: "${SPRING_DATASOURCE_URL:jdbc:postgresql://localhost:5432/thingsboard_32}" + url: "${SPRING_DATASOURCE_URL:jdbc:postgresql://localhost:5432/thingsboard}" username: "${SPRING_DATASOURCE_USERNAME:postgres}" password: "${SPRING_DATASOURCE_PASSWORD:postgres}" hikari: @@ -608,6 +608,8 @@ transport: key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}" # Type of the key store key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}" + # Skip certificate validity check for client certificates. + skip_validity_check_for_client_cert: "${MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT:false}" # Local CoAP transport parameters coap: # Enable/disable coap transport protocol. diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index 0d06f53ac6..6695fa24ac 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -51,6 +51,10 @@ public class MqttTransportContext extends TransportContext { @Value("${transport.mqtt.netty.max_payload_size}") private Integer maxPayloadSize; + @Getter + @Value("${transport.mqtt.netty.skip_validity_check_for_client_cert:false}") + private boolean skipValidityCheckForClientCert; + @Getter @Setter private SslHandler sslHandler; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 46a4934fa6..6a09bdda40 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -387,7 +387,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) { try { - cert.checkValidity(new Date()); + if(!context.isSkipValidityCheckForClientCert()){ + cert.checkValidity(); + } String strCert = SslUtil.getX509CertificateString(cert); String sha3Hash = EncryptionUtil.getSha3Hash(strCert); transportService.process(DeviceTransportType.MQTT, ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(), diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 5f011f48e4..f01b15c77a 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -66,6 +66,8 @@ transport: key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}" # Type of the key store key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}" + # Skip certificate validity check for client certificates. + skip_validity_check_for_client_cert: "${MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT:false}" sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"