Browse Source

Merge pull request #7348 from thingsboard/feature/math-rule-node

[3.4.2] Math rule node
pull/7355/head
Andrew Shvayka 4 years ago
committed by GitHub
parent
commit
cd6e50312c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 54
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  2. 2
      common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
  3. 3
      common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
  4. 10
      common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
  5. 7
      dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
  6. 22
      dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java
  7. 53
      dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java
  8. 18
      dao/src/main/java/org/thingsboard/server/dao/timeseries/AbstractCassandraBaseTimeseriesDao.java
  9. 7
      dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
  10. 11
      dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java
  11. 11
      dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java
  12. 6
      pom.xml
  13. 11
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
  14. 6
      rule-engine/rule-engine-components/pom.xml
  15. 41
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathArgument.java
  16. 22
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathArgumentType.java
  17. 108
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathArgumentValue.java
  18. 397
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java
  19. 40
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNodeConfiguration.java
  20. 35
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathResult.java
  21. 51
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbRuleNodeMathFunctionType.java
  22. 2
      rule-engine/rule-engine-components/src/main/resources/public/static/rulenode/rulenode-core-config.js
  23. 339
      rule-engine/rule-engine-components/src/test/java/math/TbMathNodeTest.java
  24. 1
      ui-ngx/src/app/shared/models/constants.ts
  25. 1
      ui-ngx/src/app/shared/models/rule-node.models.ts

54
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -19,6 +19,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -115,6 +116,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
super.shutdownExecutor();
}
@Override
public ListenableFuture<Void> saveAndNotify(TenantId tenantId, EntityId entityId, TsKvEntry ts) {
SettableFuture<Void> future = SettableFuture.create();
saveAndNotify(tenantId, entityId, Collections.singletonList(ts), new VoidFutureCallback(future));
return future;
}
@Override
public void saveAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
saveAndNotify(tenantId, null, entityId, ts, 0L, callback);
@ -332,6 +340,34 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
, System.currentTimeMillis())), callback);
}
@Override
public ListenableFuture<Void> saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value) {
SettableFuture<Void> future = SettableFuture.create();
saveAttrAndNotify(tenantId, entityId, scope, key, value, new VoidFutureCallback(future));
return future;
}
@Override
public ListenableFuture<Void> saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, String value) {
SettableFuture<Void> future = SettableFuture.create();
saveAttrAndNotify(tenantId, entityId, scope, key, value, new VoidFutureCallback(future));
return future;
}
@Override
public ListenableFuture<Void> saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, double value) {
SettableFuture<Void> future = SettableFuture.create();
saveAttrAndNotify(tenantId, entityId, scope, key, value, new VoidFutureCallback(future));
return future;
}
@Override
public ListenableFuture<Void> saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, boolean value) {
SettableFuture<Void> future = SettableFuture.create();
saveAttrAndNotify(tenantId, entityId, scope, key, value, new VoidFutureCallback(future));
return future;
}
private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
if (currentPartitions.contains(tpi)) {
@ -436,4 +472,22 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
}
private static class VoidFutureCallback implements FutureCallback<Void> {
private final SettableFuture<Void> future;
public VoidFutureCallback(SettableFuture<Void> future) {
this.future = future;
}
@Override
public void onSuccess(Void result) {
future.set(null);
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
}
}
}

2
common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java

@ -39,6 +39,8 @@ public interface AttributesService {
ListenableFuture<List<String>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes);
ListenableFuture<String> save(TenantId tenantId, EntityId entityId, String scope, AttributeKvEntry attribute);
ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys);
List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);

3
common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java

@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
/**
* @author Andrew Shvayka
@ -37,6 +38,8 @@ public interface TimeseriesService {
ListenableFuture<List<TsKvEntry>> findAll(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries);
ListenableFuture<Optional<TsKvEntry>> findLatest(TenantId tenantId, EntityId entityId, String keys);
ListenableFuture<List<TsKvEntry>> findLatest(TenantId tenantId, EntityId entityId, Collection<String> keys);
ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId);

10
common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java

@ -122,6 +122,16 @@ public final class TbMsg implements Serializable {
data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.callback);
}
public static TbMsg transformMsgData(TbMsg tbMsg, String data) {
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback());
}
public static TbMsg transformMsg(TbMsg tbMsg, TbMsgMetaData metadata) {
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, metadata.copy(), tbMsg.dataType,
tbMsg.data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback());
}
public static TbMsg transformMsg(TbMsg tbMsg, CustomerId customerId) {
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, customerId, tbMsg.metaData, tbMsg.dataType,
tbMsg.data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback());

7
dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java

@ -79,6 +79,13 @@ public class BaseAttributesService implements AttributesService {
return attributesDao.findAllKeysByEntityIds(tenantId, entityType, entityIds);
}
@Override
public ListenableFuture<String> save(TenantId tenantId, EntityId entityId, String scope, AttributeKvEntry attribute) {
validate(entityId, scope);
AttributeUtils.validate(attribute);
return attributesDao.save(tenantId, entityId, scope, attribute);
}
@Override
public ListenableFuture<List<String>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
validate(entityId, scope);

22
dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java

@ -205,6 +205,14 @@ public class CachedAttributesService implements AttributesService {
return attributesDao.findAllKeysByEntityIds(tenantId, entityType, entityIds);
}
@Override
public ListenableFuture<String> save(TenantId tenantId, EntityId entityId, String scope, AttributeKvEntry attribute) {
validate(entityId, scope);
AttributeUtils.validate(attribute);
ListenableFuture<String> future = attributesDao.save(tenantId, entityId, scope, attribute);
return Futures.transform(future, key -> evict(entityId, scope, attribute, key), cacheExecutor);
}
@Override
public ListenableFuture<List<String>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
validate(entityId, scope);
@ -213,17 +221,19 @@ public class CachedAttributesService implements AttributesService {
List<ListenableFuture<String>> futures = new ArrayList<>(attributes.size());
for (var attribute : attributes) {
ListenableFuture<String> future = attributesDao.save(tenantId, entityId, scope, attribute);
futures.add(Futures.transform(future, key -> {
log.trace("[{}][{}][{}] Before cache evict: {}", entityId, scope, key, attribute);
cache.evictOrPut(new AttributeCacheKey(scope, entityId, key), attribute);
log.trace("[{}][{}][{}] after cache evict.", entityId, scope, key);
return key;
}, cacheExecutor));
futures.add(Futures.transform(future, key -> evict(entityId, scope, attribute, key), cacheExecutor));
}
return Futures.allAsList(futures);
}
private String evict(EntityId entityId, String scope, AttributeKvEntry attribute, String key) {
log.trace("[{}][{}][{}] Before cache evict: {}", entityId, scope, key, attribute);
cache.evictOrPut(new AttributeCacheKey(scope, entityId, key), attribute);
log.trace("[{}][{}][{}] after cache evict.", entityId, scope, key);
return key;
}
@Override
public ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
validate(entityId, scope);

53
dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java

@ -149,9 +149,18 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
return getRemoveLatestFuture(tenantId, entityId, query);
}
@Override
public ListenableFuture<Optional<TsKvEntry>> findLatestOpt(TenantId tenantId, EntityId entityId, String key) {
return Futures.immediateFuture(Optional.ofNullable(doFindLatest(entityId, key)));
}
@Override
public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {
return getFindLatestFuture(entityId, key);
TsKvEntry latest = doFindLatest(entityId, key);
if (latest == null) {
latest = new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(key, null));
}
return Futures.immediateFuture(latest);
}
@Override
@ -195,43 +204,41 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
ReadTsKvQueryResult::getData, MoreExecutors.directExecutor());
}
protected ListenableFuture<TsKvEntry> getFindLatestFuture(EntityId entityId, String key) {
protected TsKvEntry doFindLatest(EntityId entityId, String key) {
TsKvLatestCompositeKey compositeKey =
new TsKvLatestCompositeKey(
entityId.getId(),
getOrSaveKeyId(key));
Optional<TsKvLatestEntity> entry = tsKvLatestRepository.findById(compositeKey);
TsKvEntry result;
if (entry.isPresent()) {
TsKvLatestEntity tsKvLatestEntity = entry.get();
tsKvLatestEntity.setStrKey(key);
result = DaoUtil.getData(tsKvLatestEntity);
return DaoUtil.getData(tsKvLatestEntity);
} else {
result = new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(key, null));
return null;
}
return Futures.immediateFuture(result);
}
protected ListenableFuture<TsKvLatestRemovingResult> getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
ListenableFuture<TsKvEntry> latestFuture = getFindLatestFuture(entityId, query.getKey());
TsKvEntry latest = doFindLatest(entityId, query.getKey());
ListenableFuture<Boolean> booleanFuture = Futures.transform(latestFuture, tsKvEntry -> {
long ts = tsKvEntry.getTs();
return ts > query.getStartTs() && ts <= query.getEndTs();
}, service);
if (latest == null) {
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false));
}
ListenableFuture<Boolean> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
if (isRemove) {
TsKvLatestEntity latestEntity = new TsKvLatestEntity();
latestEntity.setEntityId(entityId.getId());
latestEntity.setKey(getOrSaveKeyId(query.getKey()));
return service.submit(() -> {
tsKvLatestRepository.delete(latestEntity);
return true;
});
}
return Futures.immediateFuture(false);
}, service);
long ts = latest.getTs();
ListenableFuture<Boolean> removedLatestFuture;
if (ts > query.getStartTs() && ts <= query.getEndTs()) {
TsKvLatestEntity latestEntity = new TsKvLatestEntity();
latestEntity.setEntityId(entityId.getId());
latestEntity.setKey(getOrSaveKeyId(query.getKey()));
removedLatestFuture = service.submit(() -> {
tsKvLatestRepository.delete(latestEntity);
return true;
});
} else {
removedLatestFuture = Futures.immediateFuture(false);
}
return Futures.transformAsync(removedLatestFuture, isRemoved -> {
if (isRemoved && query.getRewriteLatestIfDeleted()) {

18
dao/src/main/java/org/thingsboard/server/dao/timeseries/AbstractCassandraBaseTimeseriesDao.java

@ -88,14 +88,26 @@ public abstract class AbstractCassandraBaseTimeseriesDao extends CassandraAbstra
protected TsKvEntry convertResultToTsKvEntry(String key, Row row) {
if (row != null) {
Optional<String> foundKeyOpt = getKey(row);
long ts = row.getLong(ModelConstants.TS_COLUMN);
return new BasicTsKvEntry(ts, toKvEntry(row, foundKeyOpt.orElse(key)));
return getBasicTsKvEntry(key, row);
} else {
return new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(key, null));
}
}
protected Optional<TsKvEntry> convertResultToTsKvEntryOpt(String key, Row row) {
if (row != null) {
return Optional.of(getBasicTsKvEntry(key, row));
} else {
return Optional.empty();
}
}
private BasicTsKvEntry getBasicTsKvEntry(String key, Row row) {
Optional<String> foundKeyOpt = getKey(row);
long ts = row.getLong(ModelConstants.TS_COLUMN);
return new BasicTsKvEntry(ts, toKvEntry(row, foundKeyOpt.orElse(key)));
}
private Optional<String> getKey(Row row){
try{
return Optional.ofNullable(row.getString(ModelConstants.KEY_COLUMN));

7
dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java

@ -47,6 +47,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.StringUtils.isBlank;
@ -117,6 +118,12 @@ public class BaseTimeseriesService implements TimeseriesService {
}, MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<Optional<TsKvEntry>> findLatest(TenantId tenantId, EntityId entityId, String key) {
validate(entityId);
return timeseriesLatestDao.findLatestOpt(tenantId, entityId, key);
}
@Override
public ListenableFuture<List<TsKvEntry>> findLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) {
validate(entityId);

11
dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java

@ -59,15 +59,24 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
private PreparedStatement findLatestStmt;
private PreparedStatement findAllLatestStmt;
@Override
public ListenableFuture<Optional<TsKvEntry>> findLatestOpt(TenantId tenantId, EntityId entityId, String key) {
return findLatest(tenantId, entityId, key, rs -> convertResultToTsKvEntryOpt(key, rs.one()));
}
@Override
public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {
return findLatest(tenantId, entityId, key, rs -> convertResultToTsKvEntry(key, rs.one()));
}
private <T> ListenableFuture<T> findLatest(TenantId tenantId, EntityId entityId, String key, java.util.function.Function<TbResultSet, T> function) {
BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(getFindLatestStmt().bind());
stmtBuilder.setString(0, entityId.getEntityType().name());
stmtBuilder.setUuid(1, entityId.getId());
stmtBuilder.setString(2, key);
BoundStatement stmt = stmtBuilder.build();
log.debug(GENERATED_QUERY_FOR_ENTITY_TYPE_AND_ENTITY_ID, stmt, entityId.getEntityType(), entityId.getId());
return getFuture(executeAsyncRead(tenantId, stmt), rs -> convertResultToTsKvEntry(key, rs.one()));
return getFuture(executeAsyncRead(tenantId, stmt), function);
}
@Override

11
dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java

@ -24,9 +24,20 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
import java.util.List;
import java.util.Optional;
public interface TimeseriesLatestDao {
/**
* Optional TsKvEntry if the value is present in the DB
*
*/
ListenableFuture<Optional<TsKvEntry>> findLatestOpt(TenantId tenantId, EntityId entityId, String key);
/**
* Returns new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(key, null)) if the value is NOT present in the DB
*
*/
ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key);
ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId);

6
pom.xml

@ -137,6 +137,7 @@
<zeroturnaround.version>1.12</zeroturnaround.version>
<opensmpp.version>3.0.0</opensmpp.version>
<jgit.version>6.1.0.202203080745-r</jgit.version>
<exp4j.version>0.4.8</exp4j.version>
<aerogear-otp.version>1.0.0</aerogear-otp.version>
</properties>
@ -1910,6 +1911,11 @@
<artifactId>org.eclipse.jgit.ssh.apache</artifactId>
<version>${jgit.version}</version>
</dependency>
<dependency>
<groupId>net.objecthunter</groupId>
<artifactId>exp4j</artifactId>
<version>${exp4j.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

11
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java

@ -16,6 +16,7 @@
package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -31,6 +32,8 @@ import java.util.List;
*/
public interface RuleEngineTelemetryService {
ListenableFuture<Void> saveAndNotify(TenantId tenantId, EntityId entityId, TsKvEntry ts);
void saveAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
void saveAndNotify(TenantId tenantId, CustomerId id, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
@ -43,6 +46,14 @@ public interface RuleEngineTelemetryService {
void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
ListenableFuture<Void> saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value);
ListenableFuture<Void> saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, String value);
ListenableFuture<Void> saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, double value);
ListenableFuture<Void> saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, boolean value);
void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback);
void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback);

6
rule-engine/rule-engine-components/pom.xml

@ -121,6 +121,10 @@
<artifactId>javax.mail</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.objecthunter</groupId>
<artifactId>exp4j</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
@ -139,10 +143,12 @@
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-client-java</artifactId>
<scope>test</scope>
</dependency>
<dependency>

41
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathArgument.java

@ -0,0 +1,41 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.math;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TbMathArgument {
private String name;
private TbMathArgumentType type;
private String key;
private String attributeScope;
private Double defaultValue;
public TbMathArgument(TbMathArgumentType type, String key) {
this(key, type, key, null, null);
}
public TbMathArgument(String name, TbMathArgumentType type, String key) {
this(name, type, key, null, null);
}
}

22
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathArgumentType.java

@ -0,0 +1,22 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.math;
public enum TbMathArgumentType {
ATTRIBUTE, TIME_SERIES, MESSAGE_BODY, MESSAGE_METADATA, CONSTANT;
}

108
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathArgumentValue.java

@ -0,0 +1,108 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.math;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Getter;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.Optional;
public class TbMathArgumentValue {
@Getter
private final double value;
private TbMathArgumentValue(double value) {
this.value = value;
}
public static TbMathArgumentValue constant(TbMathArgument arg) {
return fromString(arg.getKey());
}
private static TbMathArgumentValue defaultOrThrow(Double defaultValue, String error) {
if (defaultValue != null) {
return new TbMathArgumentValue(defaultValue);
}
throw new RuntimeException(error);
}
public static TbMathArgumentValue fromMessageBody(TbMathArgument arg, Optional<ObjectNode> jsonNodeOpt) {
String key = arg.getKey();
Double defaultValue = arg.getDefaultValue();
if (jsonNodeOpt.isEmpty()) {
return defaultOrThrow(defaultValue, "Message body is empty!");
}
var json = jsonNodeOpt.get();
if (!json.has(key)) {
return defaultOrThrow(defaultValue, "Message body has no '" + key + "'!");
}
JsonNode valueNode = json.get(key);
if (valueNode.isNull()) {
return defaultOrThrow(defaultValue, "Message body has null '" + key + "'!");
}
double value;
if (valueNode.isNumber()) {
value = valueNode.doubleValue();
} else if (valueNode.isTextual()) {
var valueNodeText = valueNode.asText();
if (StringUtils.isNotBlank(valueNodeText)) {
try {
value = Double.parseDouble(valueNode.asText());
} catch (NumberFormatException ne) {
throw new RuntimeException("Can't convert value '" + valueNode.asText() + "' to double!");
}
} else {
return defaultOrThrow(defaultValue, "Message value is empty for '" + key + "'!");
}
} else {
throw new RuntimeException("Can't convert value '" + valueNode.toString() + "' to double!");
}
return new TbMathArgumentValue(value);
}
public static TbMathArgumentValue fromMessageMetadata(TbMathArgument arg, TbMsgMetaData metaData) {
String key = arg.getKey();
Double defaultValue = arg.getDefaultValue();
if (metaData == null) {
return defaultOrThrow(defaultValue, "Message metadata is empty!");
}
var value = metaData.getValue(key);
if (StringUtils.isEmpty(value)) {
return defaultOrThrow(defaultValue, "Message metadata has no '" + key + "'!");
}
return fromString(value);
}
public static TbMathArgumentValue fromLong(long value) {
return new TbMathArgumentValue(value);
}
public static TbMathArgumentValue fromDouble(double value) {
return new TbMathArgumentValue(value);
}
public static TbMathArgumentValue fromString(String value) {
try {
return new TbMathArgumentValue(Double.parseDouble(value));
} catch (NumberFormatException ne) {
throw new RuntimeException("Can't convert value '" + value + "' to double!");
}
}
}

397
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java

@ -0,0 +1,397 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.math;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import net.objecthunter.exp4j.Expression;
import net.objecthunter.exp4j.ExpressionBuilder;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@SuppressWarnings("UnstableApiUsage")
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "math function",
configClazz = TbMathNodeConfiguration.class,
nodeDescription = "Apply math function and save the result into the message and/or database",
nodeDetails = "Supports math operations like: ADD, SUB, MULT, DIV, etc and functions: SIN, COS, TAN, SEC, etc. " +
"Use 'CUSTOM' operation to specify complex math expressions." +
"<br/><br/>" +
"You may use constant, message field, metadata field, attribute, and latest time-series as an arguments values. " +
"The result of the function may be also stored to message field, metadata field, attribute or time-series value." +
"<br/><br/>" +
"Primary use case for this rule node is to take one or more values from the database and modify them based on data from the message. " +
"For example, you may increase `totalWaterConsumption` based on the `deltaWaterConsumption` reported by device." +
"<br/><br/>" +
"Alternative use case is the replacement of simple JS `script` nodes with more light-weight and performant implementation. " +
"For example, you may transform Fahrenheit to Celsius (C = (F - 32) / 1.8) using CUSTOM operation and expression: (x - 32) / 1.8)." +
"<br/><br/>" +
"The execution is synchronized in scope of message originator (e.g. device) and server node. " +
"If you have rule nodes in different rule chains, they will process messages from the same originator synchronously in the scope of the server node.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeMathFunctionConfig",
icon = "calculate"
)
public class TbMathNode implements TbNode {
private static final ConcurrentMap<EntityId, Semaphore> semaphores = new ConcurrentReferenceHashMap<>();
private final ThreadLocal<Expression> customExpression = new ThreadLocal<>();
private TbMathNodeConfiguration config;
private boolean msgBodyToJsonConversionRequired;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMathNodeConfiguration.class);
var operation = config.getOperation();
var argsCount = config.getArguments().size();
if (argsCount < operation.getMinArgs() || argsCount > operation.getMaxArgs()) {
throw new RuntimeException("Args count: " + argsCount + " does not match operation: " + operation.name());
}
if (TbRuleNodeMathFunctionType.CUSTOM.equals(operation)) {
if (StringUtils.isBlank(config.getCustomFunction())) {
throw new RuntimeException("Custom function is blank!");
} else if (config.getCustomFunction().length() > 256) {
throw new RuntimeException("Custom function is too complex (length > 256)!");
}
}
msgBodyToJsonConversionRequired = config.getArguments().stream().anyMatch(arg -> TbMathArgumentType.MESSAGE_BODY.equals(arg.getType()));
msgBodyToJsonConversionRequired = msgBodyToJsonConversionRequired || TbMathArgumentType.MESSAGE_BODY.equals(config.getResult().getType());
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
var originator = msg.getOriginator();
var originatorSemaphore = semaphores.computeIfAbsent(originator, tmp -> new Semaphore(1, true));
boolean acquired = tryAcquire(originator, originatorSemaphore);
if (!acquired) {
ctx.tellFailure(msg, new RuntimeException("Failed to process message for originator synchronously"));
return;
}
try {
var arguments = config.getArguments();
Optional<ObjectNode> msgBodyOpt = convertMsgBodyIfRequired(msg);
var argumentValues = Futures.allAsList(arguments.stream()
.map(arg -> resolveArguments(ctx, msg, msgBodyOpt, arg)).collect(Collectors.toList()));
ListenableFuture<TbMsg> resultMsgFuture = Futures.transformAsync(argumentValues, args ->
updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(ctx, msg, args)), ctx.getDbCallbackExecutor());
DonAsynchron.withCallback(resultMsgFuture, resultMsg -> {
try {
ctx.tellSuccess(resultMsg);
} finally {
originatorSemaphore.release();
}
}, t -> {
try {
ctx.tellFailure(msg, t);
} finally {
originatorSemaphore.release();
}
}, ctx.getDbCallbackExecutor());
} catch (Throwable e) {
originatorSemaphore.release();
log.warn("[{}] Failed to process message: {}", originator, msg, e);
throw e;
}
}
private boolean tryAcquire(EntityId originator, Semaphore originatorSemaphore) {
boolean acquired;
try {
acquired = originatorSemaphore.tryAcquire(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
acquired = false;
log.debug("[{}] Failed to acquire semaphore", originator, e);
}
return acquired;
}
private ListenableFuture<TbMsg> updateMsgAndDb(TbContext ctx, TbMsg msg, Optional<ObjectNode> msgBodyOpt, double result) {
TbMathResult mathResultDef = config.getResult();
switch (mathResultDef.getType()) {
case MESSAGE_BODY:
return Futures.immediateFuture(addToBody(msg, mathResultDef, msgBodyOpt, result));
case MESSAGE_METADATA:
return Futures.immediateFuture(addToMeta(msg, mathResultDef, result));
case ATTRIBUTE:
ListenableFuture<Void> attrSave = saveAttribute(ctx, msg, result, mathResultDef);
return Futures.transform(attrSave, attr -> addToBodyAndMeta(msg, msgBodyOpt, result, mathResultDef), ctx.getDbCallbackExecutor());
case TIME_SERIES:
ListenableFuture<Void> tsSave = saveTimeSeries(ctx, msg, result, mathResultDef);
return Futures.transform(tsSave, ts -> addToBodyAndMeta(msg, msgBodyOpt, result, mathResultDef), ctx.getDbCallbackExecutor());
default:
throw new RuntimeException("Result type is not supported: " + mathResultDef.getType() + "!");
}
}
private ListenableFuture<Void> saveTimeSeries(TbContext ctx, TbMsg msg, double result, TbMathResult mathResultDef) {
return ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(),
new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry(mathResultDef.getKey(), result)));
}
private ListenableFuture<Void> saveAttribute(TbContext ctx, TbMsg msg, double result, TbMathResult mathResultDef) {
String attributeScope = getAttributeScope(mathResultDef.getAttributeScope());
if (isIntegerResult(mathResultDef, config.getOperation())) {
var value = toIntValue(mathResultDef, result);
return ctx.getTelemetryService().saveAttrAndNotify(
ctx.getTenantId(), msg.getOriginator(), attributeScope, mathResultDef.getKey(), value);
} else {
var value = toDoubleValue(mathResultDef, result);
return ctx.getTelemetryService().saveAttrAndNotify(
ctx.getTenantId(), msg.getOriginator(), attributeScope, mathResultDef.getKey(), value);
}
}
private boolean isIntegerResult(TbMathResult mathResultDef, TbRuleNodeMathFunctionType function) {
return function.isIntegerResult() || mathResultDef.getResultValuePrecision() == 0;
}
private long toIntValue(TbMathResult mathResultDef, double value) {
return (long) value;
}
private double toDoubleValue(TbMathResult mathResultDef, double value) {
return BigDecimal.valueOf(value).setScale(mathResultDef.getResultValuePrecision(), RoundingMode.HALF_UP).doubleValue();
}
private Optional<ObjectNode> convertMsgBodyIfRequired(TbMsg msg) {
Optional<ObjectNode> msgBodyOpt;
if (msgBodyToJsonConversionRequired) {
var jsonNode = JacksonUtil.toJsonNode(msg.getData());
if (jsonNode.isObject()) {
msgBodyOpt = Optional.of((ObjectNode) jsonNode);
} else {
throw new RuntimeException("Message body is not a JSON object!");
}
} else {
msgBodyOpt = Optional.empty();
}
return msgBodyOpt;
}
private TbMsg addToBodyAndMeta(TbMsg msg, Optional<ObjectNode> msgBodyOpt, double result, TbMathResult mathResultDef) {
TbMsg tmpMsg = msg;
if (mathResultDef.isAddToBody()) {
tmpMsg = addToBody(msg, mathResultDef, msgBodyOpt, result);
}
if (mathResultDef.isAddToMetadata()) {
tmpMsg = addToMeta(msg, mathResultDef, result);
}
return tmpMsg;
}
private TbMsg addToBody(TbMsg msg, TbMathResult mathResultDef, Optional<ObjectNode> msgBodyOpt, double result) {
ObjectNode body = msgBodyOpt.get();
if (isIntegerResult(mathResultDef, config.getOperation())) {
body.put(mathResultDef.getKey(), toIntValue(mathResultDef, result));
} else {
body.put(mathResultDef.getKey(), toDoubleValue(mathResultDef, result));
}
return TbMsg.transformMsgData(msg, JacksonUtil.toString(body));
}
private TbMsg addToMeta(TbMsg msg, TbMathResult mathResultDef, double result) {
var md = msg.getMetaData();
if (isIntegerResult(mathResultDef, config.getOperation())) {
md.putValue(mathResultDef.getKey(), Long.toString(toIntValue(mathResultDef, result)));
} else {
md.putValue(mathResultDef.getKey(), Double.toString(toDoubleValue(mathResultDef, result)));
}
return TbMsg.transformMsg(msg, md);
}
private double calculateResult(TbContext ctx, TbMsg msg, List<TbMathArgumentValue> args) {
switch (config.getOperation()) {
case ADD:
return apply(args.get(0), args.get(1), Double::sum);
case SUB:
return apply(args.get(0), args.get(1), (a, b) -> a - b);
case MULT:
return apply(args.get(0), args.get(1), (a, b) -> a * b);
case DIV:
return apply(args.get(0), args.get(1), (a, b) -> a / b);
case SIN:
return apply(args.get(0), Math::sin);
case SINH:
return apply(args.get(0), Math::sinh);
case COS:
return apply(args.get(0), Math::cos);
case COSH:
return apply(args.get(0), Math::cosh);
case TAN:
return apply(args.get(0), Math::tan);
case TANH:
return apply(args.get(0), Math::tanh);
case ACOS:
return apply(args.get(0), Math::acos);
case ASIN:
return apply(args.get(0), Math::asin);
case ATAN:
return apply(args.get(0), Math::atan);
case ATAN2:
return apply(args.get(0), args.get(1), Math::atan2);
case EXP:
return apply(args.get(0), Math::exp);
case EXPM1:
return apply(args.get(0), Math::expm1);
case SQRT:
return apply(args.get(0), Math::sqrt);
case CBRT:
return apply(args.get(0), Math::cbrt);
case GET_EXP:
return apply(args.get(0), (x) -> (double) Math.getExponent(x));
case HYPOT:
return apply(args.get(0), args.get(1), Math::hypot);
case LOG:
return apply(args.get(0), Math::log);
case LOG10:
return apply(args.get(0), Math::log10);
case LOG1P:
return apply(args.get(0), Math::log1p);
case CEIL:
return apply(args.get(0), Math::ceil);
case FLOOR:
return apply(args.get(0), Math::floor);
case FLOOR_DIV:
return apply(args.get(0), args.get(1), (a, b) -> (double) Math.floorDiv(a.longValue(), b.longValue()));
case FLOOR_MOD:
return apply(args.get(0), args.get(1), (a, b) -> (double) Math.floorMod(a.longValue(), b.longValue()));
case ABS:
return apply(args.get(0), Math::abs);
case MIN:
return apply(args.get(0), args.get(1), Math::min);
case MAX:
return apply(args.get(0), args.get(1), Math::max);
case POW:
return apply(args.get(0), args.get(1), Math::pow);
case SIGNUM:
return apply(args.get(0), Math::signum);
case RAD:
return apply(args.get(0), Math::toRadians);
case DEG:
return apply(args.get(0), Math::toDegrees);
case CUSTOM:
var expr = customExpression.get();
if (expr == null) {
expr = new ExpressionBuilder(config.getCustomFunction())
.implicitMultiplication(true)
.variables(config.getArguments().stream().map(TbMathArgument::getName).collect(Collectors.toSet()))
.build();
customExpression.set(expr);
}
for (int i = 0; i < config.getArguments().size(); i++) {
expr.setVariable(config.getArguments().get(i).getName(), args.get(i).getValue());
}
return expr.evaluate();
default:
throw new RuntimeException("Not supported operation: " + config.getOperation());
}
}
private double apply(TbMathArgumentValue arg, Function<Double, Double> function) {
return function.apply(arg.getValue());
}
private double apply(TbMathArgumentValue arg1, TbMathArgumentValue arg2, BiFunction<Double, Double, Double> function) {
return function.apply(arg1.getValue(), arg2.getValue());
}
private ListenableFuture<TbMathArgumentValue> resolveArguments(TbContext ctx, TbMsg msg, Optional<ObjectNode> msgBodyOpt, TbMathArgument arg) {
switch (arg.getType()) {
case CONSTANT:
return Futures.immediateFuture(TbMathArgumentValue.constant(arg));
case MESSAGE_BODY:
return Futures.immediateFuture(TbMathArgumentValue.fromMessageBody(arg, msgBodyOpt));
case MESSAGE_METADATA:
return Futures.immediateFuture(TbMathArgumentValue.fromMessageMetadata(arg, msg.getMetaData()));
case ATTRIBUTE:
String scope = getAttributeScope(arg.getAttributeScope());
return Futures.transform(ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, arg.getKey()),
opt -> getTbMathArgumentValue(arg, opt, "Attribute: " + arg.getKey() + " with scope: " + scope + " not found for entity: " + msg.getOriginator())
, MoreExecutors.directExecutor());
case TIME_SERIES:
return Futures.transform(ctx.getTimeseriesService().findLatest(ctx.getTenantId(), msg.getOriginator(), arg.getKey()),
opt -> getTbMathArgumentValue(arg, opt, "Time-series: " + arg.getKey() + " not found for entity: " + msg.getOriginator())
, MoreExecutors.directExecutor());
default:
throw new RuntimeException("Unsupported argument type: " + arg.getType() + "!");
}
}
private String getAttributeScope(String attrScope) {
return StringUtils.isEmpty(attrScope) ? DataConstants.SERVER_SCOPE : attrScope;
}
private TbMathArgumentValue getTbMathArgumentValue(TbMathArgument arg, Optional<? extends KvEntry> kvOpt, String error) {
if (kvOpt != null && kvOpt.isPresent()) {
var kv = kvOpt.get();
switch (kv.getDataType()) {
case LONG:
return TbMathArgumentValue.fromLong(kv.getLongValue().get());
case DOUBLE:
return TbMathArgumentValue.fromDouble(kv.getDoubleValue().get());
default:
return TbMathArgumentValue.fromString(kv.getValueAsString());
}
} else {
if (arg.getDefaultValue() != null) {
return TbMathArgumentValue.fromDouble(arg.getDefaultValue());
} else {
throw new RuntimeException(error);
}
}
}
@Override
public void destroy() {
}
}

40
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNodeConfiguration.java

@ -0,0 +1,40 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.math;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import java.util.Arrays;
import java.util.List;
@Data
public class TbMathNodeConfiguration implements NodeConfiguration<TbMathNodeConfiguration> {
private TbRuleNodeMathFunctionType operation;
private List<TbMathArgument> arguments;
private String customFunction;
private TbMathResult result;
@Override
public TbMathNodeConfiguration defaultConfiguration() {
TbMathNodeConfiguration configuration = new TbMathNodeConfiguration();
configuration.setOperation(TbRuleNodeMathFunctionType.ADD);
configuration.setArguments(Arrays.asList(new TbMathArgument("x", TbMathArgumentType.CONSTANT, "2"), new TbMathArgument("y", TbMathArgumentType.CONSTANT, "2")));
configuration.setResult(new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null));
return configuration;
}
}

35
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathResult.java

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.math;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TbMathResult {
private TbMathArgumentType type;
private String key;
// 0 means integer, x > 0 means x decimal points after ".";
private int resultValuePrecision;
private boolean addToBody;
private boolean addToMetadata;
private String attributeScope;
}

51
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbRuleNodeMathFunctionType.java

@ -0,0 +1,51 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.math;
import lombok.Getter;
public enum TbRuleNodeMathFunctionType {
ADD(2), SUB(2), MULT(2), DIV(2),
SIN, SINH, COS, COSH, TAN, TANH, ACOS, ASIN, ATAN, ATAN2(2),
EXP, EXPM1, SQRT, CBRT, GET_EXP(1, 1, true), HYPOT(2), LOG, LOG10, LOG1P,
CEIL(1, 1, true), FLOOR(1, 1, true), FLOOR_DIV(2), FLOOR_MOD(2),
ABS, MIN(2), MAX(2), POW, SIGNUM, RAD, DEG,
CUSTOM(0, 16, false); //Custom function based on exp4j
@Getter
private final int minArgs;
@Getter
private final int maxArgs;
@Getter
private final boolean integerResult;
TbRuleNodeMathFunctionType() {
this(1, 1, false);
}
TbRuleNodeMathFunctionType(int args) {
this(args, args, false);
}
TbRuleNodeMathFunctionType(int minArgs, int maxArgs, boolean integerResult) {
this.minArgs = minArgs;
this.maxArgs = maxArgs;
this.integerResult = integerResult;
}
}

2
rule-engine/rule-engine-components/src/main/resources/public/static/rulenode/rulenode-core-config.js

File diff suppressed because one or more lines are too long

339
rule-engine/rule-engine-components/src/test/java/math/TbMathNodeTest.java

@ -0,0 +1,339 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package math;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.Futures;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.math.TbMathArgument;
import org.thingsboard.rule.engine.math.TbMathArgumentType;
import org.thingsboard.rule.engine.math.TbMathNode;
import org.thingsboard.rule.engine.math.TbMathNodeConfiguration;
import org.thingsboard.rule.engine.math.TbMathResult;
import org.thingsboard.rule.engine.math.TbRuleNodeMathFunctionType;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import java.util.Arrays;
import java.util.Optional;
import static org.mockito.Mockito.lenient;
@RunWith(MockitoJUnitRunner.class)
public class TbMathNodeTest {
private EntityId originator = new DeviceId(Uuids.timeBased());
private TenantId tenantId = TenantId.fromUUID(Uuids.timeBased());
@Mock
private TbContext ctx;
@Mock
private AttributesService attributesService;
@Mock
private TimeseriesService tsService;
private AbstractListeningExecutor dbExecutor;
@Before
public void before() {
dbExecutor = new AbstractListeningExecutor() {
@Override
protected int getThreadPollSize() {
return 3;
}
};
dbExecutor.init();
initMocks();
}
@After
public void after() {
dbExecutor.destroy();
}
private void initMocks() {
Mockito.reset(ctx);
Mockito.reset(attributesService);
Mockito.reset(tsService);
lenient().when(ctx.getAttributesService()).thenReturn(attributesService);
lenient().when(ctx.getTimeseriesService()).thenReturn(tsService);
lenient().when(ctx.getTenantId()).thenReturn(tenantId);
lenient().when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor);
}
private TbMathNode initNode(TbRuleNodeMathFunctionType operation, TbMathResult result, TbMathArgument... arguments) {
return initNode(operation, null, result, arguments);
}
private TbMathNode initNodeWithCustomFunction(String expression, TbMathResult result, TbMathArgument... arguments) {
return initNode(TbRuleNodeMathFunctionType.CUSTOM, expression, result, arguments);
}
private TbMathNode initNode(TbRuleNodeMathFunctionType operation, String expression, TbMathResult result, TbMathArgument... arguments) {
try {
TbMathNodeConfiguration configuration = new TbMathNodeConfiguration();
configuration.setOperation(operation);
if (TbRuleNodeMathFunctionType.CUSTOM.equals(operation)) {
configuration.setCustomFunction(expression);
}
configuration.setResult(result);
configuration.setArguments(Arrays.asList(arguments));
TbMathNode node = new TbMathNode();
node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(configuration)));
return node;
} catch (TbNodeException ex) {
throw new IllegalStateException(ex);
}
}
@Test
public void testExp4j() {
var node = initNodeWithCustomFunction("2a+3b",
new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString());
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
var resultJson = JacksonUtil.toJsonNode(resultMsg.getData());
Assert.assertTrue(resultJson.has("result"));
Assert.assertEquals(10, resultJson.get("result").asInt());
}
@Test
public void testSimpleFunctions() {
testSimpleTwoArgumentFunction(TbRuleNodeMathFunctionType.ADD, 2.1, 2.2, 4.3);
testSimpleTwoArgumentFunction(TbRuleNodeMathFunctionType.SUB, 2.1, 2.2, -0.1);
testSimpleTwoArgumentFunction(TbRuleNodeMathFunctionType.MULT, 2.1, 2.0, 4.2);
testSimpleTwoArgumentFunction(TbRuleNodeMathFunctionType.DIV, 4.2, 2.0, 2.1);
testSimpleOneArgumentFunction(TbRuleNodeMathFunctionType.SIN, Math.toRadians(30), 0.5);
testSimpleOneArgumentFunction(TbRuleNodeMathFunctionType.SIN, Math.toRadians(90), 1.0);
testSimpleOneArgumentFunction(TbRuleNodeMathFunctionType.SINH, Math.toRadians(0), 0.0);
testSimpleOneArgumentFunction(TbRuleNodeMathFunctionType.COSH, Math.toRadians(0), 1.0);
testSimpleOneArgumentFunction(TbRuleNodeMathFunctionType.COS, Math.toRadians(60), 0.5);
testSimpleOneArgumentFunction(TbRuleNodeMathFunctionType.COS, Math.toRadians(0), 1.0);
testSimpleOneArgumentFunction(TbRuleNodeMathFunctionType.TAN, Math.toRadians(45), 1);
testSimpleOneArgumentFunction(TbRuleNodeMathFunctionType.TAN, Math.toRadians(0), 0);
testSimpleOneArgumentFunction(TbRuleNodeMathFunctionType.ABS, -1, 1);
testSimpleOneArgumentFunction(TbRuleNodeMathFunctionType.SQRT, 4, 2);
testSimpleOneArgumentFunction(TbRuleNodeMathFunctionType.CBRT, 8, 2);
}
private void testSimpleTwoArgumentFunction(TbRuleNodeMathFunctionType function, double arg1, double arg2, double result) {
initMocks();
var node = initNode(function,
new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", arg1).put("b", arg2).toString());
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000).times(1)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
var resultJson = JacksonUtil.toJsonNode(resultMsg.getData());
Assert.assertTrue(resultJson.has("result"));
Assert.assertEquals(result, resultJson.get("result").asDouble(), 0d);
}
private void testSimpleOneArgumentFunction(TbRuleNodeMathFunctionType function, double arg1, double result) {
initMocks();
var node = initNode(function,
new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", arg1).toString());
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
var resultJson = JacksonUtil.toJsonNode(resultMsg.getData());
Assert.assertTrue(resultJson.has("result"));
Assert.assertEquals(result, resultJson.get("result").asDouble(), 0d);
}
@Test
public void test_2_plus_2_body() {
var node = initNode(TbRuleNodeMathFunctionType.ADD,
new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString());
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
var resultJson = JacksonUtil.toJsonNode(resultMsg.getData());
Assert.assertTrue(resultJson.has("result"));
Assert.assertEquals(4, resultJson.get("result").asInt());
}
@Test
public void test_2_plus_2_meta() {
var node = initNode(TbRuleNodeMathFunctionType.ADD,
new TbMathResult(TbMathArgumentType.MESSAGE_METADATA, "result", 0, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString());
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
Assert.assertNotNull(resultMsg.getMetaData());
var result = resultMsg.getMetaData().getValue("result");
Assert.assertNotNull(result);
Assert.assertEquals("4", result);
}
@Test
public void test_2_plus_2_attr_and_ts() {
var node = initNode(TbRuleNodeMathFunctionType.ADD,
new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null),
new TbMathArgument(TbMathArgumentType.ATTRIBUTE, "a"),
new TbMathArgument(TbMathArgumentType.TIME_SERIES, "b")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().toString());
Mockito.when(attributesService.find(tenantId, originator, DataConstants.SERVER_SCOPE, "a"))
.thenReturn(Futures.immediateFuture(Optional.of(new BaseAttributeKvEntry(System.currentTimeMillis(), new DoubleDataEntry("a", 2.0)))));
Mockito.when(tsService.findLatest(tenantId, originator, "b"))
.thenReturn(Futures.immediateFuture(Optional.of(new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry("b", 2L)))));
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
var resultJson = JacksonUtil.toJsonNode(resultMsg.getData());
Assert.assertTrue(resultJson.has("result"));
Assert.assertEquals(4, resultJson.get("result").asInt());
}
@Test
public void test_sqrt_5_body() {
var node = initNode(TbRuleNodeMathFunctionType.SQRT,
new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 3, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 5).toString());
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
var resultJson = JacksonUtil.toJsonNode(resultMsg.getData());
Assert.assertTrue(resultJson.has("result"));
Assert.assertEquals(2.236, resultJson.get("result").asDouble(), 0.0);
}
@Test
public void test_sqrt_5_meta() {
var node = initNode(TbRuleNodeMathFunctionType.SQRT,
new TbMathResult(TbMathArgumentType.MESSAGE_METADATA, "result", 3, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 5).toString());
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
var result = resultMsg.getMetaData().getValue("result");
Assert.assertNotNull(result);
Assert.assertEquals("2.236", result);
}
}

1
ui-ngx/src/app/shared/models/constants.ts

@ -127,6 +127,7 @@ export const HelpLinks = {
ruleNodeRestApiCall: helpBaseUrl + '/docs/user-guide/rule-engine-2-0/external-nodes/#rest-api-call-node',
ruleNodeSendEmail: helpBaseUrl + '/docs/user-guide/rule-engine-2-0/external-nodes/#send-email-node',
ruleNodeSendSms: helpBaseUrl + '/docs/user-guide/rule-engine-2-0/external-nodes/#send-sms-node',
ruleNodeMath: helpBaseUrl + '/docs/user-guide/rule-engine-2-0/action-nodes/#math-function-node',
tenants: helpBaseUrl + '/docs/user-guide/ui/tenants',
tenantProfiles: helpBaseUrl + '/docs/user-guide/ui/tenant-profiles',
customers: helpBaseUrl + '/docs/user-guide/ui/customers',

1
ui-ngx/src/app/shared/models/rule-node.models.ts

@ -461,6 +461,7 @@ const ruleNodeClazzHelpLinkMap = {
'org.thingsboard.rule.engine.edge.TbMsgPushToEdgeNode': 'ruleNodePushToEdge',
'org.thingsboard.rule.engine.flow.TbRuleChainInputNode': 'ruleNodeRuleChain',
'org.thingsboard.rule.engine.flow.TbRuleChainOutputNode': 'ruleNodeOutputNode',
'org.thingsboard.rule.engine.math.TbMathNode': 'ruleNodeMath',
};
export function getRuleNodeHelpLink(component: RuleNodeComponentDescriptor): string {

Loading…
Cancel
Save