diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index d0d5f37e00..306102c66c 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -571,6 +571,22 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", SCHEMA_UPDATE_SQL); loadSql(schemaUpdateFile, conn); + log.info("Loading queues..."); + try { + if (!CollectionUtils.isEmpty(queueConfig.getQueues())) { + queueConfig.getQueues().forEach(queueSettings -> { + Queue queue = queueConfigToQueue(queueSettings); + Queue existing = queueService.findQueueByTenantIdAndName(queue.getTenantId(), queue.getName()); + if (existing == null) { + queueService.saveQueue(queue); + } + }); + } else { + systemDataLoaderService.createQueues(); + } + } catch (Exception e) { + } + log.info("Updating device profiles..."); schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", "schema_update_device_profile.sql"); loadSql(schemaUpdateFile, conn); @@ -628,4 +644,29 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService return isOldSchema; } + private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) { + Queue queue = new Queue(); + queue.setTenantId(TenantId.SYS_TENANT_ID); + queue.setName(queueSettings.getName()); + queue.setTopic(queueSettings.getTopic()); + queue.setPollInterval(queueSettings.getPollInterval()); + queue.setPartitions(queueSettings.getPartitions()); + queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout()); + SubmitStrategy submitStrategy = new SubmitStrategy(); + submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize()); + submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType())); + queue.setSubmitStrategy(submitStrategy); + ProcessingStrategy processingStrategy = new ProcessingStrategy(); + processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType())); + processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries()); + processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage()); + processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries()); + processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries()); + queue.setProcessingStrategy(processingStrategy); + queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition()); + return queue; + } + + + } diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index 7b87668319..e27f2e88c2 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -162,21 +162,6 @@ public class DefaultDataUpdateService implements DataUpdateService { break; case "3.3.4": log.info("Updating data from version 3.3.4 to 3.4.0 ..."); - log.info("Loading queues..."); - try { - if (!CollectionUtils.isEmpty(queueConfig.getQueues())) { - queueConfig.getQueues().forEach(queueSettings -> { - Queue queue = queueConfigToQueue(queueSettings); - Queue existing = queueService.findQueueByTenantIdAndName(queue.getTenantId(), queue.getName()); - if (existing == null) { - queueService.saveQueue(queue); - } - }); - } else { - systemDataLoaderService.createQueues(); - } - } catch (Exception e) { - } tenantsProfileQueueConfigurationUpdater.updateEntities(null); checkPointRuleNodesUpdater.updateEntities(null); break; @@ -649,29 +634,6 @@ public class DefaultDataUpdateService implements DataUpdateService { return mainQueueConfiguration; } - private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) { - Queue queue = new Queue(); - queue.setTenantId(TenantId.SYS_TENANT_ID); - queue.setName(queueSettings.getName()); - queue.setTopic(queueSettings.getTopic()); - queue.setPollInterval(queueSettings.getPollInterval()); - queue.setPartitions(queueSettings.getPartitions()); - queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout()); - SubmitStrategy submitStrategy = new SubmitStrategy(); - submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize()); - submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType())); - queue.setSubmitStrategy(submitStrategy); - ProcessingStrategy processingStrategy = new ProcessingStrategy(); - processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType())); - processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries()); - processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage()); - processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries()); - processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries()); - queue.setProcessingStrategy(processingStrategy); - queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition()); - return queue; - } - private final PaginatedUpdater checkPointRuleNodesUpdater = new PaginatedUpdater<>() { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 0058df9b2b..ac6fe34905 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -712,6 +712,7 @@ transport: bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}" bind_port: "${COAP_BIND_PORT:5683}" timeout: "${COAP_TIMEOUT:10000}" + piggyback_timeout: "${COAP_PIGGYBACK_TIMEOUT:500}" psm_activity_timer: "${COAP_PSM_ACTIVITY_TIMER:10000}" paging_transmission_window: "${COAP_PAGING_TRANSMISSION_WINDOW:10000}" dtls: diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index e815378d27..61bb53b060 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -58,6 +58,8 @@ import org.thingsboard.server.common.data.device.profile.AlarmRule; import org.thingsboard.server.common.data.device.profile.AllowCreateNewDevicesDeviceProfileProvisionConfiguration; import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm; import org.thingsboard.server.common.data.device.profile.DeviceProfileData; +import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration; +import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.SimpleAlarmConditionSpec; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; @@ -199,7 +201,11 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { private void installation() throws Exception { edge = doPost("/api/edge", constructEdge("Test Edge", "test"), Edge.class); - DeviceProfile deviceProfile = this.createDeviceProfile(CUSTOM_DEVICE_PROFILE_NAME); + MqttDeviceProfileTransportConfiguration transportConfiguration = new MqttDeviceProfileTransportConfiguration(); + transportConfiguration.setTransportPayloadTypeConfiguration(new JsonTransportPayloadConfiguration()); + + DeviceProfile deviceProfile = this.createDeviceProfile(CUSTOM_DEVICE_PROFILE_NAME, transportConfiguration); + extendDeviceProfileData(deviceProfile); doPost("/api/deviceProfile", deviceProfile, DeviceProfile.class); diff --git a/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java index 6aafa833b1..9df6c16111 100644 --- a/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java @@ -87,7 +87,9 @@ public class HashPartitionServiceTest { .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name())) .build()); } + clusterRoutingService.init(); + clusterRoutingService.partitionsInit(); clusterRoutingService.recalculatePartitions(currentServer, otherServers); } diff --git a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java index dda014da4c..8b448a1bcd 100644 --- a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java +++ b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java @@ -38,6 +38,10 @@ public class CoapServerContext { @Value("${transport.coap.timeout}") private Long timeout; + @Getter + @Value("${transport.coap.piggyback_timeout}") + private Long piggybackTimeout; + @Getter @Value("${transport.coap.psm_activity_timer:10000}") private long psmActivityTimer; diff --git a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerService.java b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerService.java index 8dbbc5a620..8d38b67d9c 100644 --- a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerService.java +++ b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerService.java @@ -29,4 +29,6 @@ public interface CoapServerService { long getTimeout(); + long getPiggybackTimeout(); + } diff --git a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java index 3aa0d0c122..b7b0bcb436 100644 --- a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java +++ b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java @@ -88,6 +88,11 @@ public class DefaultCoapServerService implements CoapServerService { return coapServerContext.getTimeout(); } + @Override + public long getPiggybackTimeout() { + return coapServerContext.getPiggybackTimeout(); + } + private CoapServer createCoapServer() throws UnknownHostException { Configuration networkConfig = new Configuration(); networkConfig.set(CoapConfig.BLOCKWISE_STRICT_BLOCK2_OPTION, true); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java b/common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java index b1cddabb43..e087ba0b7c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java @@ -16,6 +16,7 @@ package org.thingsboard.server.common.data; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -78,6 +79,8 @@ public class DeviceProfile extends SearchTextBased implements H "If present, the specified queue will be used to store all unprocessed messages related to device, including telemetry, attribute updates, etc. " + "Otherwise, the 'Main' queue will be used to store those messages.") private QueueId defaultQueueId; + + private String defaultQueueName; @Valid private transient DeviceProfileData profileData; @JsonIgnore @@ -171,4 +174,13 @@ public class DeviceProfile extends SearchTextBased implements H } } + @JsonIgnore + public String getDefaultQueueName() { + return defaultQueueName; + } + + @JsonProperty + public void setDefaultQueueName(String defaultQueueName) { + this.defaultQueueName = defaultQueueName; + } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/TransportPayloadTypeConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/TransportPayloadTypeConfiguration.java index 7129ba8846..4d229a3706 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/TransportPayloadTypeConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/TransportPayloadTypeConfiguration.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.thingsboard.server.common.data.TransportPayloadType; +import java.io.Serializable; + @JsonIgnoreProperties(ignoreUnknown = true) @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, @@ -29,7 +31,7 @@ import org.thingsboard.server.common.data.TransportPayloadType; @JsonSubTypes({ @JsonSubTypes.Type(value = JsonTransportPayloadConfiguration.class, name = "JSON"), @JsonSubTypes.Type(value = ProtoTransportPayloadConfiguration.class, name = "PROTOBUF")}) -public interface TransportPayloadTypeConfiguration { +public interface TransportPayloadTypeConfiguration extends Serializable { @JsonIgnore TransportPayloadType getTransportPayloadType(); diff --git a/common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java b/common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java index b0bbfa3dc6..c8583527f1 100644 --- a/common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java +++ b/common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java @@ -67,7 +67,7 @@ public class RateLimitsTest { assertThat(rateLimits.tryConsume()).as("new token is available").isFalse(); int expectedRefillTime = period * 1000; - int gap = 300; + int gap = 500; await("tokens refill for rate limit " + rateLimitConfig) .atLeast(expectedRefillTime - gap, TimeUnit.MILLISECONDS) diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 3e291a8177..0016cc05d8 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -69,6 +69,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { private final ConcurrentMap dtlsSessionsMap; private final long timeout; + private final long piggybackTimeout; private final CoapClientContext clients; public CoapTransportResource(CoapTransportContext ctx, CoapServerService coapServerService, String name) { @@ -77,6 +78,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { this.addObserver(new CoapResourceObserver()); this.dtlsSessionsMap = coapServerService.getDtlsSessionsMap(); this.timeout = coapServerService.getTimeout(); + this.piggybackTimeout = coapServerService.getPiggybackTimeout(); this.clients = ctx.getClientContext(); long sessionReportTimeout = ctx.getSessionReportTimeout(); ctx.getScheduler().scheduleAtFixedRate(clients::reportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); @@ -169,7 +171,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } private void processProvision(CoapExchange exchange) { - exchange.accept(); + deferAccept(exchange); try { UUID sessionId = UUID.randomUUID(); log.trace("[{}] Processing provision publish msg [{}]!", sessionId, exchange.advanced().getRequest()); @@ -195,7 +197,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { private void processRequest(CoapExchange exchange, SessionMsgType type) { log.trace("Processing {}", exchange.advanced().getRequest()); - exchange.accept(); + deferAccept(exchange); Exchange advanced = exchange.advanced(); Request request = advanced.getRequest(); @@ -346,6 +348,20 @@ public class CoapTransportResource extends AbstractCoapTransportResource { new CoapNoOpCallback(exchange)); } + /** + * Send an empty ACK if we are unable to send the full response within the timeout. + * If the full response is transmitted before the timeout this will not do anything. + * If this is triggered the full response will be sent in a separate CON/NON message. + * Essentially this allows the use of piggybacked responses. + */ + private void deferAccept(CoapExchange exchange) { + if (piggybackTimeout > 0) { + transportContext.getScheduler().schedule(exchange::accept, piggybackTimeout, TimeUnit.MILLISECONDS); + } else { + exchange.accept(); + } + } + private UUID toSessionId(TransportProtos.SessionInfoProto sessionInfoProto) { return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB()); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java index 04084de987..df5a49c1e1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java @@ -37,8 +37,10 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.dao.entity.AbstractCachedEntityService; import org.thingsboard.server.dao.exception.DataValidationException; +import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.PaginatedRemover; import org.thingsboard.server.dao.service.Validator; @@ -71,6 +73,9 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService deviceProfileValidator; + @Autowired + private QueueService queueService; + private final Lock findOrCreateLock = new ReentrantLock(); @TransactionalEventListener(classes = DeviceProfileEvictEvent.class) @@ -119,6 +124,12 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService