Browse Source

Merge branch 'develop/2.5.5' into feature/edge

pull/3481/head
Volodymyr Babak 6 years ago
parent
commit
7af1c86e17
  1. 3
      application/src/main/data/json/demo/rule_chains/root_rule_chain.json
  2. 3
      application/src/main/data/json/tenant/rule_chains/root_rule_chain.json
  3. 30
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  4. 8
      application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java
  5. 3
      application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
  6. 4
      application/src/main/java/org/thingsboard/server/service/state/DeviceStateData.java
  7. 7
      application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java
  8. 2
      application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java
  9. 11
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  10. 3
      application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java
  11. 5
      application/src/main/resources/thingsboard.yml
  12. 1
      common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java
  13. 4
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
  14. 9
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
  15. 1
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
  16. 17
      dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java
  17. 2
      dao/src/test/resources/sql-test.properties
  18. 2599
      msa/js-executor/package-lock.json
  19. 2
      msa/js-executor/package.json
  20. 19
      msa/js-executor/queue/kafkaTemplate.js
  21. 2
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
  22. 16
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java
  23. 3
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java
  24. 6
      rule-engine/rule-engine-components/src/main/resources/public/static/rulenode/rulenode-core-config.js
  25. 2
      transport/mqtt/src/main/resources/tb-mqtt-transport.yml
  26. 17
      ui/src/app/widget/lib/rpc/knob.directive.js

3
application/src/main/data/json/demo/rule_chains/root_rule_chain.json

@ -44,7 +44,8 @@
"name": "Save Client Attributes",
"debugMode": false,
"configuration": {
"scope": "CLIENT_SCOPE"
"scope": "CLIENT_SCOPE",
"notifyDevice": "false"
}
},
{

3
application/src/main/data/json/tenant/rule_chains/root_rule_chain.json

@ -32,7 +32,8 @@
"name": "Save Client Attributes",
"debugMode": false,
"configuration": {
"scope": "CLIENT_SCOPE"
"scope": "CLIENT_SCOPE",
"notifyDevice": "false"
}
},
{

30
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.EventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
@ -43,6 +44,7 @@ import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.ServiceQueue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.alarm.AlarmService;
@ -68,7 +70,6 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
@ -123,7 +124,7 @@ class DefaultTbContext implements TbContext {
@Override
public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
enqueue(tpi, tbMsg, onFailure, onSuccess);
}
@ -140,46 +141,57 @@ class DefaultTbContext implements TbContext {
@Override
public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
TopicPartitionInfo tpi = resolvePartition(tbMsg);
enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, String relationType) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
TopicPartitionInfo tpi = resolvePartition(tbMsg);
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
TopicPartitionInfo tpi = resolvePartition(tbMsg);
enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
TopicPartitionInfo tpi = resolvePartition(tbMsg);
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
TopicPartitionInfo tpi = resolvePartition(tbMsg);
enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
}
private TopicPartitionInfo resolvePartition(TbMsg tbMsg, String queueName) {
if (StringUtils.isEmpty(queueName)) {
queueName = ServiceQueue.MAIN;
}
return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
}
private TopicPartitionInfo resolvePartition(TbMsg tbMsg) {
return resolvePartition(tbMsg, tbMsg.getQueueName());
}
private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();
RuleNodeId ruleNodeId = nodeCtx.getSelf().getId();

8
application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java

@ -56,7 +56,9 @@ public class TbRuleEngineProcessingStrategyFactory {
private final boolean retryTimeout;
private final int maxRetries;
private final double maxAllowedFailurePercentage;
private final long pauseBetweenRetries;
private final long maxPauseBetweenRetries;
private long pauseBetweenRetries;
private int initialTotalCount;
private int retryCount;
@ -69,6 +71,7 @@ public class TbRuleEngineProcessingStrategyFactory {
this.maxRetries = configuration.getRetries();
this.maxAllowedFailurePercentage = configuration.getFailurePercentage();
this.pauseBetweenRetries = configuration.getPauseBetweenRetries();
this.maxPauseBetweenRetries = configuration.getMaxPauseBetweenRetries();
}
@Override
@ -108,6 +111,9 @@ public class TbRuleEngineProcessingStrategyFactory {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (maxPauseBetweenRetries > pauseBetweenRetries) {
pauseBetweenRetries = Math.min(maxPauseBetweenRetries, pauseBetweenRetries * 2);
}
}
return new TbRuleEngineProcessingDecision(false, toReprocess);
}

3
application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java

@ -392,7 +392,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
if (stateData != null) {
DeviceState state = stateData.getState();
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime())) {
if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {
state.setLastInactivityAlarmTime(ts);
pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
save(deviceId, INACTIVITY_ALARM_TIME, ts);
@ -479,6 +479,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
return DeviceStateData.builder()
.tenantId(device.getTenantId())
.deviceId(device.getId())
.deviceCreationTime(device.getCreatedTime())
.metaData(md)
.state(deviceState).build();
} catch (Exception e) {

4
application/src/main/java/org/thingsboard/server/service/state/DeviceStateData.java

@ -30,8 +30,8 @@ class DeviceStateData {
private final TenantId tenantId;
private final DeviceId deviceId;
private final long deviceCreationTime;
private TbMsgMetaData metaData;
private final DeviceState state;
}

7
application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java

@ -216,6 +216,11 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
@Override
public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback) {
onAttributesUpdate(tenantId, entityId, scope, attributes, callback, true);
}
@Override
public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback, boolean notifyDevice) {
onLocalSubUpdate(entityId,
s -> {
if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) {
@ -244,7 +249,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
deviceStateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), attribute.getLongValue().orElse(0L));
}
}
} else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope)) {
} else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) {
clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId,
new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes))
, null);

2
application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java

@ -35,4 +35,6 @@ public interface SubscriptionManagerService extends ApplicationListener<Partitio
void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback);
void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback, boolean notifyDevice);
}

11
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -128,9 +128,14 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
@Override
public void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback) {
saveAndNotify(tenantId, entityId, scope, attributes, callback, true);
}
@Override
public void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback, boolean notifyDevice) {
ListenableFuture<List<Void>> saveFuture = attrService.save(tenantId, entityId, scope, attributes);
addMainCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes));
addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice));
}
@Override
@ -157,11 +162,11 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
, System.currentTimeMillis())), callback);
}
private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
if (currentPartitions.contains(tpi)) {
if (subscriptionManagerService.isPresent()) {
subscriptionManagerService.get().onAttributesUpdate(tenantId, entityId, scope, attributes, TbCallback.EMPTY);
subscriptionManagerService.get().onAttributesUpdate(tenantId, entityId, scope, attributes, TbCallback.EMPTY, notifyDevice);
} else {
log.warn("Possible misconfiguration because subscriptionManagerService is null!");
}

3
application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java

@ -39,8 +39,7 @@ public abstract class AbstractCleanUpService {
protected String dbPassword;
protected long executeQuery(Connection conn, String query) throws SQLException {
try (Statement statement = conn.createStatement()) {
ResultSet resultSet = statement.executeQuery(query);
try (Statement statement = conn.createStatement(); ResultSet resultSet = statement.executeQuery(query)) {
if (log.isDebugEnabled()) {
getWarnings(statement);
}

5
application/src/main/resources/thingsboard.yml

@ -577,6 +577,8 @@ transport:
key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}"
# Type of the key store
key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
# Skip certificate validity check for client certificates.
skip_validity_check_for_client_cert: "${MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT:false}"
# Local CoAP transport parameters
coap:
# Enable/disable coap transport protocol.
@ -756,6 +758,7 @@ queue:
retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries;
max-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:3}"# Max allowed time in seconds for pause between retries.
- name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}"
topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}"
poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
@ -771,6 +774,7 @@ queue:
retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
max-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}"# Max allowed time in seconds for pause between retries.
- name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}"
topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}"
poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}"
@ -786,6 +790,7 @@ queue:
retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}"# Max allowed time in seconds for pause between retries.
transport:
# For high priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"

1
common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java

@ -24,5 +24,6 @@ public class TbRuleEngineQueueAckStrategyConfiguration {
private int retries;
private double failurePercentage;
private long pauseBetweenRetries;
private long maxPauseBetweenRetries;
}

4
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java

@ -46,6 +46,10 @@ public class MqttTransportContext extends TransportContext {
@Value("${transport.mqtt.netty.max_payload_size}")
private Integer maxPayloadSize;
@Getter
@Value("${transport.mqtt.netty.skip_validity_check_for_client_cert:false}")
private boolean skipValidityCheckForClientCert;
@Getter
@Setter
private SslHandler sslHandler;

9
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -342,7 +342,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private MqttMessage createUnSubAckMessage(int msgId) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0);
new MqttFixedHeader(UNSUBACK, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
}
@ -383,6 +383,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) {
try {
if(!context.isSkipValidityCheckForClientCert()){
cert.checkValidity();
}
String strCert = SslUtil.getX509CertificateString(cert);
String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
transportService.process(ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(),
@ -445,7 +448,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private static MqttSubAckMessage createSubAckMessage(Integer msgId, List<Integer> grantedQoSList) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(SUBACK, false, AT_LEAST_ONCE, false, 0);
new MqttFixedHeader(SUBACK, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantedQoSList);
return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload);
@ -457,7 +460,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public static MqttPubAckMessage createMqttPubAckMsg(int requestId) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(PUBACK, false, AT_LEAST_ONCE, false, 0);
new MqttFixedHeader(PUBACK, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMsgIdVariableHeader =
MqttMessageIdVariableHeader.from(requestId);
return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader);

1
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -298,6 +298,7 @@ public class DefaultTransportService implements TransportService {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("deviceName", sessionInfo.getDeviceName());
metaData.putValue("deviceType", sessionInfo.getDeviceType());
metaData.putValue("notifyDevice", "false");
TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, metaData, gson.toJson(json));
sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback));
}

17
dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.dao.event;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@ -28,6 +29,7 @@ import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.DataValidator;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
@ -35,6 +37,8 @@ import java.util.Optional;
@Slf4j
public class BaseEventService implements EventService {
private static final int MAX_DEBUG_EVENT_SYMBOLS = 4 * 1024;
@Autowired
public EventDao eventDao;
@ -47,6 +51,7 @@ public class BaseEventService implements EventService {
@Override
public ListenableFuture<Event> saveAsync(Event event) {
eventValidator.validate(event, Event::getTenantId);
checkAndTruncateDebugEvent(event);
return eventDao.saveAsync(event);
}
@ -56,9 +61,21 @@ public class BaseEventService implements EventService {
if (StringUtils.isEmpty(event.getUid())) {
throw new DataValidationException("Event uid should be specified!");
}
checkAndTruncateDebugEvent(event);
return eventDao.saveIfNotExists(event);
}
private void checkAndTruncateDebugEvent(Event event) {
if (event.getType().startsWith("DEBUG") && event.getBody() != null && event.getBody().has("data")) {
String dataStr = event.getBody().get("data").asText();
int length = dataStr.length();
if (length > MAX_DEBUG_EVENT_SYMBOLS) {
((ObjectNode) event.getBody()).put("data", dataStr.substring(0, MAX_DEBUG_EVENT_SYMBOLS) + "...[truncated " + (length - MAX_DEBUG_EVENT_SYMBOLS) + " symbols]");
log.trace("[{}] Event was truncated: {}", event.getId(), dataStr);
}
}
}
@Override
public Optional<Event> findEvent(TenantId tenantId, EntityId entityId, String eventType, String eventUid) {
if (tenantId == null) {

2
dao/src/test/resources/sql-test.properties

@ -12,7 +12,7 @@ spring.jpa.database-platform=org.hibernate.dialect.HSQLDialect
spring.datasource.username=sa
spring.datasource.password=
spring.datasource.url=jdbc:hsqldb:file:/tmp/testDb;sql.enforce_size=false
spring.datasource.url=jdbc:hsqldb:file:target/tmp/testDb;sql.enforce_size=false
spring.datasource.driverClassName=org.hsqldb.jdbc.JDBCDriver
spring.datasource.hikari.maximumPoolSize = 50

2599
msa/js-executor/package-lock.json

File diff suppressed because it is too large

2
msa/js-executor/package.json

@ -14,7 +14,7 @@
"dependencies": {
"config": "^3.2.2",
"js-yaml": "^3.12.0",
"kafkajs": "^1.12.0",
"kafkajs": "^1.14.0",
"@google-cloud/pubsub": "^1.7.1",
"aws-sdk": "^2.663.0",
"amqplib": "^0.5.5",

19
msa/js-executor/queue/kafkaTemplate.js

@ -27,20 +27,10 @@ let kafkaAdmin;
let consumer;
let producer;
const topics = [];
const configEntries = [];
function KafkaProducer() {
this.send = async (responseTopic, scriptId, rawResponse, headers) => {
if (!topics.includes(responseTopic)) {
let createResponseTopicResult = await createTopic(responseTopic, 1);
topics.push(responseTopic);
if (createResponseTopicResult) {
logger.info('Created new topic: %s', requestTopic);
}
}
return producer.send(
{
topic: responseTopic,
@ -99,10 +89,13 @@ function KafkaProducer() {
}
}
let createRequestTopicResult = await createTopic(requestTopic, partitions);
let topics = await kafkaAdmin.listTopics();
if (createRequestTopicResult) {
logger.info('Created new topic: %s', requestTopic);
if (!topics.includes(requestTopic)) {
let createRequestTopicResult = await createTopic(requestTopic, partitions);
if (createRequestTopicResult) {
logger.info('Created new topic: %s', requestTopic);
}
}
consumer = kafkaClient.consumer({groupId: 'js-executor-group'});

2
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java

@ -36,6 +36,8 @@ public interface RuleEngineTelemetryService {
void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback, boolean notifyDevice);
void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback);
void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback);

16
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java

@ -17,7 +17,7 @@ package org.thingsboard.rule.engine.telemetry;
import com.google.gson.JsonParser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
@ -53,6 +53,9 @@ public class TbMsgAttributesNode implements TbNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgAttributesNodeConfiguration.class);
if (config.getNotifyDevice() == null) {
config.setNotifyDevice(true);
}
}
@Override
@ -64,8 +67,15 @@ public class TbMsgAttributesNode implements TbNode {
String src = msg.getData();
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src));
msg.getMetaData().putValue(SCOPE, config.getScope());
ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), config.getScope(),
new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg));
String notifyDeviceStr = msg.getMetaData().getValue("notifyDevice");
ctx.getTelemetryService().saveAndNotify(
ctx.getTenantId(),
msg.getOriginator(),
config.getScope(),
new ArrayList<>(attributes),
new TelemetryNodeCallback(ctx, msg),
config.getNotifyDevice() || StringUtils.isEmpty(notifyDeviceStr) || Boolean.parseBoolean(notifyDeviceStr)
);
}
@Override

3
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeConfiguration.java

@ -24,10 +24,13 @@ public class TbMsgAttributesNodeConfiguration implements NodeConfiguration<TbMsg
private String scope;
private Boolean notifyDevice;
@Override
public TbMsgAttributesNodeConfiguration defaultConfiguration() {
TbMsgAttributesNodeConfiguration configuration = new TbMsgAttributesNodeConfiguration();
configuration.setScope(DataConstants.SERVER_SCOPE);
configuration.setNotifyDevice(false);
return configuration;
}
}

6
rule-engine/rule-engine-components/src/main/resources/public/static/rulenode/rulenode-core-config.js

File diff suppressed because one or more lines are too long

2
transport/mqtt/src/main/resources/tb-mqtt-transport.yml

@ -67,6 +67,8 @@ transport:
key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}"
# Type of the key store
key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
# Skip certificate validity check for client certificates.
skip_validity_check_for_client_cert: "${MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT:false}"
sessions:
inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"

17
ui/src/app/widget/lib/rpc/knob.directive.js

@ -50,6 +50,7 @@ function KnobController($element, $scope, $document) {
vm.value = 0;
vm.error = '';
vm.newValue = 0;
var knob = angular.element('.knob', $element),
knobContainer = angular.element('#knob-container', $element),
@ -145,9 +146,11 @@ function KnobController($element, $scope, $document) {
turn(degreeToRatio(currentDeg));
rotation = currentDeg;
startDeg = -1;
rpcUpdateValue(vm.newValue);
});
knob.on('mousedown touchstart', (e) => {
moving = false;
e.preventDefault();
var offset = knob.offset();
var center = {
@ -158,7 +161,7 @@ function KnobController($element, $scope, $document) {
var a, b, deg, tmp,
rad2deg = 180/Math.PI;
knob.on('mousemove.rem touchmove.rem', (e) => {
$document.on('mousemove.rem touchmove.rem', (e) => {
moving = true;
e = (e.originalEvent.touches) ? e.originalEvent.touches[0] : e;
@ -209,6 +212,9 @@ function KnobController($element, $scope, $document) {
});
$document.on('mouseup.rem touchend.rem',() => {
if(moving) {
rpcUpdateValue(vm.newValue);
}
knob.off('.rem');
$document.off('.rem');
rotation = currentDeg;
@ -269,12 +275,12 @@ function KnobController($element, $scope, $document) {
}
function turn(ratio) {
var value = (vm.minValue + (vm.maxValue - vm.minValue)*ratio).toFixed(vm.ctx.decimals);
if (canvasBar.value != value) {
canvasBar.value = value;
vm.newValue = (vm.minValue + (vm.maxValue - vm.minValue)*ratio).toFixed(vm.ctx.decimals);
if (canvasBar.value != vm.newValue) {
canvasBar.value = vm.newValue;
}
updateColor(canvasBar.getValueColor());
onValue(value);
onValue(vm.newValue);
}
function setValue(value) {
@ -303,7 +309,6 @@ function KnobController($element, $scope, $document) {
$scope.$applyAsync(() => {
vm.value = formatValue(value);
checkValueSize();
rpcUpdateValue(value);
});
}

Loading…
Cancel
Save