38 changed files with 1582 additions and 208 deletions
@ -0,0 +1,27 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.api; |
||||
|
|
||||
|
import com.google.common.util.concurrent.ListenableFuture; |
||||
|
|
||||
|
import java.util.concurrent.Callable; |
||||
|
|
||||
|
public interface ListeningExecutor { |
||||
|
|
||||
|
<T> ListenableFuture<T> executeAsync(Callable<T> task); |
||||
|
|
||||
|
void onDestroy(); |
||||
|
} |
||||
@ -0,0 +1,58 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.filter; |
||||
|
|
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.thingsboard.rule.engine.TbNodeUtils; |
||||
|
import org.thingsboard.rule.engine.api.*; |
||||
|
import org.thingsboard.rule.engine.js.NashornJsEngine; |
||||
|
import org.thingsboard.server.common.msg.TbMsg; |
||||
|
|
||||
|
import javax.script.Bindings; |
||||
|
|
||||
|
import static org.thingsboard.rule.engine.DonAsynchron.withCallback; |
||||
|
|
||||
|
@Slf4j |
||||
|
public class TbJsFilterNode implements TbNode { |
||||
|
|
||||
|
private TbJsFilterNodeConfiguration config; |
||||
|
private NashornJsEngine jsEngine; |
||||
|
|
||||
|
@Override |
||||
|
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { |
||||
|
this.config = TbNodeUtils.convert(configuration, TbJsFilterNodeConfiguration.class); |
||||
|
this.jsEngine = new NashornJsEngine(config.getJsScript()); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void onMsg(TbContext ctx, TbMsg msg) { |
||||
|
ListeningExecutor jsExecutor = ctx.getJsExecutor(); |
||||
|
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(toBindings(msg))), |
||||
|
filterResult -> ctx.tellNext(msg, Boolean.toString(filterResult)), |
||||
|
t -> ctx.tellError(msg, t)); |
||||
|
} |
||||
|
|
||||
|
private Bindings toBindings(TbMsg msg) { |
||||
|
return NashornJsEngine.bindMsg(msg); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void destroy() { |
||||
|
if (jsEngine != null) { |
||||
|
jsEngine.destroy(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,24 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.filter; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
|
||||
|
@Data |
||||
|
public class TbJsFilterNodeConfiguration { |
||||
|
|
||||
|
private String jsScript; |
||||
|
} |
||||
@ -0,0 +1,82 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.filter; |
||||
|
|
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.thingsboard.rule.engine.TbNodeUtils; |
||||
|
import org.thingsboard.rule.engine.api.*; |
||||
|
import org.thingsboard.rule.engine.js.NashornJsEngine; |
||||
|
import org.thingsboard.server.common.msg.TbMsg; |
||||
|
|
||||
|
import javax.script.Bindings; |
||||
|
import java.util.Set; |
||||
|
|
||||
|
import static org.thingsboard.rule.engine.DonAsynchron.withCallback; |
||||
|
|
||||
|
@Slf4j |
||||
|
public class TbJsSwitchNode implements TbNode { |
||||
|
|
||||
|
private TbJsSwitchNodeConfiguration config; |
||||
|
private NashornJsEngine jsEngine; |
||||
|
|
||||
|
@Override |
||||
|
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { |
||||
|
this.config = TbNodeUtils.convert(configuration, TbJsSwitchNodeConfiguration.class); |
||||
|
if (config.getAllowedRelations().size() < 1) { |
||||
|
String message = "Switch node should have at least 1 relation"; |
||||
|
log.error(message); |
||||
|
throw new IllegalStateException(message); |
||||
|
} |
||||
|
if (!config.isRouteToAllWithNoCheck()) { |
||||
|
this.jsEngine = new NashornJsEngine(config.getJsScript()); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void onMsg(TbContext ctx, TbMsg msg) { |
||||
|
if (config.isRouteToAllWithNoCheck()) { |
||||
|
ctx.tellNext(msg, config.getAllowedRelations()); |
||||
|
return; |
||||
|
} |
||||
|
ListeningExecutor jsExecutor = ctx.getJsExecutor(); |
||||
|
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(toBindings(msg))), |
||||
|
result -> processSwitch(ctx, msg, result), |
||||
|
t -> ctx.tellError(msg, t)); |
||||
|
} |
||||
|
|
||||
|
private void processSwitch(TbContext ctx, TbMsg msg, Set<String> nextRelations) { |
||||
|
if (validateRelations(nextRelations)) { |
||||
|
ctx.tellNext(msg, nextRelations); |
||||
|
} else { |
||||
|
ctx.tellError(msg, new IllegalStateException("Unsupported relation for switch " + nextRelations)); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private boolean validateRelations(Set<String> nextRelations) { |
||||
|
return config.getAllowedRelations().containsAll(nextRelations); |
||||
|
} |
||||
|
|
||||
|
private Bindings toBindings(TbMsg msg) { |
||||
|
return NashornJsEngine.bindMsg(msg); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void destroy() { |
||||
|
if (jsEngine != null) { |
||||
|
jsEngine.destroy(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,28 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.filter; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
|
||||
|
import java.util.Set; |
||||
|
|
||||
|
@Data |
||||
|
public class TbJsSwitchNodeConfiguration { |
||||
|
|
||||
|
private String jsScript; |
||||
|
private Set<String> allowedRelations; |
||||
|
private boolean routeToAllWithNoCheck; |
||||
|
} |
||||
@ -0,0 +1,45 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.js; |
||||
|
|
||||
|
import com.google.common.util.concurrent.ListenableFuture; |
||||
|
import com.google.common.util.concurrent.ListeningExecutorService; |
||||
|
import com.google.common.util.concurrent.MoreExecutors; |
||||
|
import org.thingsboard.rule.engine.api.ListeningExecutor; |
||||
|
|
||||
|
import javax.annotation.PreDestroy; |
||||
|
import java.util.concurrent.Callable; |
||||
|
import java.util.concurrent.Executors; |
||||
|
|
||||
|
public class JsExecutorService implements ListeningExecutor{ |
||||
|
|
||||
|
private final ListeningExecutorService service; |
||||
|
|
||||
|
public JsExecutorService(int threadCount) { |
||||
|
this.service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount)); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public <T> ListenableFuture<T> executeAsync(Callable<T> task) { |
||||
|
return service.submit(task); |
||||
|
} |
||||
|
|
||||
|
@PreDestroy |
||||
|
@Override |
||||
|
public void onDestroy() { |
||||
|
service.shutdown(); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,139 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.js; |
||||
|
|
||||
|
import com.fasterxml.jackson.core.JsonProcessingException; |
||||
|
import com.fasterxml.jackson.databind.JsonNode; |
||||
|
import com.fasterxml.jackson.databind.ObjectMapper; |
||||
|
import com.google.common.collect.Sets; |
||||
|
import jdk.nashorn.api.scripting.NashornScriptEngineFactory; |
||||
|
import jdk.nashorn.api.scripting.ScriptObjectMirror; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.commons.lang3.ArrayUtils; |
||||
|
import org.thingsboard.server.common.msg.TbMsg; |
||||
|
|
||||
|
import javax.script.*; |
||||
|
import java.util.Collections; |
||||
|
import java.util.Map; |
||||
|
import java.util.Set; |
||||
|
|
||||
|
|
||||
|
@Slf4j |
||||
|
public class NashornJsEngine { |
||||
|
|
||||
|
public static final String METADATA = "meta"; |
||||
|
public static final String DATA = "msg"; |
||||
|
private static NashornScriptEngineFactory factory = new NashornScriptEngineFactory(); |
||||
|
|
||||
|
private CompiledScript engine; |
||||
|
|
||||
|
public NashornJsEngine(String script) { |
||||
|
engine = compileScript(script); |
||||
|
} |
||||
|
|
||||
|
private static CompiledScript compileScript(String script) { |
||||
|
ScriptEngine engine = factory.getScriptEngine(new String[]{"--no-java"}); |
||||
|
Compilable compEngine = (Compilable) engine; |
||||
|
try { |
||||
|
return compEngine.compile(script); |
||||
|
} catch (ScriptException e) { |
||||
|
log.warn("Failed to compile JS script: {}", e.getMessage(), e); |
||||
|
throw new IllegalArgumentException("Can't compile script: " + e.getMessage()); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public static Bindings bindMsg(TbMsg msg) { |
||||
|
try { |
||||
|
Bindings bindings = new SimpleBindings(); |
||||
|
bindings.put(METADATA, msg.getMetaData().getData()); |
||||
|
|
||||
|
if (ArrayUtils.isNotEmpty(msg.getData())) { |
||||
|
ObjectMapper mapper = new ObjectMapper(); |
||||
|
JsonNode jsonNode = mapper.readTree(msg.getData()); |
||||
|
Map map = mapper.treeToValue(jsonNode, Map.class); |
||||
|
bindings.put(DATA, map); |
||||
|
} |
||||
|
|
||||
|
return bindings; |
||||
|
} catch (Throwable th) { |
||||
|
throw new IllegalArgumentException("Cannot bind js args", th); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private static TbMsg unbindMsg(Bindings bindings, TbMsg msg) throws JsonProcessingException { |
||||
|
for (Map.Entry<String, String> entry : msg.getMetaData().getData().entrySet()) { |
||||
|
Object obj = entry.getValue(); |
||||
|
entry.setValue(obj.toString()); |
||||
|
} |
||||
|
|
||||
|
Object payload = bindings.get(DATA); |
||||
|
if (payload != null) { |
||||
|
ObjectMapper mapper = new ObjectMapper(); |
||||
|
byte[] bytes = mapper.writeValueAsBytes(payload); |
||||
|
return new TbMsg(msg.getId(), msg.getType(), msg.getOriginator(), msg.getMetaData(), bytes); |
||||
|
} |
||||
|
|
||||
|
return msg; |
||||
|
} |
||||
|
|
||||
|
public TbMsg executeUpdate(Bindings bindings, TbMsg msg) throws ScriptException { |
||||
|
try { |
||||
|
engine.eval(bindings); |
||||
|
return unbindMsg(bindings, msg); |
||||
|
} catch (Throwable th) { |
||||
|
th.printStackTrace(); |
||||
|
throw new IllegalArgumentException("Cannot unbind js args", th); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public boolean executeFilter(Bindings bindings) throws ScriptException { |
||||
|
Object eval = engine.eval(bindings); |
||||
|
if (eval instanceof Boolean) { |
||||
|
return (boolean) eval; |
||||
|
} else { |
||||
|
log.warn("Wrong result type: {}", eval); |
||||
|
throw new ScriptException("Wrong result type: " + eval); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public Set<String> executeSwitch(Bindings bindings) throws ScriptException, NoSuchMethodException { |
||||
|
Object eval = this.engine.eval(bindings); |
||||
|
if (eval instanceof String) { |
||||
|
return Collections.singleton((String) eval); |
||||
|
} else if (eval instanceof ScriptObjectMirror) { |
||||
|
ScriptObjectMirror mir = (ScriptObjectMirror) eval; |
||||
|
if (mir.isArray()) { |
||||
|
Set<String> nextStates = Sets.newHashSet(); |
||||
|
for (Map.Entry<String, Object> entry : mir.entrySet()) { |
||||
|
if (entry.getValue() instanceof String) { |
||||
|
nextStates.add((String) entry.getValue()); |
||||
|
} else { |
||||
|
log.warn("Wrong result type: {}", eval); |
||||
|
throw new ScriptException("Wrong result type: " + eval); |
||||
|
} |
||||
|
} |
||||
|
return nextStates; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
log.warn("Wrong result type: {}", eval); |
||||
|
throw new ScriptException("Wrong result type: " + eval); |
||||
|
} |
||||
|
|
||||
|
public void destroy() { |
||||
|
engine = null; |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,59 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.transform; |
||||
|
|
||||
|
import com.google.common.util.concurrent.ListenableFuture; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.thingsboard.rule.engine.TbNodeUtils; |
||||
|
import org.thingsboard.rule.engine.api.*; |
||||
|
import org.thingsboard.server.common.msg.TbMsg; |
||||
|
|
||||
|
import static org.thingsboard.rule.engine.DonAsynchron.withCallback; |
||||
|
|
||||
|
/** |
||||
|
* Created by ashvayka on 19.01.18. |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
public abstract class TbAbstractTransformNode implements TbNode { |
||||
|
|
||||
|
private TbTransformNodeConfiguration config; |
||||
|
|
||||
|
@Override |
||||
|
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { |
||||
|
this.config = TbNodeUtils.convert(configuration, TbTransformNodeConfiguration.class); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void onMsg(TbContext ctx, TbMsg msg) { |
||||
|
withCallback(transform(ctx, msg), |
||||
|
m -> routeMsg(ctx, m), |
||||
|
t -> ctx.tellError(msg, t)); |
||||
|
} |
||||
|
|
||||
|
protected abstract ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg); |
||||
|
|
||||
|
private void routeMsg(TbContext ctx, TbMsg msg) { |
||||
|
if (config.isStartNewChain()) { |
||||
|
ctx.spawn(msg); |
||||
|
} else { |
||||
|
ctx.tellNext(msg); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void setConfig(TbTransformNodeConfiguration config) { |
||||
|
this.config = config; |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,93 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.transform; |
||||
|
|
||||
|
import com.google.common.base.Function; |
||||
|
import com.google.common.collect.Sets; |
||||
|
import com.google.common.util.concurrent.Futures; |
||||
|
import com.google.common.util.concurrent.ListenableFuture; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.commons.lang3.StringUtils; |
||||
|
import org.thingsboard.rule.engine.TbNodeUtils; |
||||
|
import org.thingsboard.rule.engine.api.TbContext; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeException; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeState; |
||||
|
import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader; |
||||
|
import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader; |
||||
|
import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader; |
||||
|
import org.thingsboard.server.common.data.id.EntityId; |
||||
|
import org.thingsboard.server.common.msg.TbMsg; |
||||
|
|
||||
|
import java.util.HashSet; |
||||
|
|
||||
|
@Slf4j |
||||
|
public class TbChangeOriginatorNode extends TbAbstractTransformNode { |
||||
|
|
||||
|
protected static final String CUSTOMER_SOURCE = "CUSTOMER"; |
||||
|
protected static final String TENANT_SOURCE = "TENANT"; |
||||
|
protected static final String RELATED_SOURCE = "RELATED"; |
||||
|
|
||||
|
private TbChangeOriginatorNodeConfiguration config; |
||||
|
|
||||
|
@Override |
||||
|
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { |
||||
|
this.config = TbNodeUtils.convert(configuration, TbChangeOriginatorNodeConfiguration.class); |
||||
|
validateConfig(config); |
||||
|
setConfig(config); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) { |
||||
|
ListenableFuture<? extends EntityId> newOriginator = getNewOriginator(ctx, msg.getOriginator()); |
||||
|
return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> new TbMsg(msg.getId(), msg.getType(), n, msg.getMetaData(), msg.getData())); |
||||
|
} |
||||
|
|
||||
|
private ListenableFuture<? extends EntityId> getNewOriginator(TbContext ctx, EntityId original) { |
||||
|
switch (config.getOriginatorSource()) { |
||||
|
case CUSTOMER_SOURCE: |
||||
|
return EntitiesCustomerIdAsyncLoader.findEntityIdAsync(ctx, original); |
||||
|
case TENANT_SOURCE: |
||||
|
return EntitiesTenantIdAsyncLoader.findEntityIdAsync(ctx, original); |
||||
|
case RELATED_SOURCE: |
||||
|
return EntitiesRelatedEntityIdAsyncLoader.findEntityAsync(ctx, original, config.getDirection(), config.getRelationType()); |
||||
|
default: |
||||
|
return Futures.immediateFailedFuture(new IllegalStateException("Unexpected originator source " + config.getOriginatorSource())); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void validateConfig(TbChangeOriginatorNodeConfiguration conf) { |
||||
|
HashSet<String> knownSources = Sets.newHashSet(CUSTOMER_SOURCE, TENANT_SOURCE, RELATED_SOURCE); |
||||
|
if (!knownSources.contains(conf.getOriginatorSource())) { |
||||
|
log.error("Unsupported source [{}] for TbChangeOriginatorNode", conf.getOriginatorSource()); |
||||
|
throw new IllegalArgumentException("Unsupported source TbChangeOriginatorNode" + conf.getOriginatorSource()); |
||||
|
} |
||||
|
|
||||
|
if (conf.getOriginatorSource().equals(RELATED_SOURCE)) { |
||||
|
if (conf.getDirection() == null || StringUtils.isBlank(conf.getRelationType())) { |
||||
|
log.error("Related source for TbChangeOriginatorNode should have direction and relationType. Actual [{}] [{}]", |
||||
|
conf.getDirection(), conf.getRelationType()); |
||||
|
throw new IllegalArgumentException("Wrong config for RElated Source in TbChangeOriginatorNode" + conf.getOriginatorSource()); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void destroy() { |
||||
|
|
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,27 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.transform; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
import org.thingsboard.server.common.data.relation.EntitySearchDirection; |
||||
|
|
||||
|
@Data |
||||
|
public class TbChangeOriginatorNodeConfiguration extends TbTransformNodeConfiguration{ |
||||
|
|
||||
|
private String originatorSource; |
||||
|
private EntitySearchDirection direction; |
||||
|
private String relationType; |
||||
|
} |
||||
@ -0,0 +1,56 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.transform; |
||||
|
|
||||
|
import com.google.common.util.concurrent.ListenableFuture; |
||||
|
import org.thingsboard.rule.engine.TbNodeUtils; |
||||
|
import org.thingsboard.rule.engine.api.TbContext; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeException; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeState; |
||||
|
import org.thingsboard.rule.engine.js.NashornJsEngine; |
||||
|
import org.thingsboard.server.common.msg.TbMsg; |
||||
|
|
||||
|
import javax.script.Bindings; |
||||
|
|
||||
|
public class TbTransformMsgNode extends TbAbstractTransformNode { |
||||
|
|
||||
|
private TbTransformMsgNodeConfiguration config; |
||||
|
private NashornJsEngine jsEngine; |
||||
|
|
||||
|
@Override |
||||
|
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { |
||||
|
this.config = TbNodeUtils.convert(configuration, TbTransformMsgNodeConfiguration.class); |
||||
|
this.jsEngine = new NashornJsEngine(config.getJsScript()); |
||||
|
setConfig(config); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) { |
||||
|
return ctx.getJsExecutor().executeAsync(() -> jsEngine.executeUpdate(toBindings(msg), msg)); |
||||
|
} |
||||
|
|
||||
|
private Bindings toBindings(TbMsg msg) { |
||||
|
return NashornJsEngine.bindMsg(msg); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void destroy() { |
||||
|
if (jsEngine != null) { |
||||
|
jsEngine.destroy(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,24 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.transform; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
|
||||
|
@Data |
||||
|
public class TbTransformMsgNodeConfiguration extends TbTransformNodeConfiguration { |
||||
|
|
||||
|
private String jsScript; |
||||
|
} |
||||
@ -1,69 +0,0 @@ |
|||||
/** |
|
||||
* Copyright © 2016-2018 The Thingsboard Authors |
|
||||
* |
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||
* you may not use this file except in compliance with the License. |
|
||||
* You may obtain a copy of the License at |
|
||||
* |
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||
* |
|
||||
* Unless required by applicable law or agreed to in writing, software |
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||
* See the License for the specific language governing permissions and |
|
||||
* limitations under the License. |
|
||||
*/ |
|
||||
package org.thingsboard.rule.engine.transform; |
|
||||
|
|
||||
import lombok.extern.slf4j.Slf4j; |
|
||||
import org.thingsboard.rule.engine.TbNodeUtils; |
|
||||
import org.thingsboard.rule.engine.api.*; |
|
||||
import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration; |
|
||||
import org.thingsboard.server.common.data.DataConstants; |
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|
||||
import org.thingsboard.server.common.msg.TbMsg; |
|
||||
import org.thingsboard.server.dao.attributes.AttributesService; |
|
||||
|
|
||||
import java.util.List; |
|
||||
|
|
||||
/** |
|
||||
* Created by ashvayka on 19.01.18. |
|
||||
*/ |
|
||||
@Slf4j |
|
||||
public class TbTransformNode implements TbNode { |
|
||||
|
|
||||
TbGetAttributesNodeConfiguration config; |
|
||||
|
|
||||
@Override |
|
||||
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { |
|
||||
this.config = TbNodeUtils.convert(configuration, TbGetAttributesNodeConfiguration.class); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException { |
|
||||
try { |
|
||||
//TODO: refactor this to work async and fetch attributes from cache.
|
|
||||
AttributesService service = ctx.getAttributesService(); |
|
||||
|
|
||||
fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs."); |
|
||||
fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss."); |
|
||||
fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared."); |
|
||||
ctx.tellNext(msg); |
|
||||
} catch (Exception e) { |
|
||||
log.warn("[{}][{}] Failed to fetch attributes", msg.getOriginator(), msg.getId(), e); |
|
||||
throw new TbNodeException(e); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
private void fetchAttributes(TbMsg msg, AttributesService service, List<String> attributeNames, String scope, String prefix) throws InterruptedException, java.util.concurrent.ExecutionException { |
|
||||
if (attributeNames != null && attributeNames.isEmpty()) { |
|
||||
List<AttributeKvEntry> attributes = service.find(msg.getOriginator(), scope, attributeNames).get(); |
|
||||
attributes.forEach(attr -> msg.getMetaData().putValue(prefix + attr.getKey(), attr.getValueAsString())); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public void destroy() { |
|
||||
|
|
||||
} |
|
||||
} |
|
||||
@ -0,0 +1,24 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.transform; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
|
||||
|
@Data |
||||
|
public class TbTransformNodeConfiguration { |
||||
|
|
||||
|
private boolean startNewChain = false; |
||||
|
} |
||||
@ -0,0 +1,50 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.util; |
||||
|
|
||||
|
import com.google.common.util.concurrent.AsyncFunction; |
||||
|
import com.google.common.util.concurrent.Futures; |
||||
|
import com.google.common.util.concurrent.ListenableFuture; |
||||
|
import org.thingsboard.rule.engine.api.TbContext; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeException; |
||||
|
import org.thingsboard.server.common.data.HasCustomerId; |
||||
|
import org.thingsboard.server.common.data.id.*; |
||||
|
|
||||
|
public class EntitiesCustomerIdAsyncLoader { |
||||
|
|
||||
|
|
||||
|
public static ListenableFuture<CustomerId> findEntityIdAsync(TbContext ctx, EntityId original) { |
||||
|
|
||||
|
switch (original.getEntityType()) { |
||||
|
case CUSTOMER: |
||||
|
return Futures.immediateFuture((CustomerId) original); |
||||
|
case USER: |
||||
|
return getCustomerAsync(ctx.getUserService().findUserByIdAsync((UserId) original)); |
||||
|
case ASSET: |
||||
|
return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) original)); |
||||
|
case DEVICE: |
||||
|
return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) original)); |
||||
|
default: |
||||
|
return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original)); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private static <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) { |
||||
|
return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> { |
||||
|
return in != null ? Futures.immediateFuture(in.getCustomerId()) |
||||
|
: Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));}); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,51 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.util; |
||||
|
|
||||
|
import com.google.common.util.concurrent.AsyncFunction; |
||||
|
import com.google.common.util.concurrent.Futures; |
||||
|
import com.google.common.util.concurrent.ListenableFuture; |
||||
|
import org.apache.commons.collections.CollectionUtils; |
||||
|
import org.thingsboard.rule.engine.api.TbContext; |
||||
|
import org.thingsboard.server.common.data.id.EntityId; |
||||
|
import org.thingsboard.server.common.data.relation.EntityRelation; |
||||
|
import org.thingsboard.server.common.data.relation.EntitySearchDirection; |
||||
|
import org.thingsboard.server.dao.relation.RelationService; |
||||
|
|
||||
|
import java.util.List; |
||||
|
|
||||
|
import static org.thingsboard.server.common.data.relation.RelationTypeGroup.COMMON; |
||||
|
|
||||
|
public class EntitiesRelatedEntityIdAsyncLoader { |
||||
|
|
||||
|
public static ListenableFuture<EntityId> findEntityAsync(TbContext ctx, EntityId originator, |
||||
|
EntitySearchDirection direction, String relationType) { |
||||
|
RelationService relationService = ctx.getRelationService(); |
||||
|
if (direction == EntitySearchDirection.FROM) { |
||||
|
ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByFromAndTypeAsync(originator, relationType, COMMON); |
||||
|
return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>) |
||||
|
r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo()) |
||||
|
: Futures.immediateFailedFuture(new IllegalStateException("Relation not found"))); |
||||
|
} else if (direction == EntitySearchDirection.TO) { |
||||
|
ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByToAndTypeAsync(originator, relationType, COMMON); |
||||
|
return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>) |
||||
|
r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom()) |
||||
|
: Futures.immediateFailedFuture(new IllegalStateException("Relation not found"))); |
||||
|
} |
||||
|
|
||||
|
return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction")); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,58 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.util; |
||||
|
|
||||
|
import com.google.common.util.concurrent.AsyncFunction; |
||||
|
import com.google.common.util.concurrent.Futures; |
||||
|
import com.google.common.util.concurrent.ListenableFuture; |
||||
|
import org.thingsboard.rule.engine.api.TbContext; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeException; |
||||
|
import org.thingsboard.server.common.data.HasTenantId; |
||||
|
import org.thingsboard.server.common.data.alarm.AlarmId; |
||||
|
import org.thingsboard.server.common.data.id.*; |
||||
|
|
||||
|
public class EntitiesTenantIdAsyncLoader { |
||||
|
|
||||
|
public static ListenableFuture<TenantId> findEntityIdAsync(TbContext ctx, EntityId original) { |
||||
|
|
||||
|
switch (original.getEntityType()) { |
||||
|
case TENANT: |
||||
|
return Futures.immediateFuture((TenantId) original); |
||||
|
case CUSTOMER: |
||||
|
return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) original)); |
||||
|
case USER: |
||||
|
return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) original)); |
||||
|
case PLUGIN: |
||||
|
return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) original)); |
||||
|
case ASSET: |
||||
|
return getTenantAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) original)); |
||||
|
case DEVICE: |
||||
|
return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) original)); |
||||
|
case ALARM: |
||||
|
return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync((AlarmId) original)); |
||||
|
case RULE_CHAIN: |
||||
|
return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync((RuleChainId) original)); |
||||
|
default: |
||||
|
return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original)); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private static <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) { |
||||
|
return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> { |
||||
|
return in != null ? Futures.immediateFuture(in.getTenantId()) |
||||
|
: Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));}); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,170 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.filter; |
||||
|
|
||||
|
import com.datastax.driver.core.utils.UUIDs; |
||||
|
import com.fasterxml.jackson.databind.ObjectMapper; |
||||
|
import com.google.common.util.concurrent.Futures; |
||||
|
import com.google.common.util.concurrent.ListenableFuture; |
||||
|
import org.junit.Test; |
||||
|
import org.junit.runner.RunWith; |
||||
|
import org.mockito.ArgumentCaptor; |
||||
|
import org.mockito.Matchers; |
||||
|
import org.mockito.Mock; |
||||
|
import org.mockito.runners.MockitoJUnitRunner; |
||||
|
import org.mockito.stubbing.Answer; |
||||
|
import org.thingsboard.rule.engine.api.ListeningExecutor; |
||||
|
import org.thingsboard.rule.engine.api.TbContext; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeException; |
||||
|
import org.thingsboard.server.common.msg.TbMsg; |
||||
|
import org.thingsboard.server.common.msg.TbMsgMetaData; |
||||
|
|
||||
|
import javax.script.ScriptException; |
||||
|
import java.util.concurrent.Callable; |
||||
|
|
||||
|
import static org.junit.Assert.assertEquals; |
||||
|
import static org.mockito.Mockito.*; |
||||
|
|
||||
|
@RunWith(MockitoJUnitRunner.class) |
||||
|
public class TbJsFilterNodeTest { |
||||
|
|
||||
|
private TbJsFilterNode node; |
||||
|
|
||||
|
@Mock |
||||
|
private TbContext ctx; |
||||
|
@Mock |
||||
|
private ListeningExecutor executor; |
||||
|
|
||||
|
@Test |
||||
|
public void falseEvaluationDoNotSendMsg() throws TbNodeException { |
||||
|
initWithScript("10 > 15;"); |
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}".getBytes()); |
||||
|
|
||||
|
mockJsExecutor(); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
verify(ctx).getJsExecutor(); |
||||
|
verify(ctx).tellNext(msg, "false"); |
||||
|
verifyNoMoreInteractions(ctx); |
||||
|
} |
||||
|
|
||||
|
@Test |
||||
|
public void notValidMsgDataThrowsException() throws TbNodeException { |
||||
|
initWithScript("10 > 15;"); |
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), new byte[4]); |
||||
|
|
||||
|
when(ctx.getJsExecutor()).thenReturn(executor); |
||||
|
|
||||
|
mockJsExecutor(); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
verifyError(msg, "Cannot bind js args", IllegalArgumentException.class); |
||||
|
} |
||||
|
|
||||
|
@Test |
||||
|
public void exceptionInJsThrowsException() throws TbNodeException { |
||||
|
initWithScript("meta.temp.curr < 15;"); |
||||
|
TbMsgMetaData metaData = new TbMsgMetaData(); |
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes()); |
||||
|
mockJsExecutor(); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
String expectedMessage = "TypeError: Cannot get property \"curr\" of null in <eval> at line number 1"; |
||||
|
verifyError(msg, expectedMessage, ScriptException.class); |
||||
|
} |
||||
|
|
||||
|
@Test(expected = IllegalArgumentException.class) |
||||
|
public void notValidScriptThrowsException() throws TbNodeException { |
||||
|
initWithScript("10 > 15 asdq out"); |
||||
|
} |
||||
|
|
||||
|
@Test |
||||
|
public void metadataConditionCanBeFalse() throws TbNodeException { |
||||
|
initWithScript("meta.humidity < 15;"); |
||||
|
TbMsgMetaData metaData = new TbMsgMetaData(); |
||||
|
metaData.putValue("temp", "10"); |
||||
|
metaData.putValue("humidity", "99"); |
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes()); |
||||
|
mockJsExecutor(); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
verify(ctx).getJsExecutor(); |
||||
|
verify(ctx).tellNext(msg, "false"); |
||||
|
verifyNoMoreInteractions(ctx); |
||||
|
} |
||||
|
|
||||
|
@Test |
||||
|
public void metadataConditionCanBeTrue() throws TbNodeException { |
||||
|
initWithScript("meta.temp < 15;"); |
||||
|
TbMsgMetaData metaData = new TbMsgMetaData(); |
||||
|
metaData.putValue("temp", "10"); |
||||
|
metaData.putValue("humidity", "99"); |
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes()); |
||||
|
mockJsExecutor(); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
verify(ctx).getJsExecutor(); |
||||
|
verify(ctx).tellNext(msg, "true"); |
||||
|
} |
||||
|
|
||||
|
@Test |
||||
|
public void msgJsonParsedAndBinded() throws TbNodeException { |
||||
|
initWithScript("msg.passed < 15 && msg.name === 'Vit' && meta.temp == 10 && msg.bigObj.prop == 42;"); |
||||
|
TbMsgMetaData metaData = new TbMsgMetaData(); |
||||
|
metaData.putValue("temp", "10"); |
||||
|
metaData.putValue("humidity", "99"); |
||||
|
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; |
||||
|
|
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); |
||||
|
mockJsExecutor(); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
verify(ctx).getJsExecutor(); |
||||
|
verify(ctx).tellNext(msg, "true"); |
||||
|
} |
||||
|
|
||||
|
private void initWithScript(String script) throws TbNodeException { |
||||
|
TbJsFilterNodeConfiguration config = new TbJsFilterNodeConfiguration(); |
||||
|
config.setJsScript(script); |
||||
|
ObjectMapper mapper = new ObjectMapper(); |
||||
|
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); |
||||
|
|
||||
|
node = new TbJsFilterNode(); |
||||
|
node.init(nodeConfiguration, null); |
||||
|
} |
||||
|
|
||||
|
private void mockJsExecutor() { |
||||
|
when(ctx.getJsExecutor()).thenReturn(executor); |
||||
|
doAnswer((Answer<ListenableFuture<Boolean>>) invocationOnMock -> { |
||||
|
try { |
||||
|
Callable task = (Callable) (invocationOnMock.getArguments())[0]; |
||||
|
return Futures.immediateFuture((Boolean) task.call()); |
||||
|
} catch (Throwable th) { |
||||
|
return Futures.immediateFailedFuture(th); |
||||
|
} |
||||
|
}).when(executor).executeAsync(Matchers.any(Callable.class)); |
||||
|
} |
||||
|
|
||||
|
private void verifyError(TbMsg msg, String message, Class expectedClass) { |
||||
|
ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); |
||||
|
verify(ctx).tellError(same(msg), captor.capture()); |
||||
|
|
||||
|
Throwable value = captor.getValue(); |
||||
|
assertEquals(expectedClass, value.getClass()); |
||||
|
assertEquals(message, value.getMessage()); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,167 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.filter; |
||||
|
|
||||
|
import com.datastax.driver.core.utils.UUIDs; |
||||
|
import com.fasterxml.jackson.databind.ObjectMapper; |
||||
|
import com.google.common.collect.Sets; |
||||
|
import com.google.common.util.concurrent.Futures; |
||||
|
import com.google.common.util.concurrent.ListenableFuture; |
||||
|
import org.junit.Test; |
||||
|
import org.junit.runner.RunWith; |
||||
|
import org.mockito.ArgumentCaptor; |
||||
|
import org.mockito.Matchers; |
||||
|
import org.mockito.Mock; |
||||
|
import org.mockito.runners.MockitoJUnitRunner; |
||||
|
import org.mockito.stubbing.Answer; |
||||
|
import org.thingsboard.rule.engine.api.ListeningExecutor; |
||||
|
import org.thingsboard.rule.engine.api.TbContext; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeException; |
||||
|
import org.thingsboard.server.common.msg.TbMsg; |
||||
|
import org.thingsboard.server.common.msg.TbMsgMetaData; |
||||
|
|
||||
|
import java.util.HashSet; |
||||
|
import java.util.Set; |
||||
|
import java.util.concurrent.Callable; |
||||
|
|
||||
|
import static org.junit.Assert.assertEquals; |
||||
|
import static org.mockito.Matchers.same; |
||||
|
import static org.mockito.Mockito.*; |
||||
|
|
||||
|
@RunWith(MockitoJUnitRunner.class) |
||||
|
public class TbJsSwitchNodeTest { |
||||
|
|
||||
|
private TbJsSwitchNode node; |
||||
|
|
||||
|
@Mock |
||||
|
private TbContext ctx; |
||||
|
@Mock |
||||
|
private ListeningExecutor executor; |
||||
|
|
||||
|
@Test |
||||
|
public void routeToAllDoNotEvaluatesJs() throws TbNodeException { |
||||
|
HashSet<String> relations = Sets.newHashSet("one", "two"); |
||||
|
initWithScript("test qwerty", relations, true); |
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}".getBytes()); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
verify(ctx).tellNext(msg, relations); |
||||
|
verifyNoMoreInteractions(ctx, executor); |
||||
|
} |
||||
|
|
||||
|
@Test |
||||
|
public void multipleRoutesAreAllowed() throws TbNodeException { |
||||
|
String jsCode = "function nextRelation(meta, msg) {\n" + |
||||
|
" if(msg.passed == 5 && meta.temp == 10)\n" + |
||||
|
" return ['three', 'one']\n" + |
||||
|
" else\n" + |
||||
|
" return 'two';\n" + |
||||
|
"};\n" + |
||||
|
"\n" + |
||||
|
"nextRelation(meta, msg);"; |
||||
|
initWithScript(jsCode, Sets.newHashSet("one", "two", "three"), false); |
||||
|
TbMsgMetaData metaData = new TbMsgMetaData(); |
||||
|
metaData.putValue("temp", "10"); |
||||
|
metaData.putValue("humidity", "99"); |
||||
|
String rawJson = "{\"name\": \"Vit\", \"passed\": 5}"; |
||||
|
|
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); |
||||
|
mockJsExecutor(); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
verify(ctx).getJsExecutor(); |
||||
|
verify(ctx).tellNext(msg, Sets.newHashSet("one", "three")); |
||||
|
} |
||||
|
|
||||
|
@Test |
||||
|
public void allowedRelationPassed() throws TbNodeException { |
||||
|
String jsCode = "function nextRelation(meta, msg) {\n" + |
||||
|
" if(msg.passed == 5 && meta.temp == 10)\n" + |
||||
|
" return 'one'\n" + |
||||
|
" else\n" + |
||||
|
" return 'two';\n" + |
||||
|
"};\n" + |
||||
|
"\n" + |
||||
|
"nextRelation(meta, msg);"; |
||||
|
initWithScript(jsCode, Sets.newHashSet("one", "two"), false); |
||||
|
TbMsgMetaData metaData = new TbMsgMetaData(); |
||||
|
metaData.putValue("temp", "10"); |
||||
|
metaData.putValue("humidity", "99"); |
||||
|
String rawJson = "{\"name\": \"Vit\", \"passed\": 5}"; |
||||
|
|
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); |
||||
|
mockJsExecutor(); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
verify(ctx).getJsExecutor(); |
||||
|
verify(ctx).tellNext(msg, Sets.newHashSet("one")); |
||||
|
} |
||||
|
|
||||
|
@Test |
||||
|
public void unknownRelationThrowsException() throws TbNodeException { |
||||
|
String jsCode = "function nextRelation(meta, msg) {\n" + |
||||
|
" return ['one','nine'];" + |
||||
|
"};\n" + |
||||
|
"\n" + |
||||
|
"nextRelation(meta, msg);"; |
||||
|
initWithScript(jsCode, Sets.newHashSet("one", "two"), false); |
||||
|
TbMsgMetaData metaData = new TbMsgMetaData(); |
||||
|
metaData.putValue("temp", "10"); |
||||
|
metaData.putValue("humidity", "99"); |
||||
|
String rawJson = "{\"name\": \"Vit\", \"passed\": 5}"; |
||||
|
|
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); |
||||
|
mockJsExecutor(); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
verify(ctx).getJsExecutor(); |
||||
|
verifyError(msg, "Unsupported relation for switch [nine, one]", IllegalStateException.class); |
||||
|
} |
||||
|
|
||||
|
private void initWithScript(String script, Set<String> relations, boolean routeToAll) throws TbNodeException { |
||||
|
TbJsSwitchNodeConfiguration config = new TbJsSwitchNodeConfiguration(); |
||||
|
config.setJsScript(script); |
||||
|
config.setAllowedRelations(relations); |
||||
|
config.setRouteToAllWithNoCheck(routeToAll); |
||||
|
ObjectMapper mapper = new ObjectMapper(); |
||||
|
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); |
||||
|
|
||||
|
node = new TbJsSwitchNode(); |
||||
|
node.init(nodeConfiguration, null); |
||||
|
} |
||||
|
|
||||
|
private void mockJsExecutor() { |
||||
|
when(ctx.getJsExecutor()).thenReturn(executor); |
||||
|
doAnswer((Answer<ListenableFuture<Set<String>>>) invocationOnMock -> { |
||||
|
try { |
||||
|
Callable task = (Callable) (invocationOnMock.getArguments())[0]; |
||||
|
return Futures.immediateFuture((Set<String>) task.call()); |
||||
|
} catch (Throwable th) { |
||||
|
return Futures.immediateFailedFuture(th); |
||||
|
} |
||||
|
}).when(executor).executeAsync(Matchers.any(Callable.class)); |
||||
|
} |
||||
|
|
||||
|
private void verifyError(TbMsg msg, String message, Class expectedClass) { |
||||
|
ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); |
||||
|
verify(ctx).tellError(same(msg), captor.capture()); |
||||
|
|
||||
|
Throwable value = captor.getValue(); |
||||
|
assertEquals(expectedClass, value.getClass()); |
||||
|
assertEquals(message, value.getMessage()); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,124 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.transform; |
||||
|
|
||||
|
import com.datastax.driver.core.utils.UUIDs; |
||||
|
import com.fasterxml.jackson.databind.ObjectMapper; |
||||
|
import com.google.common.util.concurrent.Futures; |
||||
|
import org.junit.Test; |
||||
|
import org.junit.runner.RunWith; |
||||
|
import org.mockito.ArgumentCaptor; |
||||
|
import org.mockito.Mock; |
||||
|
import org.mockito.runners.MockitoJUnitRunner; |
||||
|
import org.thingsboard.rule.engine.api.TbContext; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeException; |
||||
|
import org.thingsboard.server.common.data.asset.Asset; |
||||
|
import org.thingsboard.server.common.data.id.AssetId; |
||||
|
import org.thingsboard.server.common.data.id.CustomerId; |
||||
|
import org.thingsboard.server.common.msg.TbMsg; |
||||
|
import org.thingsboard.server.common.msg.TbMsgMetaData; |
||||
|
import org.thingsboard.server.dao.asset.AssetService; |
||||
|
|
||||
|
import static org.junit.Assert.assertEquals; |
||||
|
import static org.mockito.Matchers.same; |
||||
|
import static org.mockito.Mockito.verify; |
||||
|
import static org.mockito.Mockito.when; |
||||
|
|
||||
|
@RunWith(MockitoJUnitRunner.class) |
||||
|
public class TbChangeOriginatorNodeTest { |
||||
|
|
||||
|
private TbChangeOriginatorNode node; |
||||
|
|
||||
|
@Mock |
||||
|
private TbContext ctx; |
||||
|
@Mock |
||||
|
private AssetService assetService; |
||||
|
|
||||
|
|
||||
|
@Test |
||||
|
public void originatorCanBeChangedToCustomerId() throws TbNodeException { |
||||
|
init(false); |
||||
|
AssetId assetId = new AssetId(UUIDs.timeBased()); |
||||
|
CustomerId customerId = new CustomerId(UUIDs.timeBased()); |
||||
|
Asset asset = new Asset(); |
||||
|
asset.setCustomerId(customerId); |
||||
|
|
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]); |
||||
|
|
||||
|
when(ctx.getAssetService()).thenReturn(assetService); |
||||
|
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class); |
||||
|
verify(ctx).tellNext(captor.capture()); |
||||
|
TbMsg actualMsg = captor.getValue(); |
||||
|
assertEquals(customerId, actualMsg.getOriginator()); |
||||
|
assertEquals(msg.getId(), actualMsg.getId()); |
||||
|
} |
||||
|
|
||||
|
@Test |
||||
|
public void newChainCanBeStarted() throws TbNodeException { |
||||
|
init(true); |
||||
|
AssetId assetId = new AssetId(UUIDs.timeBased()); |
||||
|
CustomerId customerId = new CustomerId(UUIDs.timeBased()); |
||||
|
Asset asset = new Asset(); |
||||
|
asset.setCustomerId(customerId); |
||||
|
|
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]); |
||||
|
|
||||
|
when(ctx.getAssetService()).thenReturn(assetService); |
||||
|
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class); |
||||
|
verify(ctx).spawn(captor.capture()); |
||||
|
TbMsg actualMsg = captor.getValue(); |
||||
|
assertEquals(customerId, actualMsg.getOriginator()); |
||||
|
assertEquals(msg.getId(), actualMsg.getId()); |
||||
|
} |
||||
|
|
||||
|
@Test |
||||
|
public void exceptionThrownIfCannotFindNewOriginator() throws TbNodeException { |
||||
|
init(true); |
||||
|
AssetId assetId = new AssetId(UUIDs.timeBased()); |
||||
|
CustomerId customerId = new CustomerId(UUIDs.timeBased()); |
||||
|
Asset asset = new Asset(); |
||||
|
asset.setCustomerId(customerId); |
||||
|
|
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]); |
||||
|
|
||||
|
when(ctx.getAssetService()).thenReturn(assetService); |
||||
|
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFailedFuture(new IllegalStateException("wrong"))); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); |
||||
|
verify(ctx).tellError(same(msg), captor.capture()); |
||||
|
Throwable value = captor.getValue(); |
||||
|
assertEquals("wrong", value.getMessage()); |
||||
|
} |
||||
|
|
||||
|
public void init(boolean startNewChain) throws TbNodeException { |
||||
|
TbChangeOriginatorNodeConfiguration config = new TbChangeOriginatorNodeConfiguration(); |
||||
|
config.setOriginatorSource(TbChangeOriginatorNode.CUSTOMER_SOURCE); |
||||
|
config.setStartNewChain(startNewChain); |
||||
|
ObjectMapper mapper = new ObjectMapper(); |
||||
|
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); |
||||
|
|
||||
|
node = new TbChangeOriginatorNode(); |
||||
|
node.init(nodeConfiguration, null); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,140 @@ |
|||||
|
/** |
||||
|
* Copyright © 2016-2018 The Thingsboard Authors |
||||
|
* |
||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
* you may not use this file except in compliance with the License. |
||||
|
* You may obtain a copy of the License at |
||||
|
* |
||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
* |
||||
|
* Unless required by applicable law or agreed to in writing, software |
||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
* See the License for the specific language governing permissions and |
||||
|
* limitations under the License. |
||||
|
*/ |
||||
|
package org.thingsboard.rule.engine.transform; |
||||
|
|
||||
|
import com.datastax.driver.core.utils.UUIDs; |
||||
|
import com.fasterxml.jackson.databind.ObjectMapper; |
||||
|
import com.google.common.util.concurrent.Futures; |
||||
|
import com.google.common.util.concurrent.ListenableFuture; |
||||
|
import org.junit.Test; |
||||
|
import org.junit.runner.RunWith; |
||||
|
import org.mockito.ArgumentCaptor; |
||||
|
import org.mockito.Matchers; |
||||
|
import org.mockito.Mock; |
||||
|
import org.mockito.runners.MockitoJUnitRunner; |
||||
|
import org.mockito.stubbing.Answer; |
||||
|
import org.thingsboard.rule.engine.api.ListeningExecutor; |
||||
|
import org.thingsboard.rule.engine.api.TbContext; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
||||
|
import org.thingsboard.rule.engine.api.TbNodeException; |
||||
|
import org.thingsboard.server.common.msg.TbMsg; |
||||
|
import org.thingsboard.server.common.msg.TbMsgMetaData; |
||||
|
|
||||
|
import java.util.concurrent.Callable; |
||||
|
|
||||
|
import static org.junit.Assert.assertEquals; |
||||
|
import static org.mockito.Matchers.same; |
||||
|
import static org.mockito.Mockito.*; |
||||
|
|
||||
|
@RunWith(MockitoJUnitRunner.class) |
||||
|
public class TbTransformMsgNodeTest { |
||||
|
|
||||
|
private TbTransformMsgNode node; |
||||
|
|
||||
|
@Mock |
||||
|
private TbContext ctx; |
||||
|
@Mock |
||||
|
private ListeningExecutor executor; |
||||
|
|
||||
|
@Test |
||||
|
public void metadataCanBeUpdated() throws TbNodeException { |
||||
|
initWithScript("meta.temp = meta.temp * 10;"); |
||||
|
TbMsgMetaData metaData = new TbMsgMetaData(); |
||||
|
metaData.putValue("temp", "7"); |
||||
|
metaData.putValue("humidity", "99"); |
||||
|
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; |
||||
|
|
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); |
||||
|
mockJsExecutor(); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
verify(ctx).getJsExecutor(); |
||||
|
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class); |
||||
|
verify(ctx).tellNext(captor.capture()); |
||||
|
TbMsg actualMsg = captor.getValue(); |
||||
|
assertEquals("70.0", actualMsg.getMetaData().getValue("temp")); |
||||
|
} |
||||
|
|
||||
|
@Test |
||||
|
public void metadataCanBeAdded() throws TbNodeException { |
||||
|
initWithScript("meta.newAttr = meta.humidity - msg.passed;"); |
||||
|
TbMsgMetaData metaData = new TbMsgMetaData(); |
||||
|
metaData.putValue("temp", "7"); |
||||
|
metaData.putValue("humidity", "99"); |
||||
|
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; |
||||
|
|
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); |
||||
|
mockJsExecutor(); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
verify(ctx).getJsExecutor(); |
||||
|
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class); |
||||
|
verify(ctx).tellNext(captor.capture()); |
||||
|
TbMsg actualMsg = captor.getValue(); |
||||
|
assertEquals("94.0", actualMsg.getMetaData().getValue("newAttr")); |
||||
|
} |
||||
|
|
||||
|
@Test |
||||
|
public void payloadCanBeUpdated() throws TbNodeException { |
||||
|
initWithScript("msg.passed = msg.passed * meta.temp; msg.bigObj.newProp = 'Ukraine' "); |
||||
|
TbMsgMetaData metaData = new TbMsgMetaData(); |
||||
|
metaData.putValue("temp", "7"); |
||||
|
metaData.putValue("humidity", "99"); |
||||
|
String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}"; |
||||
|
|
||||
|
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); |
||||
|
mockJsExecutor(); |
||||
|
|
||||
|
node.onMsg(ctx, msg); |
||||
|
verify(ctx).getJsExecutor(); |
||||
|
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class); |
||||
|
verify(ctx).tellNext(captor.capture()); |
||||
|
TbMsg actualMsg = captor.getValue(); |
||||
|
String expectedJson = "{\"name\":\"Vit\",\"passed\":35.0,\"bigObj\":{\"prop\":42,\"newProp\":\"Ukraine\"}}"; |
||||
|
assertEquals(expectedJson, new String(actualMsg.getData())); |
||||
|
} |
||||
|
|
||||
|
private void initWithScript(String script) throws TbNodeException { |
||||
|
TbTransformMsgNodeConfiguration config = new TbTransformMsgNodeConfiguration(); |
||||
|
config.setJsScript(script); |
||||
|
ObjectMapper mapper = new ObjectMapper(); |
||||
|
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); |
||||
|
|
||||
|
node = new TbTransformMsgNode(); |
||||
|
node.init(nodeConfiguration, null); |
||||
|
} |
||||
|
|
||||
|
private void mockJsExecutor() { |
||||
|
when(ctx.getJsExecutor()).thenReturn(executor); |
||||
|
doAnswer((Answer<ListenableFuture<TbMsg>>) invocationOnMock -> { |
||||
|
try { |
||||
|
Callable task = (Callable) (invocationOnMock.getArguments())[0]; |
||||
|
return Futures.immediateFuture((TbMsg) task.call()); |
||||
|
} catch (Throwable th) { |
||||
|
return Futures.immediateFailedFuture(th); |
||||
|
} |
||||
|
}).when(executor).executeAsync(Matchers.any(Callable.class)); |
||||
|
} |
||||
|
|
||||
|
private void verifyError(TbMsg msg, String message, Class expectedClass) { |
||||
|
ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); |
||||
|
verify(ctx).tellError(same(msg), captor.capture()); |
||||
|
|
||||
|
Throwable value = captor.getValue(); |
||||
|
assertEquals(expectedClass, value.getClass()); |
||||
|
assertEquals(message, value.getMessage()); |
||||
|
} |
||||
|
} |
||||
Loading…
Reference in new issue