Browse Source

Merge remote-tracking branch 'origin/develop/1.5' into feature/rpc-refactoring

pull/725/head
Andrew Shvayka 8 years ago
parent
commit
eb6f0f394e
  1. 5
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 5
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  3. 7
      application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
  4. 59
      application/src/main/java/org/thingsboard/server/service/executors/AbstractListeningExecutor.java
  5. 32
      application/src/main/java/org/thingsboard/server/service/executors/DbCallbackExecutorService.java
  6. 31
      application/src/main/java/org/thingsboard/server/service/mail/MailExecutorService.java
  7. 30
      application/src/main/java/org/thingsboard/server/service/script/JsExecutorService.java
  8. 4
      application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java
  9. 3
      application/src/main/resources/thingsboard.yml
  10. 3
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ListeningExecutor.java
  11. 2
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
  12. 27
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java
  13. 4
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java
  14. 4
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java
  15. 3
      rule-engine/rule-engine-components/src/main/resources/public/static/rulenode/rulenode-core-config.js
  16. 55
      ui/src/app/rulechain/rulechain.controller.js

5
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -59,6 +59,7 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.mail.MailExecutorService;
import org.thingsboard.server.service.script.JsExecutorService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
@ -173,6 +174,10 @@ public class ActorSystemContext {
@Getter
private MailExecutorService mailExecutor;
@Autowired
@Getter
private DbCallbackExecutorService dbCallbackExecutor;
@Autowired
@Getter
private MailService mailService;

5
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -137,6 +137,11 @@ class DefaultTbContext implements TbContext {
return mainCtx.getMailExecutor();
}
@Override
public ListeningExecutor getDbCallbackExecutor() {
return mainCtx.getDbCallbackExecutor();
}
@Override
public ScriptEngine createJsScriptEngine(String script, String functionName, String... argNames) {
return new NashornJsEngine(script, functionName, argNames);

7
application/src/main/java/org/thingsboard/server/controller/RuleChainController.java

@ -253,6 +253,13 @@ public class RuleChainController extends BaseController {
Set<String> states = engine.executeSwitch(inMsg);
output = objectMapper.writeValueAsString(states);
break;
case "json":
JsonNode json = engine.executeJson(inMsg);
output = objectMapper.writeValueAsString(json);
break;
case "string":
output = engine.executeToString(inMsg);
break;
default:
throw new IllegalArgumentException("Unsupported script type: " + scriptType);
}

59
application/src/main/java/org/thingsboard/server/service/executors/AbstractListeningExecutor.java

@ -0,0 +1,59 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.executors;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.thingsboard.rule.engine.api.ListeningExecutor;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
/**
* Created by igor on 4/13/18.
*/
public abstract class AbstractListeningExecutor implements ListeningExecutor {
private ListeningExecutorService service;
@PostConstruct
public void init() {
this.service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(getThreadPollSize()));
}
@PreDestroy
public void destroy() {
if (this.service != null) {
this.service.shutdown();
}
}
@Override
public <T> ListenableFuture<T> executeAsync(Callable<T> task) {
return service.submit(task);
}
@Override
public void execute(Runnable command) {
service.execute(command);
}
protected abstract int getThreadPollSize();
}

32
application/src/main/java/org/thingsboard/server/service/executors/DbCallbackExecutorService.java

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.executors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class DbCallbackExecutorService extends AbstractListeningExecutor {
@Value("${actors.rule.db_callback_thread_pool_size}")
private int dbCallbackExecutorThreadPoolSize;
@Override
protected int getThreadPollSize() {
return dbCallbackExecutorThreadPoolSize;
}
}

31
application/src/main/java/org/thingsboard/server/service/mail/MailExecutorService.java

@ -15,40 +15,19 @@
*/
package org.thingsboard.server.service.mail;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.ListeningExecutor;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import org.thingsboard.server.service.executors.AbstractListeningExecutor;
@Component
public class MailExecutorService implements ListeningExecutor {
public class MailExecutorService extends AbstractListeningExecutor {
@Value("${actors.rule.mail_thread_pool_size}")
private int mailExecutorThreadPoolSize;
private ListeningExecutorService service;
@PostConstruct
public void init() {
this.service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(mailExecutorThreadPoolSize));
}
@PreDestroy
public void destroy() {
if (this.service != null) {
this.service.shutdown();
}
}
@Override
public <T> ListenableFuture<T> executeAsync(Callable<T> task) {
return service.submit(task);
protected int getThreadPollSize() {
return mailExecutorThreadPoolSize;
}
}

30
application/src/main/java/org/thingsboard/server/service/script/JsExecutorService.java

@ -15,41 +15,19 @@
*/
package org.thingsboard.server.service.script;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.ListeningExecutor;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import org.thingsboard.server.service.executors.AbstractListeningExecutor;
@Component
public class JsExecutorService implements ListeningExecutor{
public class JsExecutorService extends AbstractListeningExecutor {
@Value("${actors.rule.js_thread_pool_size}")
private int jsExecutorThreadPoolSize;
private ListeningExecutorService service;
@PostConstruct
public void init() {
this.service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(jsExecutorThreadPoolSize));
}
@PreDestroy
public void destroy() {
if (this.service != null) {
this.service.shutdown();
}
}
@Override
public <T> ListenableFuture<T> executeAsync(Callable<T> task) {
return service.submit(task);
protected int getThreadPollSize() {
return jsExecutorThreadPoolSize;
}
}

4
application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java

@ -153,6 +153,10 @@ public class NashornJsEngine implements org.thingsboard.rule.engine.api.ScriptEn
@Override
public String executeToString(TbMsg msg) throws ScriptException {
JsonNode result = executeScript(msg);
if (!result.isTextual()) {
log.warn("Wrong result type: {}", result.getNodeType());
throw new ScriptException("Wrong result type: " + result.getNodeType());
}
return result.asText();
}

3
application/src/main/resources/thingsboard.yml

@ -219,8 +219,11 @@ actors:
termination.delay: "${ACTORS_RULE_TERMINATION_DELAY:30000}"
# Errors for particular actor are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_ERROR_FREQUENCY:3000}"
# Specify thread pool size for database request callbacks executor service
db_callback_thread_pool_size: "${ACTORS_RULE_DB_CALLBACK_THREAD_POOL_SIZE:1}"
# Specify thread pool size for javascript executor service
js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:10}"
# Specify thread pool size for mail sender executor service
mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:10}"
chain:
# Errors for particular actor are persisted once per specified amount of milliseconds

3
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ListeningExecutor.java

@ -18,8 +18,9 @@ package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
public interface ListeningExecutor {
public interface ListeningExecutor extends Executor {
<T> ListenableFuture<T> executeAsync(Callable<T> task);

2
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java

@ -88,6 +88,8 @@ public interface TbContext {
ListeningExecutor getMailExecutor();
ListeningExecutor getDbCallbackExecutor();
MailService getMailService();
ScriptEngine createJsScriptEngine(String script, String functionName, String... argNames);

27
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -32,6 +32,8 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.concurrent.ExecutorService;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
@Slf4j
@ -47,7 +49,8 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
"If alarm was not created, original message is returned. Otherwise new Message returned with type 'ALARM', Alarm object in 'msg' property and 'matadata' will contains one of those properties 'isNewAlarm/isExistingAlarm/isClearedAlarm' " +
"Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>" +
"Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>",
uiResources = {"static/rulenode/rulenode-core-config.js"})
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeAlarmConfig")
public class TbAlarmNode implements TbNode {
@ -81,7 +84,7 @@ public class TbAlarmNode implements TbNode {
} else {
return checkForClearIfExist(ctx, msg);
}
});
}, ctx.getDbCallbackExecutor());
withCallback(transform,
alarmResult -> {
@ -107,7 +110,7 @@ public class TbAlarmNode implements TbNode {
} else {
return updateAlarm(ctx, msg, a);
}
});
}, ctx.getDbCallbackExecutor());
}
private ListenableFuture<AlarmResult> checkForClearIfExist(TbContext ctx, TbMsg msg) {
@ -117,12 +120,14 @@ public class TbAlarmNode implements TbNode {
return clearAlarm(ctx, msg, a);
}
return Futures.immediateFuture(new AlarmResult(false, false, false, null));
});
}, ctx.getDbCallbackExecutor());
}
private ListenableFuture<AlarmResult> createNewAlarm(TbContext ctx, TbMsg msg) {
ListenableFuture<Alarm> asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg), (Function<JsonNode, Alarm>) details -> buildAlarm(msg, details, ctx.getTenantId()));
ListenableFuture<Alarm> asyncCreated = Futures.transform(asyncAlarm, (Function<Alarm, Alarm>) alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm));
ListenableFuture<Alarm> asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg),
(Function<JsonNode, Alarm>) details -> buildAlarm(msg, details, ctx.getTenantId()));
ListenableFuture<Alarm> asyncCreated = Futures.transform(asyncAlarm,
(Function<Alarm, Alarm>) alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor());
return Futures.transform(asyncCreated, (Function<Alarm, AlarmResult>) alarm -> new AlarmResult(true, false, false, alarm));
}
@ -133,7 +138,7 @@ public class TbAlarmNode implements TbNode {
alarm.setDetails(details);
alarm.setEndTs(System.currentTimeMillis());
return ctx.getAlarmService().createOrUpdateAlarm(alarm);
});
}, ctx.getDbCallbackExecutor());
return Futures.transform(asyncUpdated, (Function<Alarm, AlarmResult>) a -> new AlarmResult(false, true, false, a));
}

4
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java

@ -31,7 +31,9 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
nodeDescription = "Log incoming messages using JS script for transformation Message into String",
nodeDetails = "Transform incoming Message with configured JS condition to String and log final value. " +
"Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>" +
"Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>")
"Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeLogConfig")
public class TbLogNode implements TbNode {

4
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java

@ -40,7 +40,9 @@ import static org.thingsboard.rule.engine.mail.TbSendEmailNode.SEND_EMAIL_TYPE;
configClazz = TbMsgToEmailNodeConfiguration.class,
nodeDescription = "Change Message Originator To Tenant/Customer/Related Entity",
nodeDetails = "Related Entity found using configured relation direction and Relation Type. " +
"If multiple Related Entities are found, only first Entity is used as new Originator, other entities are discarded. ")
"If multiple Related Entities are found, only first Entity is used as new Originator, other entities are discarded. ",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbTransformationNodeToEmailConfig")
public class TbMsgToEmailNode implements TbNode {
private static final ObjectMapper MAPPER = new ObjectMapper();

3
rule-engine/rule-engine-components/src/main/resources/public/static/rulenode/rulenode-core-config.js

File diff suppressed because one or more lines are too long

55
ui/src/app/rulechain/rulechain.controller.js

@ -62,6 +62,7 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
vm.isEditingRuleNodeLink = false;
vm.isLibraryOpen = true;
vm.enableHotKeys = true;
Object.defineProperty(vm, 'isLibraryOpenReadonly', {
get: function() { return vm.isLibraryOpen },
@ -327,8 +328,10 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
description: $translate.instant('rulenode.select-all-objects'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
event.preventDefault();
vm.modelservice.selectAll();
if (vm.enableHotKeys) {
event.preventDefault();
vm.modelservice.selectAll();
}
}
})
.add({
@ -336,8 +339,10 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
description: $translate.instant('rulenode.copy-selected'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
event.preventDefault();
copyRuleNodes();
if (vm.enableHotKeys) {
event.preventDefault();
copyRuleNodes();
}
}
})
.add({
@ -345,9 +350,11 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
description: $translate.instant('action.paste'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
event.preventDefault();
if (itembuffer.hasRuleNodes()) {
pasteRuleNodes();
if (vm.enableHotKeys) {
event.preventDefault();
if (itembuffer.hasRuleNodes()) {
pasteRuleNodes();
}
}
}
})
@ -356,9 +363,11 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
description: $translate.instant('rulenode.deselect-all-objects'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
event.preventDefault();
event.stopPropagation();
vm.modelservice.deselectAll();
if (vm.enableHotKeys) {
event.preventDefault();
event.stopPropagation();
vm.modelservice.deselectAll();
}
}
})
.add({
@ -366,8 +375,10 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
description: $translate.instant('action.apply'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
event.preventDefault();
vm.saveRuleChain();
if (vm.enableHotKeys) {
event.preventDefault();
vm.saveRuleChain();
}
}
})
.add({
@ -375,8 +386,10 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
description: $translate.instant('action.decline-changes'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
event.preventDefault();
vm.revertRuleChain();
if (vm.enableHotKeys) {
event.preventDefault();
vm.revertRuleChain();
}
}
})
.add({
@ -384,8 +397,10 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
description: $translate.instant('rulenode.delete-selected-objects'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
event.preventDefault();
vm.modelservice.deleteSelected();
if (vm.enableHotKeys) {
event.preventDefault();
vm.modelservice.deleteSelected();
}
}
})
}
@ -574,6 +589,7 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
$scope.$watch(function() {
return vm.isEditingRuleNode || vm.isEditingRuleNodeLink;
}, (val) => {
vm.enableHotKeys = !val;
updateErrorTooltips(val);
});
@ -605,12 +621,15 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
}
} else {
var labels = ruleChainService.getRuleNodeSupportedLinks(sourceNode.component);
vm.enableHotKeys = false;
addRuleNodeLink(event, edge, labels).then(
(link) => {
deferred.resolve(link);
vm.enableHotKeys = true;
},
() => {
deferred.reject();
vm.enableHotKeys = true;
}
);
}
@ -1159,6 +1178,8 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
var ruleChainId = vm.ruleChain.id ? vm.ruleChain.id.id : null;
vm.enableHotKeys = false;
$mdDialog.show({
controller: 'AddRuleNodeController',
controllerAs: 'vm',
@ -1188,7 +1209,9 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
}
vm.ruleChainModel.nodes.push(ruleNode);
updateRuleNodesHighlight();
vm.enableHotKeys = true;
}, function () {
vm.enableHotKeys = true;
});
}

Loading…
Cancel
Save