From c21927665d2329e2d37e22b947f7260ee9754bfe Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 3 May 2022 18:43:36 +0300 Subject: [PATCH 1/7] Fixed device profile with non default transport delivery to edge --- .../java/org/thingsboard/server/edge/BaseEdgeTest.java | 8 +++++++- .../device/profile/TransportPayloadTypeConfiguration.java | 4 +++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index e815378d27..61bb53b060 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -58,6 +58,8 @@ import org.thingsboard.server.common.data.device.profile.AlarmRule; import org.thingsboard.server.common.data.device.profile.AllowCreateNewDevicesDeviceProfileProvisionConfiguration; import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm; import org.thingsboard.server.common.data.device.profile.DeviceProfileData; +import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration; +import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.SimpleAlarmConditionSpec; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; @@ -199,7 +201,11 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { private void installation() throws Exception { edge = doPost("/api/edge", constructEdge("Test Edge", "test"), Edge.class); - DeviceProfile deviceProfile = this.createDeviceProfile(CUSTOM_DEVICE_PROFILE_NAME); + MqttDeviceProfileTransportConfiguration transportConfiguration = new MqttDeviceProfileTransportConfiguration(); + transportConfiguration.setTransportPayloadTypeConfiguration(new JsonTransportPayloadConfiguration()); + + DeviceProfile deviceProfile = this.createDeviceProfile(CUSTOM_DEVICE_PROFILE_NAME, transportConfiguration); + extendDeviceProfileData(deviceProfile); doPost("/api/deviceProfile", deviceProfile, DeviceProfile.class); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/TransportPayloadTypeConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/TransportPayloadTypeConfiguration.java index 7129ba8846..4d229a3706 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/TransportPayloadTypeConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/TransportPayloadTypeConfiguration.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.thingsboard.server.common.data.TransportPayloadType; +import java.io.Serializable; + @JsonIgnoreProperties(ignoreUnknown = true) @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, @@ -29,7 +31,7 @@ import org.thingsboard.server.common.data.TransportPayloadType; @JsonSubTypes({ @JsonSubTypes.Type(value = JsonTransportPayloadConfiguration.class, name = "JSON"), @JsonSubTypes.Type(value = ProtoTransportPayloadConfiguration.class, name = "PROTOBUF")}) -public interface TransportPayloadTypeConfiguration { +public interface TransportPayloadTypeConfiguration extends Serializable { @JsonIgnore TransportPayloadType getTransportPayloadType(); From 5f548f2179cfe016563bb8ce40cb68907ea0cd68 Mon Sep 17 00:00:00 2001 From: Jan Christoph Bernack Date: Wed, 4 May 2022 17:45:46 +0200 Subject: [PATCH 2/7] use coap piggybacked responses if possible --- .../transport/coap/CoapTransportResource.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 3e291a8177..86ac404aa2 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -169,7 +169,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } private void processProvision(CoapExchange exchange) { - exchange.accept(); + deferAccept(exchange); try { UUID sessionId = UUID.randomUUID(); log.trace("[{}] Processing provision publish msg [{}]!", sessionId, exchange.advanced().getRequest()); @@ -195,7 +195,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { private void processRequest(CoapExchange exchange, SessionMsgType type) { log.trace("Processing {}", exchange.advanced().getRequest()); - exchange.accept(); + deferAccept(exchange); Exchange advanced = exchange.advanced(); Request request = advanced.getRequest(); @@ -346,6 +346,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource { new CoapNoOpCallback(exchange)); } + /** + * Send an empty ACK if we are unable to send the full response within the timeout. + * If the full response is transmitted before the timeout this will not do anything. + * If this is triggered the full response will be sent in a separate CON/NON message. + * Essentially this allows the use of piggybacked responses. + */ + private void deferAccept(CoapExchange exchange) { + transportContext.getScheduler().schedule(exchange::accept, 500, TimeUnit.MILLISECONDS); + } + private UUID toSessionId(TransportProtos.SessionInfoProto sessionInfoProto) { return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB()); } From 80b7b9cee9858f9635131d7d29555f8763051a51 Mon Sep 17 00:00:00 2001 From: Jan Christoph Bernack Date: Wed, 4 May 2022 17:54:49 +0200 Subject: [PATCH 3/7] add config parameter: transport.coap.piggyback_timeout --- application/src/main/resources/thingsboard.yml | 1 + .../org/thingsboard/server/coapserver/CoapServerContext.java | 4 ++++ .../org/thingsboard/server/coapserver/CoapServerService.java | 2 ++ .../server/coapserver/DefaultCoapServerService.java | 5 +++++ .../server/transport/coap/CoapTransportResource.java | 4 +++- transport/coap/src/main/resources/tb-coap-transport.yml | 1 + 6 files changed, 16 insertions(+), 1 deletion(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 0aa7b3bb66..176b89b41d 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -703,6 +703,7 @@ transport: bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}" bind_port: "${COAP_BIND_PORT:5683}" timeout: "${COAP_TIMEOUT:10000}" + piggyback_timeout: "${COAP_PIGGYBACK_TIMEOUT:500}" psm_activity_timer: "${COAP_PSM_ACTIVITY_TIMER:10000}" paging_transmission_window: "${COAP_PAGING_TRANSMISSION_WINDOW:10000}" dtls: diff --git a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java index dda014da4c..8b448a1bcd 100644 --- a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java +++ b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java @@ -38,6 +38,10 @@ public class CoapServerContext { @Value("${transport.coap.timeout}") private Long timeout; + @Getter + @Value("${transport.coap.piggyback_timeout}") + private Long piggybackTimeout; + @Getter @Value("${transport.coap.psm_activity_timer:10000}") private long psmActivityTimer; diff --git a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerService.java b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerService.java index 8dbbc5a620..8d38b67d9c 100644 --- a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerService.java +++ b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerService.java @@ -29,4 +29,6 @@ public interface CoapServerService { long getTimeout(); + long getPiggybackTimeout(); + } diff --git a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java index 3aa0d0c122..b7b0bcb436 100644 --- a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java +++ b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java @@ -88,6 +88,11 @@ public class DefaultCoapServerService implements CoapServerService { return coapServerContext.getTimeout(); } + @Override + public long getPiggybackTimeout() { + return coapServerContext.getPiggybackTimeout(); + } + private CoapServer createCoapServer() throws UnknownHostException { Configuration networkConfig = new Configuration(); networkConfig.set(CoapConfig.BLOCKWISE_STRICT_BLOCK2_OPTION, true); diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 86ac404aa2..983aec497c 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -69,6 +69,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { private final ConcurrentMap dtlsSessionsMap; private final long timeout; + private final long piggybackTimeout; private final CoapClientContext clients; public CoapTransportResource(CoapTransportContext ctx, CoapServerService coapServerService, String name) { @@ -77,6 +78,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { this.addObserver(new CoapResourceObserver()); this.dtlsSessionsMap = coapServerService.getDtlsSessionsMap(); this.timeout = coapServerService.getTimeout(); + this.piggybackTimeout = coapServerService.getPiggybackTimeout(); this.clients = ctx.getClientContext(); long sessionReportTimeout = ctx.getSessionReportTimeout(); ctx.getScheduler().scheduleAtFixedRate(clients::reportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); @@ -353,7 +355,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { * Essentially this allows the use of piggybacked responses. */ private void deferAccept(CoapExchange exchange) { - transportContext.getScheduler().schedule(exchange::accept, 500, TimeUnit.MILLISECONDS); + transportContext.getScheduler().schedule(exchange::accept, piggybackTimeout, TimeUnit.MILLISECONDS); } private UUID toSessionId(TransportProtos.SessionInfoProto sessionInfoProto) { diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index f4717c418d..e53752abfe 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -89,6 +89,7 @@ transport: bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}" bind_port: "${COAP_BIND_PORT:5683}" timeout: "${COAP_TIMEOUT:10000}" + piggyback_timeout: "${COAP_PIGGYBACK_TIMEOUT:500}" psm_activity_timer: "${COAP_PSM_ACTIVITY_TIMER:10000}" paging_transmission_window: "${COAP_PAGING_TRANSMISSION_WINDOW:10000}" dtls: From 6534db8ed2932931578be9cc52277657c2242f7d Mon Sep 17 00:00:00 2001 From: Jan Christoph Bernack Date: Fri, 13 May 2022 14:54:50 +0200 Subject: [PATCH 4/7] add fallback for piggybackTimeout of zero --- .../server/transport/coap/CoapTransportResource.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 983aec497c..0016cc05d8 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -355,7 +355,11 @@ public class CoapTransportResource extends AbstractCoapTransportResource { * Essentially this allows the use of piggybacked responses. */ private void deferAccept(CoapExchange exchange) { - transportContext.getScheduler().schedule(exchange::accept, piggybackTimeout, TimeUnit.MILLISECONDS); + if (piggybackTimeout > 0) { + transportContext.getScheduler().schedule(exchange::accept, piggybackTimeout, TimeUnit.MILLISECONDS); + } else { + exchange.accept(); + } } private UUID toSessionId(TransportProtos.SessionInfoProto sessionInfoProto) { From ac751b09f90c2f3e27679fa5735ba05ec6a3b957 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Mon, 6 Jun 2022 09:17:19 +0200 Subject: [PATCH 5/7] fixed hash partition service initialization --- .../service/cluster/routing/HashPartitionServiceTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java index 74738c361f..cea08003bf 100644 --- a/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java @@ -85,7 +85,9 @@ public class HashPartitionServiceTest { .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name())) .build()); } + clusterRoutingService.init(); + clusterRoutingService.partitionsInit(); clusterRoutingService.recalculatePartitions(currentServer, otherServers); } From fa87a2fc8f3f0da9573f24c5e2e0e2e2271e3a5f Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Mon, 6 Jun 2022 09:35:04 +0200 Subject: [PATCH 6/7] fixed Rate limits test --- .../org/thingsboard/server/common/msg/tools/RateLimitsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java b/common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java index b0bbfa3dc6..c8583527f1 100644 --- a/common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java +++ b/common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java @@ -67,7 +67,7 @@ public class RateLimitsTest { assertThat(rateLimits.tryConsume()).as("new token is available").isFalse(); int expectedRefillTime = period * 1000; - int gap = 300; + int gap = 500; await("tokens refill for rate limit " + rateLimitConfig) .atLeast(expectedRefillTime - gap, TimeUnit.MILLISECONDS) From e49da1035b72d6e8aa73bab9b359271b9451da6f Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Mon, 6 Jun 2022 15:12:57 +0300 Subject: [PATCH 7/7] Fix backward compatibility for defaultQueueName field of device profile --- .../install/SqlDatabaseUpgradeService.java | 41 +++++++++++++++++++ .../update/DefaultDataUpdateService.java | 38 ----------------- .../server/common/data/DeviceProfile.java | 12 ++++++ .../dao/device/DeviceProfileServiceImpl.java | 11 +++++ 4 files changed, 64 insertions(+), 38 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index d0d5f37e00..306102c66c 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -571,6 +571,22 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", SCHEMA_UPDATE_SQL); loadSql(schemaUpdateFile, conn); + log.info("Loading queues..."); + try { + if (!CollectionUtils.isEmpty(queueConfig.getQueues())) { + queueConfig.getQueues().forEach(queueSettings -> { + Queue queue = queueConfigToQueue(queueSettings); + Queue existing = queueService.findQueueByTenantIdAndName(queue.getTenantId(), queue.getName()); + if (existing == null) { + queueService.saveQueue(queue); + } + }); + } else { + systemDataLoaderService.createQueues(); + } + } catch (Exception e) { + } + log.info("Updating device profiles..."); schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", "schema_update_device_profile.sql"); loadSql(schemaUpdateFile, conn); @@ -628,4 +644,29 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService return isOldSchema; } + private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) { + Queue queue = new Queue(); + queue.setTenantId(TenantId.SYS_TENANT_ID); + queue.setName(queueSettings.getName()); + queue.setTopic(queueSettings.getTopic()); + queue.setPollInterval(queueSettings.getPollInterval()); + queue.setPartitions(queueSettings.getPartitions()); + queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout()); + SubmitStrategy submitStrategy = new SubmitStrategy(); + submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize()); + submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType())); + queue.setSubmitStrategy(submitStrategy); + ProcessingStrategy processingStrategy = new ProcessingStrategy(); + processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType())); + processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries()); + processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage()); + processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries()); + processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries()); + queue.setProcessingStrategy(processingStrategy); + queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition()); + return queue; + } + + + } diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index 7b87668319..e27f2e88c2 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -162,21 +162,6 @@ public class DefaultDataUpdateService implements DataUpdateService { break; case "3.3.4": log.info("Updating data from version 3.3.4 to 3.4.0 ..."); - log.info("Loading queues..."); - try { - if (!CollectionUtils.isEmpty(queueConfig.getQueues())) { - queueConfig.getQueues().forEach(queueSettings -> { - Queue queue = queueConfigToQueue(queueSettings); - Queue existing = queueService.findQueueByTenantIdAndName(queue.getTenantId(), queue.getName()); - if (existing == null) { - queueService.saveQueue(queue); - } - }); - } else { - systemDataLoaderService.createQueues(); - } - } catch (Exception e) { - } tenantsProfileQueueConfigurationUpdater.updateEntities(null); checkPointRuleNodesUpdater.updateEntities(null); break; @@ -649,29 +634,6 @@ public class DefaultDataUpdateService implements DataUpdateService { return mainQueueConfiguration; } - private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) { - Queue queue = new Queue(); - queue.setTenantId(TenantId.SYS_TENANT_ID); - queue.setName(queueSettings.getName()); - queue.setTopic(queueSettings.getTopic()); - queue.setPollInterval(queueSettings.getPollInterval()); - queue.setPartitions(queueSettings.getPartitions()); - queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout()); - SubmitStrategy submitStrategy = new SubmitStrategy(); - submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize()); - submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType())); - queue.setSubmitStrategy(submitStrategy); - ProcessingStrategy processingStrategy = new ProcessingStrategy(); - processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType())); - processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries()); - processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage()); - processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries()); - processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries()); - queue.setProcessingStrategy(processingStrategy); - queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition()); - return queue; - } - private final PaginatedUpdater checkPointRuleNodesUpdater = new PaginatedUpdater<>() { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java b/common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java index 36cbda47c3..f432862499 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java @@ -16,6 +16,7 @@ package org.thingsboard.server.common.data; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -78,6 +79,8 @@ public class DeviceProfile extends SearchTextBased implements H "If present, the specified queue will be used to store all unprocessed messages related to device, including telemetry, attribute updates, etc. " + "Otherwise, the 'Main' queue will be used to store those messages.") private QueueId defaultQueueId; + + private String defaultQueueName; @Valid private transient DeviceProfileData profileData; @JsonIgnore @@ -168,4 +171,13 @@ public class DeviceProfile extends SearchTextBased implements H } } + @JsonIgnore + public String getDefaultQueueName() { + return defaultQueueName; + } + + @JsonProperty + public void setDefaultQueueName(String defaultQueueName) { + this.defaultQueueName = defaultQueueName; + } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java index 04084de987..df5a49c1e1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java @@ -37,8 +37,10 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.dao.entity.AbstractCachedEntityService; import org.thingsboard.server.dao.exception.DataValidationException; +import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.PaginatedRemover; import org.thingsboard.server.dao.service.Validator; @@ -71,6 +73,9 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService deviceProfileValidator; + @Autowired + private QueueService queueService; + private final Lock findOrCreateLock = new ReentrantLock(); @TransactionalEventListener(classes = DeviceProfileEvictEvent.class) @@ -119,6 +124,12 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService