Browse Source

Merge branch 'develop/3.4' into feature/entities-version-control

pull/6654/head
Igor Kulikov 4 years ago
parent
commit
97565ff9df
  1. 41
      application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
  2. 38
      application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java
  3. 1
      application/src/main/resources/thingsboard.yml
  4. 8
      application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java
  5. 2
      application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java
  6. 4
      common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java
  7. 2
      common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerService.java
  8. 5
      common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java
  9. 12
      common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java
  10. 4
      common/data/src/main/java/org/thingsboard/server/common/data/device/profile/TransportPayloadTypeConfiguration.java
  11. 2
      common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java
  12. 20
      common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
  13. 11
      dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java
  14. 1
      transport/coap/src/main/resources/tb-coap-transport.yml

41
application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java

@ -571,6 +571,22 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", SCHEMA_UPDATE_SQL);
loadSql(schemaUpdateFile, conn);
log.info("Loading queues...");
try {
if (!CollectionUtils.isEmpty(queueConfig.getQueues())) {
queueConfig.getQueues().forEach(queueSettings -> {
Queue queue = queueConfigToQueue(queueSettings);
Queue existing = queueService.findQueueByTenantIdAndName(queue.getTenantId(), queue.getName());
if (existing == null) {
queueService.saveQueue(queue);
}
});
} else {
systemDataLoaderService.createQueues();
}
} catch (Exception e) {
}
log.info("Updating device profiles...");
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", "schema_update_device_profile.sql");
loadSql(schemaUpdateFile, conn);
@ -628,4 +644,29 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
return isOldSchema;
}
private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) {
Queue queue = new Queue();
queue.setTenantId(TenantId.SYS_TENANT_ID);
queue.setName(queueSettings.getName());
queue.setTopic(queueSettings.getTopic());
queue.setPollInterval(queueSettings.getPollInterval());
queue.setPartitions(queueSettings.getPartitions());
queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout());
SubmitStrategy submitStrategy = new SubmitStrategy();
submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize());
submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType()));
queue.setSubmitStrategy(submitStrategy);
ProcessingStrategy processingStrategy = new ProcessingStrategy();
processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType()));
processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries());
processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage());
processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries());
processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries());
queue.setProcessingStrategy(processingStrategy);
queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition());
return queue;
}
}

38
application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java

@ -162,21 +162,6 @@ public class DefaultDataUpdateService implements DataUpdateService {
break;
case "3.3.4":
log.info("Updating data from version 3.3.4 to 3.4.0 ...");
log.info("Loading queues...");
try {
if (!CollectionUtils.isEmpty(queueConfig.getQueues())) {
queueConfig.getQueues().forEach(queueSettings -> {
Queue queue = queueConfigToQueue(queueSettings);
Queue existing = queueService.findQueueByTenantIdAndName(queue.getTenantId(), queue.getName());
if (existing == null) {
queueService.saveQueue(queue);
}
});
} else {
systemDataLoaderService.createQueues();
}
} catch (Exception e) {
}
tenantsProfileQueueConfigurationUpdater.updateEntities(null);
checkPointRuleNodesUpdater.updateEntities(null);
break;
@ -649,29 +634,6 @@ public class DefaultDataUpdateService implements DataUpdateService {
return mainQueueConfiguration;
}
private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) {
Queue queue = new Queue();
queue.setTenantId(TenantId.SYS_TENANT_ID);
queue.setName(queueSettings.getName());
queue.setTopic(queueSettings.getTopic());
queue.setPollInterval(queueSettings.getPollInterval());
queue.setPartitions(queueSettings.getPartitions());
queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout());
SubmitStrategy submitStrategy = new SubmitStrategy();
submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize());
submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType()));
queue.setSubmitStrategy(submitStrategy);
ProcessingStrategy processingStrategy = new ProcessingStrategy();
processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType()));
processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries());
processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage());
processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries());
processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries());
queue.setProcessingStrategy(processingStrategy);
queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition());
return queue;
}
private final PaginatedUpdater<String, RuleNode> checkPointRuleNodesUpdater =
new PaginatedUpdater<>() {

1
application/src/main/resources/thingsboard.yml

@ -712,6 +712,7 @@ transport:
bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
bind_port: "${COAP_BIND_PORT:5683}"
timeout: "${COAP_TIMEOUT:10000}"
piggyback_timeout: "${COAP_PIGGYBACK_TIMEOUT:500}"
psm_activity_timer: "${COAP_PSM_ACTIVITY_TIMER:10000}"
paging_transmission_window: "${COAP_PAGING_TRANSMISSION_WINDOW:10000}"
dtls:

8
application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java

@ -58,6 +58,8 @@ import org.thingsboard.server.common.data.device.profile.AlarmRule;
import org.thingsboard.server.common.data.device.profile.AllowCreateNewDevicesDeviceProfileProvisionConfiguration;
import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.SimpleAlarmConditionSpec;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
@ -199,7 +201,11 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
private void installation() throws Exception {
edge = doPost("/api/edge", constructEdge("Test Edge", "test"), Edge.class);
DeviceProfile deviceProfile = this.createDeviceProfile(CUSTOM_DEVICE_PROFILE_NAME);
MqttDeviceProfileTransportConfiguration transportConfiguration = new MqttDeviceProfileTransportConfiguration();
transportConfiguration.setTransportPayloadTypeConfiguration(new JsonTransportPayloadConfiguration());
DeviceProfile deviceProfile = this.createDeviceProfile(CUSTOM_DEVICE_PROFILE_NAME, transportConfiguration);
extendDeviceProfileData(deviceProfile);
doPost("/api/deviceProfile", deviceProfile, DeviceProfile.class);

2
application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java

@ -87,7 +87,9 @@ public class HashPartitionServiceTest {
.addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name()))
.build());
}
clusterRoutingService.init();
clusterRoutingService.partitionsInit();
clusterRoutingService.recalculatePartitions(currentServer, otherServers);
}

4
common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java

@ -38,6 +38,10 @@ public class CoapServerContext {
@Value("${transport.coap.timeout}")
private Long timeout;
@Getter
@Value("${transport.coap.piggyback_timeout}")
private Long piggybackTimeout;
@Getter
@Value("${transport.coap.psm_activity_timer:10000}")
private long psmActivityTimer;

2
common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerService.java

@ -29,4 +29,6 @@ public interface CoapServerService {
long getTimeout();
long getPiggybackTimeout();
}

5
common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java

@ -88,6 +88,11 @@ public class DefaultCoapServerService implements CoapServerService {
return coapServerContext.getTimeout();
}
@Override
public long getPiggybackTimeout() {
return coapServerContext.getPiggybackTimeout();
}
private CoapServer createCoapServer() throws UnknownHostException {
Configuration networkConfig = new Configuration();
networkConfig.set(CoapConfig.BLOCKWISE_STRICT_BLOCK2_OPTION, true);

12
common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java

@ -16,6 +16,7 @@
package org.thingsboard.server.common.data;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@ -78,6 +79,8 @@ public class DeviceProfile extends SearchTextBased<DeviceProfileId> implements H
"If present, the specified queue will be used to store all unprocessed messages related to device, including telemetry, attribute updates, etc. " +
"Otherwise, the 'Main' queue will be used to store those messages.")
private QueueId defaultQueueId;
private String defaultQueueName;
@Valid
private transient DeviceProfileData profileData;
@JsonIgnore
@ -171,4 +174,13 @@ public class DeviceProfile extends SearchTextBased<DeviceProfileId> implements H
}
}
@JsonIgnore
public String getDefaultQueueName() {
return defaultQueueName;
}
@JsonProperty
public void setDefaultQueueName(String defaultQueueName) {
this.defaultQueueName = defaultQueueName;
}
}

4
common/data/src/main/java/org/thingsboard/server/common/data/device/profile/TransportPayloadTypeConfiguration.java

@ -21,6 +21,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.thingsboard.server.common.data.TransportPayloadType;
import java.io.Serializable;
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
@ -29,7 +31,7 @@ import org.thingsboard.server.common.data.TransportPayloadType;
@JsonSubTypes({
@JsonSubTypes.Type(value = JsonTransportPayloadConfiguration.class, name = "JSON"),
@JsonSubTypes.Type(value = ProtoTransportPayloadConfiguration.class, name = "PROTOBUF")})
public interface TransportPayloadTypeConfiguration {
public interface TransportPayloadTypeConfiguration extends Serializable {
@JsonIgnore
TransportPayloadType getTransportPayloadType();

2
common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java

@ -67,7 +67,7 @@ public class RateLimitsTest {
assertThat(rateLimits.tryConsume()).as("new token is available").isFalse();
int expectedRefillTime = period * 1000;
int gap = 300;
int gap = 500;
await("tokens refill for rate limit " + rateLimitConfig)
.atLeast(expectedRefillTime - gap, TimeUnit.MILLISECONDS)

20
common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java

@ -69,6 +69,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
private final ConcurrentMap<InetSocketAddress, TbCoapDtlsSessionInfo> dtlsSessionsMap;
private final long timeout;
private final long piggybackTimeout;
private final CoapClientContext clients;
public CoapTransportResource(CoapTransportContext ctx, CoapServerService coapServerService, String name) {
@ -77,6 +78,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
this.addObserver(new CoapResourceObserver());
this.dtlsSessionsMap = coapServerService.getDtlsSessionsMap();
this.timeout = coapServerService.getTimeout();
this.piggybackTimeout = coapServerService.getPiggybackTimeout();
this.clients = ctx.getClientContext();
long sessionReportTimeout = ctx.getSessionReportTimeout();
ctx.getScheduler().scheduleAtFixedRate(clients::reportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
@ -169,7 +171,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
}
private void processProvision(CoapExchange exchange) {
exchange.accept();
deferAccept(exchange);
try {
UUID sessionId = UUID.randomUUID();
log.trace("[{}] Processing provision publish msg [{}]!", sessionId, exchange.advanced().getRequest());
@ -195,7 +197,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
private void processRequest(CoapExchange exchange, SessionMsgType type) {
log.trace("Processing {}", exchange.advanced().getRequest());
exchange.accept();
deferAccept(exchange);
Exchange advanced = exchange.advanced();
Request request = advanced.getRequest();
@ -346,6 +348,20 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
new CoapNoOpCallback(exchange));
}
/**
* Send an empty ACK if we are unable to send the full response within the timeout.
* If the full response is transmitted before the timeout this will not do anything.
* If this is triggered the full response will be sent in a separate CON/NON message.
* Essentially this allows the use of piggybacked responses.
*/
private void deferAccept(CoapExchange exchange) {
if (piggybackTimeout > 0) {
transportContext.getScheduler().schedule(exchange::accept, piggybackTimeout, TimeUnit.MILLISECONDS);
} else {
exchange.accept();
}
}
private UUID toSessionId(TransportProtos.SessionInfoProto sessionInfoProto) {
return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB());
}

11
dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java

@ -37,8 +37,10 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.dao.entity.AbstractCachedEntityService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.service.PaginatedRemover;
import org.thingsboard.server.dao.service.Validator;
@ -71,6 +73,9 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService<Device
@Autowired
private DataValidator<DeviceProfile> deviceProfileValidator;
@Autowired
private QueueService queueService;
private final Lock findOrCreateLock = new ReentrantLock();
@TransactionalEventListener(classes = DeviceProfileEvictEvent.class)
@ -119,6 +124,12 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService<Device
DeviceProfile oldDeviceProfile = deviceProfileValidator.validate(deviceProfile, DeviceProfile::getTenantId);
DeviceProfile savedDeviceProfile;
try {
if (deviceProfile.getDefaultQueueId() == null && StringUtils.isNotEmpty(deviceProfile.getDefaultQueueName())) {
Queue existing = queueService.findQueueByTenantIdAndName(deviceProfile.getTenantId(), deviceProfile.getDefaultQueueName());
if (existing != null) {
deviceProfile.setDefaultQueueId(existing.getId());
}
}
savedDeviceProfile = deviceProfileDao.saveAndFlush(deviceProfile.getTenantId(), deviceProfile);
publishEvictEvent(new DeviceProfileEvictEvent(savedDeviceProfile.getTenantId(), savedDeviceProfile.getName(),
oldDeviceProfile != null ? oldDeviceProfile.getName() : null, savedDeviceProfile.getId(), savedDeviceProfile.isDefault()));

1
transport/coap/src/main/resources/tb-coap-transport.yml

@ -89,6 +89,7 @@ transport:
bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
bind_port: "${COAP_BIND_PORT:5683}"
timeout: "${COAP_TIMEOUT:10000}"
piggyback_timeout: "${COAP_PIGGYBACK_TIMEOUT:500}"
psm_activity_timer: "${COAP_PSM_ACTIVITY_TIMER:10000}"
paging_transmission_window: "${COAP_PAGING_TRANSMISSION_WINDOW:10000}"
dtls:

Loading…
Cancel
Save