diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java index c3444d43f7..751bde6303 100644 --- a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java +++ b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java @@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; import springfox.documentation.swagger2.annotations.EnableSwagger2; import java.util.Arrays; @@ -26,6 +27,7 @@ import java.util.Arrays; @SpringBootConfiguration @EnableAsync @EnableSwagger2 +@EnableScheduling @ComponentScan({"org.thingsboard.server"}) public class ThingsboardServerApplication { diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java index 09aaf80dee..f6bf54d349 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java @@ -28,9 +28,14 @@ import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.PluginMetaData; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse; +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; +import org.thingsboard.server.common.msg.session.MsgType; import org.thingsboard.server.extensions.api.plugins.Plugin; import org.thingsboard.server.extensions.api.plugins.PluginInitializationException; import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg; import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg; import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg; @@ -98,7 +103,20 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor public void onRuleToPluginMsg(RuleToPluginMsgWrapper msg) throws RuleException { if (state == ComponentLifecycleState.ACTIVE) { - pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg()); + try { + pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg()); + } catch (Exception ex) { + logger.debug("[{}] Failed to process RuleToPlugin msg: [{}] [{}]", tenantId, msg.getMsg(), ex); + RuleToPluginMsg ruleMsg = msg.getMsg(); + MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR; + Integer requestId = 0; + if (ruleMsg.getPayload() instanceof FromDeviceRequestMsg) { + requestId = ((FromDeviceRequestMsg) ruleMsg.getPayload()).getRequestId(); + } + trustedCtx.reply( + new ResponsePluginToRuleMsg(ruleMsg.getUid(), tenantId, msg.getRuleId(), + BasicStatusCodeResponse.onError(responceMsgType, requestId, ex))); + } } else { //TODO: reply with plugin suspended message } diff --git a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java index 52d7d7d80c..479f424e93 100644 --- a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java +++ b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java @@ -180,7 +180,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe return scannedComponent; } - private NodeDefinition prepareNodeDefinition(RuleNode nodeAnnotation) throws IOException { + private NodeDefinition prepareNodeDefinition(RuleNode nodeAnnotation) throws Exception { NodeDefinition nodeDefinition = new NodeDefinition(); nodeDefinition.setDetails(nodeAnnotation.nodeDetails()); nodeDefinition.setDescription(nodeAnnotation.nodeDescription()); @@ -188,9 +188,10 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe nodeDefinition.setOutEnabled(nodeAnnotation.outEnabled()); nodeDefinition.setRelationTypes(nodeAnnotation.relationTypes()); nodeDefinition.setCustomRelations(nodeAnnotation.customRelations()); - String defaultConfigResourceName = nodeAnnotation.defaultConfigResource(); - nodeDefinition.setDefaultConfiguration(mapper.readTree( - Resources.toString(Resources.getResource(defaultConfigResourceName), Charsets.UTF_8))); + Class configClazz = nodeAnnotation.configClazz(); + NodeConfiguration config = configClazz.newInstance(); + NodeConfiguration defaultConfiguration = config.defaultConfiguration(); + nodeDefinition.setDefaultConfiguration(mapper.valueToTree(defaultConfiguration)); return nodeDefinition; } diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java index 1ef805f3c9..908faf2315 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java @@ -187,7 +187,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { @Override public void loadSystemRules() throws Exception { - loadRules(Paths.get(dataDir, JSON_DIR, SYSTEM_DIR, RULES_DIR), null); +// loadRules(Paths.get(dataDir, JSON_DIR, SYSTEM_DIR, RULES_DIR), null); } @Override @@ -228,7 +228,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { "Raspberry Pi GPIO control sample application"); loadPlugins(Paths.get(dataDir, JSON_DIR, DEMO_DIR, PLUGINS_DIR), demoTenant.getId()); - loadRules(Paths.get(dataDir, JSON_DIR, DEMO_DIR, RULES_DIR), demoTenant.getId()); +// loadRules(Paths.get(dataDir, JSON_DIR, DEMO_DIR, RULES_DIR), demoTenant.getId()); loadDashboards(Paths.get(dataDir, JSON_DIR, DEMO_DIR, DASHBOARDS_DIR), demoTenant.getId(), null); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 27585212db..fc04a193e1 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -181,6 +181,10 @@ cassandra: default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" + buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" + concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}" + permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}" + rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}" queue: msg.ttl: 604800 # 7 days diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcRequestMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcRequestMsg.java index ace51c03a7..87708a759f 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcRequestMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcRequestMsg.java @@ -16,16 +16,16 @@ package org.thingsboard.server.common.msg.core; import lombok.Data; -import org.thingsboard.server.common.msg.session.FromDeviceMsg; +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; import org.thingsboard.server.common.msg.session.MsgType; /** * @author Andrew Shvayka */ @Data -public class ToServerRpcRequestMsg implements FromDeviceMsg { +public class ToServerRpcRequestMsg implements FromDeviceRequestMsg { - private final int requestId; + private final Integer requestId; private final String method; private final String params; diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java b/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java index 4f923fe808..64ec718fdd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java @@ -148,7 +148,7 @@ public class CassandraAssetDao extends CassandraAbstractSearchTextDao>() { @Nullable @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java index 932d6b9147..8ae9dc8ad4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java @@ -147,12 +147,12 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType)) .and(eq(ATTRIBUTE_KEY_COLUMN, key)); log.debug("Remove request: {}", delete.toString()); - return getFuture(getSession().executeAsync(delete), rs -> null); + return getFuture(executeAsyncWrite(delete), rs -> null); } private PreparedStatement getSaveStmt() { if (saveStmt == null) { - saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF + + saveStmt = prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF + "(" + ENTITY_TYPE_COLUMN + "," + ENTITY_ID_COLUMN + "," + ATTRIBUTE_TYPE_COLUMN + diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java index 27f7adc669..fd02b5f880 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java @@ -244,12 +244,12 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao>() { @Nullable @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java index c2f709f05b..ba186ccebe 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.server.dao.cassandra.CassandraCluster; import org.thingsboard.server.dao.model.type.*; +import org.thingsboard.server.dao.util.BufferedRateLimiter; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -33,16 +34,15 @@ public abstract class CassandraAbstractDao { private ConcurrentMap preparedStatementMap = new ConcurrentHashMap<>(); - protected PreparedStatement prepare(String query) { - return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i)); - } + @Autowired + private BufferedRateLimiter rateLimiter; private Session session; private ConsistencyLevel defaultReadLevel; private ConsistencyLevel defaultWriteLevel; - protected Session getSession() { + private Session getSession() { if (session == null) { session = cluster.getSession(); defaultReadLevel = cluster.getDefaultReadConsistencyLevel(); @@ -59,6 +59,10 @@ public abstract class CassandraAbstractDao { return session; } + protected PreparedStatement prepare(String query) { + return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i)); + } + private void registerCodecIfNotFound(CodecRegistry registry, TypeCodec codec) { try { registry.codecFor(codec.getCqlType(), codec.getJavaType()); @@ -85,10 +89,7 @@ public abstract class CassandraAbstractDao { private ResultSet execute(Statement statement, ConsistencyLevel level) { log.debug("Execute cassandra statement {}", statement); - if (statement.getConsistencyLevel() == null) { - statement.setConsistencyLevel(level); - } - return getSession().execute(statement); + return executeAsync(statement, level).getUninterruptibly(); } private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) { @@ -96,6 +97,6 @@ public abstract class CassandraAbstractDao { if (statement.getConsistencyLevel() == null) { statement.setConsistencyLevel(level); } - return getSession().executeAsync(statement); + return new RateLimitedResultSetFuture(getSession(), rateLimiter, statement); } } \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java index 7e87fa8fec..47d43ba57f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java @@ -63,7 +63,7 @@ public abstract class CassandraAbstractModelDao, D> exte List list = Collections.emptyList(); if (statement != null) { statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); - ResultSet resultSet = getSession().execute(statement); + ResultSet resultSet = executeRead(statement); Result result = getMapper().map(resultSet); if (result != null) { list = result.all(); @@ -75,7 +75,7 @@ public abstract class CassandraAbstractModelDao, D> exte protected ListenableFuture> findListByStatementAsync(Statement statement) { if (statement != null) { statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); - ResultSetFuture resultSetFuture = getSession().executeAsync(statement); + ResultSetFuture resultSetFuture = executeAsyncRead(statement); return Futures.transform(resultSetFuture, new Function>() { @Nullable @Override @@ -97,7 +97,7 @@ public abstract class CassandraAbstractModelDao, D> exte E object = null; if (statement != null) { statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); - ResultSet resultSet = getSession().execute(statement); + ResultSet resultSet = executeRead(statement); Result result = getMapper().map(resultSet); if (result != null) { object = result.one(); @@ -109,7 +109,7 @@ public abstract class CassandraAbstractModelDao, D> exte protected ListenableFuture findOneByStatementAsync(Statement statement) { if (statement != null) { statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); - ResultSetFuture resultSetFuture = getSession().executeAsync(statement); + ResultSetFuture resultSetFuture = executeAsyncRead(statement); return Futures.transform(resultSetFuture, new Function() { @Nullable @Override @@ -184,7 +184,7 @@ public abstract class CassandraAbstractModelDao, D> exte public boolean removeById(UUID key) { Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key)); log.debug("Remove request: {}", delete.toString()); - return getSession().execute(delete).wasApplied(); + return executeWrite(delete).wasApplied(); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java new file mode 100644 index 0000000000..2674c6ddea --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java @@ -0,0 +1,148 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.google.common.base.Function; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Uninterruptibles; +import org.thingsboard.server.dao.util.AsyncRateLimiter; + +import javax.annotation.Nullable; +import java.util.concurrent.*; + +public class RateLimitedResultSetFuture implements ResultSetFuture { + + private final ListenableFuture originalFuture; + private final ListenableFuture rateLimitFuture; + + public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) { + this.rateLimitFuture = rateLimiter.acquireAsync(); + this.originalFuture = Futures.transform(rateLimitFuture, + (Function) i -> executeAsyncWithRelease(rateLimiter, session, statement)); + } + + @Override + public ResultSet getUninterruptibly() { + return safeGet().getUninterruptibly(); + } + + @Override + public ResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException { + long rateLimitStart = System.nanoTime(); + ResultSetFuture resultSetFuture = null; + try { + resultSetFuture = originalFuture.get(timeout, unit); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + long rateLimitDurationNano = System.nanoTime() - rateLimitStart; + long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano; + if (innerTimeoutNano > 0) { + return resultSetFuture.getUninterruptibly(innerTimeoutNano, TimeUnit.NANOSECONDS); + } + throw new TimeoutException("Timeout waiting for task."); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (originalFuture.isDone()) { + return safeGet().cancel(mayInterruptIfRunning); + } else { + return originalFuture.cancel(mayInterruptIfRunning); + } + } + + @Override + public boolean isCancelled() { + if (originalFuture.isDone()) { + return safeGet().isCancelled(); + } + + return originalFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return originalFuture.isDone() && safeGet().isDone(); + } + + @Override + public ResultSet get() throws InterruptedException, ExecutionException { + return safeGet().get(); + } + + @Override + public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long rateLimitStart = System.nanoTime(); + ResultSetFuture resultSetFuture = originalFuture.get(timeout, unit); + long rateLimitDurationNano = System.nanoTime() - rateLimitStart; + long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano; + if (innerTimeoutNano > 0) { + return resultSetFuture.get(innerTimeoutNano, TimeUnit.NANOSECONDS); + } + throw new TimeoutException("Timeout waiting for task."); + } + + @Override + public void addListener(Runnable listener, Executor executor) { + originalFuture.addListener(() -> { + try { + ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture); + resultSetFuture.addListener(listener, executor); + } catch (CancellationException e) { + cancel(false); + return; + } catch (ExecutionException e) { + Futures.immediateFailedFuture(e).addListener(listener, executor); + } + }, executor); + } + + private ResultSetFuture safeGet() { + try { + return originalFuture.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } + + private ResultSetFuture executeAsyncWithRelease(AsyncRateLimiter rateLimiter, Session session, Statement statement) { + try { + ResultSetFuture resultSetFuture = session.executeAsync(statement); + Futures.addCallback(resultSetFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable ResultSet result) { + rateLimiter.release(); + } + + @Override + public void onFailure(Throwable t) { + rateLimiter.release(); + } + }); + return resultSetFuture; + } catch (RuntimeException re) { + rateLimiter.release(); + throw re; + } + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java index 9e252412b2..55838d6462 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java @@ -242,7 +242,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getSaveStmt() { if (saveStmt == null) { - saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + + saveStmt = prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + "(" + ModelConstants.RELATION_FROM_ID_PROPERTY + "," + ModelConstants.RELATION_FROM_TYPE_PROPERTY + "," + ModelConstants.RELATION_TO_ID_PROPERTY + @@ -257,7 +257,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getDeleteStmt() { if (deleteStmt == null) { - deleteStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + + deleteStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?" + AND + ModelConstants.RELATION_TO_ID_PROPERTY + " = ?" + @@ -270,7 +270,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getDeleteAllByEntityStmt() { if (deleteAllByEntityStmt == null) { - deleteAllByEntityStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + + deleteAllByEntityStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?"); } @@ -279,7 +279,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getFindAllByFromStmt() { if (findAllByFromStmt == null) { - findAllByFromStmt = getSession().prepare(SELECT_COLUMNS + " " + + findAllByFromStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + @@ -290,7 +290,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getFindAllByFromAndTypeStmt() { if (findAllByFromAndTypeStmt == null) { - findAllByFromAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " + + findAllByFromAndTypeStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + @@ -303,7 +303,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getFindAllByToStmt() { if (findAllByToStmt == null) { - findAllByToStmt = getSession().prepare(SELECT_COLUMNS + " " + + findAllByToStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " + WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM + @@ -314,7 +314,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getFindAllByToAndTypeStmt() { if (findAllByToAndTypeStmt == null) { - findAllByToAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " + + findAllByToAndTypeStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " + WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM + @@ -327,7 +327,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getCheckRelationStmt() { if (checkRelationStmt == null) { - checkRelationStmt = getSession().prepare(SELECT_COLUMNS + " " + + checkRelationStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index 01f60f8dff..370d770188 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -82,8 +82,9 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}") }) @@ -95,8 +96,9 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}") }) @@ -108,11 +110,11 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}") + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}") }) @Override public boolean deleteRelation(EntityRelation relation) { @@ -122,11 +124,11 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.from"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}") + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}") }) @Override public ListenableFuture deleteRelationAsync(EntityRelation relation) { @@ -136,11 +138,11 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}") + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}") }) @Override public boolean deleteRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { @@ -150,11 +152,11 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}") + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}") }) @Override public ListenableFuture deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index d620e11f1d..cda4b1669b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -73,7 +73,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement partitionInsertStmt; private PreparedStatement partitionInsertTtlStmt; - private PreparedStatement[] latestInsertStmts; + private PreparedStatement latestInsertStmt; private PreparedStatement[] saveStmts; private PreparedStatement[] saveTtlStmts; private PreparedStatement[] fetchStmts; @@ -306,13 +306,15 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @Override public ListenableFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry) { - DataType type = tsKvEntry.getDataType(); - BoundStatement stmt = getLatestStmt(type).bind() + BoundStatement stmt = getLatestStmt().bind() .setString(0, entityId.getEntityType().name()) .setUUID(1, entityId.getId()) .setString(2, tsKvEntry.getKey()) - .setLong(3, tsKvEntry.getTs()); - addValue(tsKvEntry, stmt, 4); + .setLong(3, tsKvEntry.getTs()) + .set(4, tsKvEntry.getBooleanValue().orElse(null), Boolean.class) + .set(5, tsKvEntry.getStrValue().orElse(null), String.class) + .set(6, tsKvEntry.getLongValue().orElse(null), Long.class) + .set(7, tsKvEntry.getDoubleValue().orElse(null), Double.class); return getFuture(executeAsyncWrite(stmt), rs -> null); } @@ -381,7 +383,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem if (saveStmts == null) { saveStmts = new PreparedStatement[DataType.values().length]; for (DataType type : DataType.values()) { - saveStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF + + saveStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + "," + ModelConstants.KEY_COLUMN + @@ -398,7 +400,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem if (saveTtlStmts == null) { saveTtlStmts = new PreparedStatement[DataType.values().length]; for (DataType type : DataType.values()) { - saveTtlStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF + + saveTtlStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + "," + ModelConstants.KEY_COLUMN + @@ -420,7 +422,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) { fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()]; } else { - fetchStmts[type.ordinal()] = getSession().prepare(SELECT_PREFIX + + fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX + String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM @@ -435,26 +437,26 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem return fetchStmts[aggType.ordinal()]; } - private PreparedStatement getLatestStmt(DataType dataType) { - if (latestInsertStmts == null) { - latestInsertStmts = new PreparedStatement[DataType.values().length]; - for (DataType type : DataType.values()) { - latestInsertStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF + - "(" + ModelConstants.ENTITY_TYPE_COLUMN + - "," + ModelConstants.ENTITY_ID_COLUMN + - "," + ModelConstants.KEY_COLUMN + - "," + ModelConstants.TS_COLUMN + - "," + getColumnName(type) + ")" + - " VALUES(?, ?, ?, ?, ?)"); - } + private PreparedStatement getLatestStmt() { + if (latestInsertStmt == null) { + latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF + + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.KEY_COLUMN + + "," + ModelConstants.TS_COLUMN + + "," + ModelConstants.BOOLEAN_VALUE_COLUMN + + "," + ModelConstants.STRING_VALUE_COLUMN + + "," + ModelConstants.LONG_VALUE_COLUMN + + "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" + + " VALUES(?, ?, ?, ?, ?, ?, ?, ?)"); } - return latestInsertStmts[dataType.ordinal()]; + return latestInsertStmt; } private PreparedStatement getPartitionInsertStmt() { if (partitionInsertStmt == null) { - partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + + partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + "," + ModelConstants.PARTITION_COLUMN + @@ -466,7 +468,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement getPartitionInsertTtlStmt() { if (partitionInsertTtlStmt == null) { - partitionInsertTtlStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + + partitionInsertTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + "," + ModelConstants.PARTITION_COLUMN + @@ -479,7 +481,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement getFindLatestStmt() { if (findLatestStmt == null) { - findLatestStmt = getSession().prepare(SELECT_PREFIX + + findLatestStmt = prepare(SELECT_PREFIX + ModelConstants.KEY_COLUMN + "," + ModelConstants.TS_COLUMN + "," + ModelConstants.STRING_VALUE_COLUMN + "," + @@ -496,7 +498,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement getFindAllLatestStmt() { if (findAllLatestStmt == null) { - findAllLatestStmt = getSession().prepare(SELECT_PREFIX + + findAllLatestStmt = prepare(SELECT_PREFIX + ModelConstants.KEY_COLUMN + "," + ModelConstants.TS_COLUMN + "," + ModelConstants.STRING_VALUE_COLUMN + "," + diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java new file mode 100644 index 0000000000..6fb21d6adb --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import com.google.common.util.concurrent.ListenableFuture; + +public interface AsyncRateLimiter { + + ListenableFuture acquireAsync(); + + void release(); +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java new file mode 100644 index 0000000000..2acd623a37 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java @@ -0,0 +1,164 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +@Component +@Slf4j +@NoSqlDao +public class BufferedRateLimiter implements AsyncRateLimiter { + + private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); + + private final int permitsLimit; + private final int maxPermitWaitTime; + private final AtomicInteger permits; + private final BlockingQueue queue; + + private final AtomicInteger maxQueueSize = new AtomicInteger(); + private final AtomicInteger maxGrantedPermissions = new AtomicInteger(); + + public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit, + @Value("${cassandra.query.concurrent_limit}") int permitsLimit, + @Value("${cassandra.query.permit_max_wait_time}") int maxPermitWaitTime) { + this.permitsLimit = permitsLimit; + this.maxPermitWaitTime = maxPermitWaitTime; + this.permits = new AtomicInteger(); + this.queue = new LinkedBlockingQueue<>(queueLimit); + } + + @Override + public ListenableFuture acquireAsync() { + if (queue.isEmpty()) { + if (permits.incrementAndGet() <= permitsLimit) { + if (permits.get() > maxGrantedPermissions.get()) { + maxGrantedPermissions.set(permits.get()); + } + return Futures.immediateFuture(null); + } + permits.decrementAndGet(); + } + + return putInQueue(); + } + + @Override + public void release() { + permits.decrementAndGet(); + reprocessQueue(); + } + + private void reprocessQueue() { + while (permits.get() < permitsLimit) { + if (permits.incrementAndGet() <= permitsLimit) { + if (permits.get() > maxGrantedPermissions.get()) { + maxGrantedPermissions.set(permits.get()); + } + LockedFuture lockedFuture = queue.poll(); + if (lockedFuture != null) { + lockedFuture.latch.countDown(); + } else { + permits.decrementAndGet(); + break; + } + } else { + permits.decrementAndGet(); + } + } + } + + private LockedFuture createLockedFuture() { + CountDownLatch latch = new CountDownLatch(1); + ListenableFuture future = pool.submit(() -> { + latch.await(); + return null; + }); + return new LockedFuture(latch, future, System.currentTimeMillis()); + } + + private ListenableFuture putInQueue() { + + int size = queue.size(); + if (size > maxQueueSize.get()) { + maxQueueSize.set(size); + } + + if (queue.remainingCapacity() > 0) { + try { + LockedFuture lockedFuture = createLockedFuture(); + if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) { + lockedFuture.cancelFuture(); + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); + } + if(permits.get() < permitsLimit) { + reprocessQueue(); + } + return lockedFuture.future; + } catch (InterruptedException e) { + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject")); + } + } + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); + } + + @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") + public void printStats() { + int expiredCount = 0; + for (LockedFuture lockedFuture : queue) { + if (lockedFuture.isExpired()) { + lockedFuture.cancelFuture(); + expiredCount++; + } + } + log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0), + maxGrantedPermissions.getAndSet(0), expiredCount, permits.get()); + } + + private class LockedFuture { + final CountDownLatch latch; + final ListenableFuture future; + final long createTime; + + public LockedFuture(CountDownLatch latch, ListenableFuture future, long createTime) { + this.latch = latch; + this.future = future; + this.createTime = createTime; + } + + void cancelFuture() { + future.cancel(false); + latch.countDown(); + } + + boolean isExpired() { + return (System.currentTimeMillis() - createTime) > maxPermitWaitTime; + } + + } + + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java new file mode 100644 index 0000000000..fa62c2b9b0 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java @@ -0,0 +1,156 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql; + +import com.datastax.driver.core.*; +import com.datastax.driver.core.exceptions.UnsupportedFeatureException; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.thingsboard.server.dao.util.AsyncRateLimiter; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class RateLimitedResultSetFutureTest { + + private RateLimitedResultSetFuture resultSetFuture; + + @Mock + private AsyncRateLimiter rateLimiter; + @Mock + private Session session; + @Mock + private Statement statement; + @Mock + private ResultSetFuture realFuture; + @Mock + private ResultSet rows; + @Mock + private Row row; + + @Test + public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException())); + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + Thread.sleep(1000L); + verify(rateLimiter).acquireAsync(); + try { + assertTrue(resultSetFuture.isDone()); + fail(); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + Throwable actualCause = e.getCause(); + assertTrue(actualCause instanceof ExecutionException); + } + verifyNoMoreInteractions(session, rateLimiter, statement); + + } + + @Test + public void getUninterruptiblyDelegateToCassandra() throws InterruptedException, ExecutionException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null)); + when(session.executeAsync(statement)).thenReturn(realFuture); + Mockito.doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + Runnable task = (Runnable) args[0]; + task.run(); + return null; + }).when(realFuture).addListener(Mockito.any(), Mockito.any()); + + when(realFuture.getUninterruptibly()).thenReturn(rows); + + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + ResultSet actual = resultSetFuture.getUninterruptibly(); + assertSame(rows, actual); + verify(rateLimiter, times(1)).acquireAsync(); + verify(rateLimiter, times(1)).release(); + } + + @Test + public void addListenerAllowsFutureTransformation() throws InterruptedException, ExecutionException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null)); + when(session.executeAsync(statement)).thenReturn(realFuture); + Mockito.doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + Runnable task = (Runnable) args[0]; + task.run(); + return null; + }).when(realFuture).addListener(Mockito.any(), Mockito.any()); + + when(realFuture.get()).thenReturn(rows); + when(rows.one()).thenReturn(row); + + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); + Row actualRow = transform.get(); + + assertSame(row, actualRow); + verify(rateLimiter, times(1)).acquireAsync(); + verify(rateLimiter, times(1)).release(); + } + + @Test + public void immidiateCassandraExceptionReturnsPermit() throws InterruptedException, ExecutionException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null)); + when(session.executeAsync(statement)).thenThrow(new UnsupportedFeatureException(ProtocolVersion.V3, "hjg")); + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); + try { + transform.get(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof ExecutionException); + } + verify(rateLimiter, times(1)).acquireAsync(); + verify(rateLimiter, times(1)).release(); + } + + @Test + public void queryTimeoutReturnsPermit() throws InterruptedException, ExecutionException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null)); + when(session.executeAsync(statement)).thenReturn(realFuture); + Mockito.doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + Runnable task = (Runnable) args[0]; + task.run(); + return null; + }).when(realFuture).addListener(Mockito.any(), Mockito.any()); + + when(realFuture.get()).thenThrow(new ExecutionException("Fail", new TimeoutException("timeout"))); + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); + try { + transform.get(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof ExecutionException); + } + verify(rateLimiter, times(1)).acquireAsync(); + verify(rateLimiter, times(1)).release(); + } + +} \ No newline at end of file diff --git a/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java new file mode 100644 index 0000000000..5bfc3b6e95 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java @@ -0,0 +1,134 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import com.google.common.util.concurrent.*; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + + +public class BufferedRateLimiterTest { + + @Test + public void finishedFutureReturnedIfPermitsAreGranted() { + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 10, 100); + ListenableFuture actual = limiter.acquireAsync(); + assertTrue(actual.isDone()); + } + + @Test + public void notFinishedFutureReturnedIfPermitsAreNotGranted() { + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 1, 100); + ListenableFuture actual1 = limiter.acquireAsync(); + ListenableFuture actual2 = limiter.acquireAsync(); + assertTrue(actual1.isDone()); + assertFalse(actual2.isDone()); + } + + @Test + public void failedFutureReturnedIfQueueIsfull() { + BufferedRateLimiter limiter = new BufferedRateLimiter(1, 1, 100); + ListenableFuture actual1 = limiter.acquireAsync(); + ListenableFuture actual2 = limiter.acquireAsync(); + ListenableFuture actual3 = limiter.acquireAsync(); + + assertTrue(actual1.isDone()); + assertFalse(actual2.isDone()); + assertTrue(actual3.isDone()); + try { + actual3.get(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof ExecutionException); + Throwable actualCause = e.getCause(); + assertTrue(actualCause instanceof IllegalStateException); + assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage()); + } + } + + @Test + public void releasedPermitTriggerTasksFromQueue() throws InterruptedException { + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100); + ListenableFuture actual1 = limiter.acquireAsync(); + ListenableFuture actual2 = limiter.acquireAsync(); + ListenableFuture actual3 = limiter.acquireAsync(); + ListenableFuture actual4 = limiter.acquireAsync(); + assertTrue(actual1.isDone()); + assertTrue(actual2.isDone()); + assertFalse(actual3.isDone()); + assertFalse(actual4.isDone()); + limiter.release(); + TimeUnit.MILLISECONDS.sleep(100L); + assertTrue(actual3.isDone()); + assertFalse(actual4.isDone()); + limiter.release(); + TimeUnit.MILLISECONDS.sleep(100L); + assertTrue(actual4.isDone()); + } + + @Test + public void permitsReleasedInConcurrentMode() throws InterruptedException { + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100); + AtomicInteger actualReleased = new AtomicInteger(); + AtomicInteger actualRejected = new AtomicInteger(); + ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); + for (int i = 0; i < 100; i++) { + ListenableFuture> submit = pool.submit(limiter::acquireAsync); + Futures.addCallback(submit, new FutureCallback>() { + @Override + public void onSuccess(@Nullable ListenableFuture result) { + Futures.addCallback(result, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void result) { + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + limiter.release(); + actualReleased.incrementAndGet(); + } + + @Override + public void onFailure(Throwable t) { + actualRejected.incrementAndGet(); + } + }); + } + + @Override + public void onFailure(Throwable t) { + } + }); + } + + TimeUnit.SECONDS.sleep(2); + assertTrue("Unexpected released count " + actualReleased.get(), + actualReleased.get() > 10 && actualReleased.get() < 20); + assertTrue("Unexpected rejected count " + actualRejected.get(), + actualRejected.get() > 80 && actualRejected.get() < 90); + + } + + +} \ No newline at end of file diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties index 82fcbe1949..737687f053 100644 --- a/dao/src/test/resources/cassandra-test.properties +++ b/dao/src/test/resources/cassandra-test.properties @@ -47,3 +47,8 @@ cassandra.query.default_fetch_size=2000 cassandra.query.ts_key_value_partitioning=HOURS cassandra.query.max_limit_per_request=1000 +cassandra.query.buffer_size=100000 +cassandra.query.concurrent_limit=1000 +cassandra.query.permit_max_wait_time=20000 +cassandra.query.rate_limit_print_interval_ms=30000 + diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/NodeConfiguration.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/NodeConfiguration.java new file mode 100644 index 0000000000..5e4c4b5399 --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/NodeConfiguration.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +public interface NodeConfiguration { + + NodeConfiguration defaultConfiguration(); + +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java index f8e0fa2e29..16170347a7 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java @@ -35,15 +35,16 @@ public @interface RuleNode { String nodeDetails(); + Class configClazz(); + boolean inEnabled() default true; boolean outEnabled() default true; ComponentScope scope() default ComponentScope.TENANT; - String defaultConfigResource() default "EmptyNodeConfig.json"; - String[] relationTypes() default {"Success", "Failure"}; boolean customRelations() default false; + } diff --git a/rule-engine/rule-engine-api/src/main/resources/EmptyNodeConfig.json b/rule-engine/rule-engine-api/src/main/resources/EmptyNodeConfig.json deleted file mode 100644 index 7a73a41bfd..0000000000 --- a/rule-engine/rule-engine-api/src/main/resources/EmptyNodeConfig.json +++ /dev/null @@ -1,2 +0,0 @@ -{ -} \ No newline at end of file diff --git a/rule-engine/rule-engine-api/src/main/resources/EmptyNodeDescriptor.json b/rule-engine/rule-engine-api/src/main/resources/EmptyNodeDescriptor.json deleted file mode 100644 index 7a73a41bfd..0000000000 --- a/rule-engine/rule-engine-api/src/main/resources/EmptyNodeDescriptor.json +++ /dev/null @@ -1,2 +0,0 @@ -{ -} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java index c85d4807e0..07b166db68 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java @@ -30,6 +30,7 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback; @RuleNode( type = ComponentType.FILTER, name = "script", relationTypes = {"True", "False", "Failure"}, + configClazz = TbJsFilterNodeConfiguration.class, nodeDescription = "Filter incoming messages using JS script", nodeDetails = "Evaluate incoming Message with configured JS condition. " + "If True - send Message via True chain, otherwise False chain is used." + diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java index bf543e3276..3b19c7c641 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java @@ -16,9 +16,17 @@ package org.thingsboard.rule.engine.filter; import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; @Data -public class TbJsFilterNodeConfiguration { +public class TbJsFilterNodeConfiguration implements NodeConfiguration { private String jsScript; + + @Override + public TbJsFilterNodeConfiguration defaultConfiguration() { + TbJsFilterNodeConfiguration configuration = new TbJsFilterNodeConfiguration(); + configuration.setJsScript("msg.passed < 15 && msg.name === 'Vit' && meta.temp == 10 && msg.bigObj.prop == 42;"); + return configuration; + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java index faf97b4e50..c1236a4653 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java @@ -31,6 +31,7 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback; @RuleNode( type = ComponentType.FILTER, name = "switch", customRelations = true, + configClazz = TbJsSwitchNodeConfiguration.class, nodeDescription = "Route incoming Message to one or multiple output chains", nodeDetails = "Node executes configured JS script. Script should return array of next Chain names where Message should be routed. " + "If Array is empty - message not routed to next Node. " + diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java index 331302d542..b354c7199f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java @@ -15,14 +15,29 @@ */ package org.thingsboard.rule.engine.filter; +import com.google.common.collect.Sets; import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; import java.util.Set; @Data -public class TbJsSwitchNodeConfiguration { +public class TbJsSwitchNodeConfiguration implements NodeConfiguration { private String jsScript; private Set allowedRelations; private boolean routeToAllWithNoCheck; + + @Override + public TbJsSwitchNodeConfiguration defaultConfiguration() { + TbJsSwitchNodeConfiguration configuration = new TbJsSwitchNodeConfiguration(); + configuration.setJsScript("function nextRelation(meta, msg) {\n" + + " return ['one','nine'];" + + "};\n" + + "\n" + + "nextRelation(meta, msg);"); + configuration.setAllowedRelations(Sets.newHashSet("one", "two")); + configuration.setRouteToAllWithNoCheck(false); + return configuration; + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java index 7a6f9fdbee..3a86c25afb 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java @@ -28,6 +28,7 @@ import org.thingsboard.server.common.msg.TbMsg; @RuleNode( type = ComponentType.FILTER, name = "message type", + configClazz = TbMsgTypeFilterNodeConfiguration.class, nodeDescription = "Filter incoming messages by Message Type", nodeDetails = "Evaluate incoming Message with configured JS condition. " + "If incoming MessageType is expected - send Message via Success chain, otherwise Failure chain is used.") diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java index 3b7ba9055e..a2e1b179f0 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java @@ -16,15 +16,24 @@ package org.thingsboard.rule.engine.filter; import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; +import java.util.Arrays; +import java.util.Collections; import java.util.List; /** * Created by ashvayka on 19.01.18. */ @Data -public class TbMsgTypeFilterNodeConfiguration { +public class TbMsgTypeFilterNodeConfiguration implements NodeConfiguration { private List messageTypes; + @Override + public TbMsgTypeFilterNodeConfiguration defaultConfiguration() { + TbMsgTypeFilterNodeConfiguration configuration = new TbMsgTypeFilterNodeConfiguration(); + configuration.setMessageTypes(Arrays.asList("GET_ATTRIBUTES","POST_ATTRIBUTES","POST_TELEMETRY","RPC_REQUEST")); + return configuration; + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java index 6228206da8..69ee9d7c79 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java @@ -38,6 +38,7 @@ import static org.thingsboard.server.common.data.DataConstants.*; @Slf4j @RuleNode(type = ComponentType.ENRICHMENT, name = "originator attributes", + configClazz = TbGetAttributesNodeConfiguration.class, nodeDescription = "Add Message Originator Attributes or Latest Telemetry into Message Metadata", nodeDetails = "If Attributes enrichment configured, CLIENT/SHARED/SERVER attributes are added into Message metadata " + "with specific prefix: cs/shared/ss. To access those attributes in other nodes this template can be used " + diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java index ad92314324..103b4de956 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java @@ -16,14 +16,16 @@ package org.thingsboard.rule.engine.metadata; import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; +import java.util.Collections; import java.util.List; /** * Created by ashvayka on 19.01.18. */ @Data -public class TbGetAttributesNodeConfiguration { +public class TbGetAttributesNodeConfiguration implements NodeConfiguration { private List clientAttributeNames; private List sharedAttributeNames; @@ -31,4 +33,13 @@ public class TbGetAttributesNodeConfiguration { private List latestTsKeyNames; + @Override + public TbGetAttributesNodeConfiguration defaultConfiguration() { + TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration(); + configuration.setClientAttributeNames(Collections.emptyList()); + configuration.setSharedAttributeNames(Collections.emptyList()); + configuration.setServerAttributeNames(Collections.emptyList()); + configuration.setLatestTsKeyNames(Collections.emptyList()); + return configuration; + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java index d85fb56de0..cc6d6a1197 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType; @RuleNode( type = ComponentType.ENRICHMENT, name="customer attributes", + configClazz = TbGetEntityAttrNodeConfiguration.class, nodeDescription = "Add Originators Customer Attributes or Latest Telemetry into Message Metadata", nodeDetails = "If Attributes enrichment configured, server scope attributes are added into Message metadata. " + "To access those attributes in other nodes this template can be used " + diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java index a5e85c57d7..51951150af 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java @@ -16,13 +16,25 @@ package org.thingsboard.rule.engine.metadata; import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; +import java.util.HashMap; import java.util.Map; import java.util.Optional; @Data -public class TbGetEntityAttrNodeConfiguration { +public class TbGetEntityAttrNodeConfiguration implements NodeConfiguration { private Map attrMapping; private boolean isTelemetry = false; + + @Override + public TbGetEntityAttrNodeConfiguration defaultConfiguration() { + TbGetEntityAttrNodeConfiguration configuration = new TbGetEntityAttrNodeConfiguration(); + Map attrMapping = new HashMap<>(); + attrMapping.putIfAbsent("temperature", "tempo"); + configuration.setAttrMapping(attrMapping); + configuration.setTelemetry(true); + return configuration; + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java index ae0b662d48..82119926af 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java @@ -16,11 +16,28 @@ package org.thingsboard.rule.engine.metadata; import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntitySearchDirection; +import java.util.HashMap; +import java.util.Map; + @Data -public class TbGetRelatedAttrNodeConfiguration extends TbGetEntityAttrNodeConfiguration { +public class TbGetRelatedAttrNodeConfiguration extends TbGetEntityAttrNodeConfiguration { private String relationType; private EntitySearchDirection direction; + + @Override + public TbGetRelatedAttrNodeConfiguration defaultConfiguration() { + TbGetRelatedAttrNodeConfiguration configuration = new TbGetRelatedAttrNodeConfiguration(); + Map attrMapping = new HashMap<>(); + attrMapping.putIfAbsent("temperature", "tempo"); + configuration.setAttrMapping(attrMapping); + configuration.setTelemetry(true); + configuration.setRelationType(EntityRelation.CONTAINS_TYPE); + configuration.setDirection(EntitySearchDirection.FROM); + return configuration; + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java index 26b756176c..22c0b9f0a8 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType; @RuleNode( type = ComponentType.ENRICHMENT, name="related attributes", + configClazz = TbGetRelatedAttrNodeConfiguration.class, nodeDescription = "Add Originators Related Entity Attributes or Latest Telemetry into Message Metadata", nodeDetails = "Related Entity found using configured relation direction and Relation Type. " + "If multiple Related Entities are found, only first Entity is used for attributes enrichment, other entities are discarded. " + diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java index 7d9c50bfc6..b5f5e02dbe 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType; @RuleNode( type = ComponentType.ENRICHMENT, name="tenant attributes", + configClazz = TbGetEntityAttrNodeConfiguration.class, nodeDescription = "Add Originators Tenant Attributes or Latest Telemetry into Message Metadata", nodeDetails = "If Attributes enrichment configured, server scope attributes are added into Message metadata. " + "To access those attributes in other nodes this template can be used " + diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java index d237df83ce..40a647a822 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java @@ -36,6 +36,7 @@ import java.util.HashSet; @RuleNode( type = ComponentType.TRANSFORMATION, name="change originator", + configClazz = TbChangeOriginatorNodeConfiguration.class, nodeDescription = "Change Message Originator To Tenant/Customer/Related Entity", nodeDetails = "Related Entity found using configured relation direction and Relation Type. " + "If multiple Related Entities are found, only first Entity is used as new Originator, other entities are discarded. ") diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java index cf036810c4..3370408231 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java @@ -16,12 +16,24 @@ package org.thingsboard.rule.engine.transform; import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntitySearchDirection; @Data -public class TbChangeOriginatorNodeConfiguration extends TbTransformNodeConfiguration{ +public class TbChangeOriginatorNodeConfiguration extends TbTransformNodeConfiguration implements NodeConfiguration { private String originatorSource; private EntitySearchDirection direction; private String relationType; + + @Override + public TbChangeOriginatorNodeConfiguration defaultConfiguration() { + TbChangeOriginatorNodeConfiguration configuration = new TbChangeOriginatorNodeConfiguration(); + configuration.setOriginatorSource(TbChangeOriginatorNode.CUSTOMER_SOURCE); + configuration.setDirection(EntitySearchDirection.FROM); + configuration.setRelationType(EntityRelation.CONTAINS_TYPE); + configuration.setStartNewChain(false); + return configuration; + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java index babdbc3085..626790fcf8 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java @@ -27,6 +27,7 @@ import javax.script.Bindings; @RuleNode( type = ComponentType.TRANSFORMATION, name = "script", + configClazz = TbTransformMsgNodeConfiguration.class, nodeDescription = "Change Message payload and Metadata using JavaScript", nodeDetails = "JavaScript function recieve 2 input parameters that can be changed inside.
" + "meta - is a Message metadata.
" + diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java index 9cc926b54d..4f9e9eb681 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java @@ -16,9 +16,18 @@ package org.thingsboard.rule.engine.transform; import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; @Data -public class TbTransformMsgNodeConfiguration extends TbTransformNodeConfiguration { +public class TbTransformMsgNodeConfiguration extends TbTransformNodeConfiguration implements NodeConfiguration { private String jsScript; + + @Override + public TbTransformMsgNodeConfiguration defaultConfiguration() { + TbTransformMsgNodeConfiguration configuration = new TbTransformMsgNodeConfiguration(); + configuration.setStartNewChain(false); + configuration.setJsScript("msg.passed = msg.passed * meta.temp; msg.bigObj.newProp = 'Ukraine' "); + return configuration; + } } diff --git a/ui/src/app/api/rule-chain.service.js b/ui/src/app/api/rule-chain.service.js index f17553545e..ebc48fad61 100644 --- a/ui/src/app/api/rule-chain.service.js +++ b/ui/src/app/api/rule-chain.service.js @@ -153,16 +153,21 @@ function RuleChainService($http, $q, $filter, types, componentDescriptorService) return deferred.promise; } - function getRuleNodeSupportedLinks(nodeType) { //eslint-disable-line - //TODO: - var deferred = $q.defer(); - var linkLabels = [ - { name: 'Success', custom: false }, - { name: 'Fail', custom: false }, - { name: 'Custom', custom: true }, - ]; - deferred.resolve(linkLabels); - return deferred.promise; + function getRuleNodeSupportedLinks(component) { + var relationTypes = component.configurationDescriptor.nodeDefinition.relationTypes; + var customRelations = component.configurationDescriptor.nodeDefinition.customRelations; + var linkLabels = []; + for (var i=0;i { + if (tooltip.$element.parentNode != null) { + tooltip.$element.parentNode.removeChild(tooltip.$element); + } + }); + } + }); +} + +function onElementRemoved(element, callback) { + if (!document.body.contains(element)) { //eslint-disable-line + callback(); + } else { + var observer; + observer = new MutationObserver(function(mutations) { //eslint-disable-line + if (!document.body.contains(element)) { //eslint-disable-line + callback(); + observer.disconnect(); + } + }); + observer.observe(document.body, {childList: true}); //eslint-disable-line + } +} diff --git a/ui/src/app/components/js-func.directive.js b/ui/src/app/components/js-func.directive.js index f95d003e8a..33cebdebf0 100644 --- a/ui/src/app/components/js-func.directive.js +++ b/ui/src/app/components/js-func.directive.js @@ -22,6 +22,8 @@ import thingsboardToast from '../services/toast'; import thingsboardUtils from '../common/utils.service'; import thingsboardExpandFullscreen from './expand-fullscreen.directive'; +import fixAceEditor from './ace-editor-fix'; + /* eslint-disable import/no-unresolved, import/default */ import jsFuncTemplate from './js-func.tpl.html'; @@ -83,6 +85,7 @@ function JsFunc($compile, $templateCache, toast, utils, $translate) { scope.js_editor.session.on("change", function () { scope.cleanupJsErrors(); }); + fixAceEditor(_ace); } }; diff --git a/ui/src/app/components/json-object-edit.directive.js b/ui/src/app/components/json-object-edit.directive.js new file mode 100644 index 0000000000..db0aa60bb5 --- /dev/null +++ b/ui/src/app/components/json-object-edit.directive.js @@ -0,0 +1,168 @@ +/* + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import './json-object-edit.scss'; + +import 'brace/ext/language_tools'; +import 'brace/mode/json'; +import 'ace-builds/src-min-noconflict/snippets/json'; + +import fixAceEditor from './ace-editor-fix'; + +/* eslint-disable import/no-unresolved, import/default */ + +import jsonObjectEditTemplate from './json-object-edit.tpl.html'; + +/* eslint-enable import/no-unresolved, import/default */ + +export default angular.module('thingsboard.directives.jsonObjectEdit', []) + .directive('tbJsonObjectEdit', JsonObjectEdit) + .name; + +/*@ngInject*/ +function JsonObjectEdit($compile, $templateCache, $document, toast, utils) { + + var linker = function (scope, element, attrs, ngModelCtrl) { + var template = $templateCache.get(jsonObjectEditTemplate); + element.html(template); + + scope.label = attrs.label; + + scope.objectValid = true; + scope.validationError = ''; + + scope.json_editor; + + scope.onFullscreenChanged = function () { + updateEditorSize(); + }; + + function updateEditorSize() { + if (scope.json_editor) { + scope.json_editor.resize(); + scope.json_editor.renderer.updateFull(); + } + } + + scope.jsonEditorOptions = { + useWrapMode: true, + mode: 'json', + advanced: { + enableSnippets: true, + enableBasicAutocompletion: true, + enableLiveAutocompletion: true + }, + onLoad: function (_ace) { + scope.json_editor = _ace; + scope.json_editor.session.on("change", function () { + scope.cleanupJsonErrors(); + }); + fixAceEditor(_ace); + } + }; + + scope.cleanupJsonErrors = function () { + toast.hide(); + }; + + scope.updateValidity = function () { + ngModelCtrl.$setValidity('objectValid', scope.objectValid); + }; + + scope.$watch('contentBody', function (newVal, prevVal) { + if (!angular.equals(newVal, prevVal)) { + var object = scope.validate(); + ngModelCtrl.$setViewValue(object); + scope.updateValidity(); + } + }); + + ngModelCtrl.$render = function () { + var object = ngModelCtrl.$viewValue; + var content = ''; + try { + if (object) { + content = angular.toJson(object, true); + } + } catch (e) { + // + } + scope.contentBody = content; + }; + + scope.showError = function (error) { + var toastParent = angular.element('#tb-json-panel', element); + toast.showError(error, toastParent, 'bottom left'); + }; + + scope.validate = function () { + if (!scope.contentBody || !scope.contentBody.length) { + if (scope.required) { + scope.validationError = 'Json object is required.'; + scope.objectValid = false; + } else { + scope.validationError = ''; + scope.objectValid = true; + } + return null; + } else { + try { + var object = angular.fromJson(scope.contentBody); + scope.validationError = ''; + scope.objectValid = true; + return object; + } catch (e) { + var details = utils.parseException(e); + var errorInfo = 'Error:'; + if (details.name) { + errorInfo += ' ' + details.name + ':'; + } + if (details.message) { + errorInfo += ' ' + details.message; + } + scope.validationError = errorInfo; + scope.objectValid = false; + return null; + } + } + }; + + scope.$on('form-submit', function () { + if (!scope.readonly) { + scope.cleanupJsonErrors(); + if (!scope.objectValid) { + scope.showError(scope.validationError); + } + } + }); + + scope.$on('update-ace-editor-size', function () { + updateEditorSize(); + }); + + $compile(element.contents())(scope); + } + + return { + restrict: "E", + require: "^ngModel", + scope: { + required:'=ngRequired', + readonly:'=ngReadonly', + fillHeight:'=?' + }, + link: linker + }; +} diff --git a/ui/src/app/components/json-object-edit.scss b/ui/src/app/components/json-object-edit.scss new file mode 100644 index 0000000000..232d69a2fb --- /dev/null +++ b/ui/src/app/components/json-object-edit.scss @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +tb-json-object-edit { + position: relative; + .fill-height { + height: 100%; + } +} + +.tb-json-object-panel { + margin-left: 15px; + border: 1px solid #C0C0C0; + height: 100%; + #tb-json-input { + min-width: 200px; + width: 100%; + height: 100%; + &:not(.fill-height) { + min-height: 200px; + } + } +} diff --git a/ui/src/app/components/json-object-edit.tpl.html b/ui/src/app/components/json-object-edit.tpl.html new file mode 100644 index 0000000000..ebab3c7292 --- /dev/null +++ b/ui/src/app/components/json-object-edit.tpl.html @@ -0,0 +1,34 @@ + +
+
+ + + +
+
+
+
+
+
diff --git a/ui/src/app/components/react/json-form-ace-editor.jsx b/ui/src/app/components/react/json-form-ace-editor.jsx index 1c4c02e16f..5afd3d18f1 100644 --- a/ui/src/app/components/react/json-form-ace-editor.jsx +++ b/ui/src/app/components/react/json-form-ace-editor.jsx @@ -23,6 +23,8 @@ import FlatButton from 'material-ui/FlatButton'; import 'brace/ext/language_tools'; import 'brace/theme/github'; +import fixAceEditor from './../ace-editor-fix'; + class ThingsboardAceEditor extends React.Component { constructor(props) { @@ -31,6 +33,7 @@ class ThingsboardAceEditor extends React.Component { this.onBlur = this.onBlur.bind(this); this.onFocus = this.onFocus.bind(this); this.onTidy = this.onTidy.bind(this); + this.onLoad = this.onLoad.bind(this); var value = props.value ? props.value + '' : ''; this.state = { value: value, @@ -72,6 +75,10 @@ class ThingsboardAceEditor extends React.Component { } } + onLoad(editor) { + fixAceEditor(editor); + } + render() { const styles = reactCSS({ @@ -117,6 +124,7 @@ class ThingsboardAceEditor extends React.Component { onChange={this.onValueChanged} onFocus={this.onFocus} onBlur={this.onBlur} + onLoad={this.onLoad} name={this.props.form.title} value={this.state.value} readOnly={this.props.form.readonly} diff --git a/ui/src/app/components/widget/widget-config.directive.js b/ui/src/app/components/widget/widget-config.directive.js index d0ee6a3d04..4d4d958427 100644 --- a/ui/src/app/components/widget/widget-config.directive.js +++ b/ui/src/app/components/widget/widget-config.directive.js @@ -23,6 +23,8 @@ import thingsboardJsonForm from '../json-form.directive'; import thingsboardManageWidgetActions from './action/manage-widget-actions.directive'; import 'angular-ui-ace'; +import fixAceEditor from './../ace-editor-fix'; + import './widget-config.scss'; /* eslint-disable import/no-unresolved, import/default */ @@ -72,6 +74,9 @@ function WidgetConfig($compile, $templateCache, $rootScope, $translate, $timeout enableSnippets: true, enableBasicAutocompletion: true, enableLiveAutocompletion: true + }, + onLoad: function (_ace) { + fixAceEditor(_ace); } }; diff --git a/ui/src/app/extension/extensions-forms/extension-form-opc.directive.js b/ui/src/app/extension/extensions-forms/extension-form-opc.directive.js index bf4886c994..e1d2519ae3 100644 --- a/ui/src/app/extension/extensions-forms/extension-form-opc.directive.js +++ b/ui/src/app/extension/extensions-forms/extension-form-opc.directive.js @@ -128,8 +128,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra let addedFile = event.target.result; if (addedFile && addedFile.length > 0) { - model[options.fileName] = $file.name; - model[options.file] = addedFile.replace(/^data.*base64,/, ""); + model[options.location] = $file.name; + model[options.fileContent] = addedFile.replace(/^data.*base64,/, ""); } } @@ -142,8 +142,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra scope.clearFile = function(model, options) { scope.theForm.$setDirty(); - model[options.fileName] = null; - model[options.file] = null; + model[options.location] = null; + model[options.fileContent] = null; }; diff --git a/ui/src/app/extension/extensions-forms/extension-form-opc.tpl.html b/ui/src/app/extension/extensions-forms/extension-form-opc.tpl.html index 501eeebd07..5a7c00ba3b 100644 --- a/ui/src/app/extension/extensions-forms/extension-form-opc.tpl.html +++ b/ui/src/app/extension/extensions-forms/extension-form-opc.tpl.html @@ -212,8 +212,8 @@ diff --git a/ui/src/app/layout/index.js b/ui/src/app/layout/index.js index e90334bf52..d397d142c8 100644 --- a/ui/src/app/layout/index.js +++ b/ui/src/app/layout/index.js @@ -29,6 +29,7 @@ import thingsboardNoAnimate from '../components/no-animate.directive'; import thingsboardOnFinishRender from '../components/finish-render.directive'; import thingsboardSideMenu from '../components/side-menu.directive'; import thingsboardDashboardAutocomplete from '../components/dashboard-autocomplete.directive'; +import thingsboardJsonObjectEdit from '../components/json-object-edit.directive'; import thingsboardUserMenu from './user-menu.directive'; @@ -90,7 +91,8 @@ export default angular.module('thingsboard.home', [ thingsboardNoAnimate, thingsboardOnFinishRender, thingsboardSideMenu, - thingsboardDashboardAutocomplete + thingsboardDashboardAutocomplete, + thingsboardJsonObjectEdit ]) .config(HomeRoutes) .controller('HomeController', HomeController) diff --git a/ui/src/app/locale/locale.constant.js b/ui/src/app/locale/locale.constant.js index 360c8282ac..cca2a11648 100644 --- a/ui/src/app/locale/locale.constant.js +++ b/ui/src/app/locale/locale.constant.js @@ -1179,6 +1179,7 @@ export default angular.module('thingsboard.locale', []) "delete": "Delete rule node", "rulenode-details": "Rule node details", "debug-mode": "Debug mode", + "configuration": "Configuration", "link-details": "Rule node link details", "add-link": "Add link", "link-label": "Link label", diff --git a/ui/src/app/rulechain/rulechain.controller.js b/ui/src/app/rulechain/rulechain.controller.js index d9bbf2fd8d..b792f138f8 100644 --- a/ui/src/app/rulechain/rulechain.controller.js +++ b/ui/src/app/rulechain/rulechain.controller.js @@ -151,6 +151,9 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, }, 'mouseLeave': function () { destroyTooltips(); + }, + 'mouseDown': function () { + destroyTooltips(); } } }; @@ -226,16 +229,12 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, edgeDoubleClick: function (event, edge) { var sourceNode = vm.modelservice.nodes.getNodeByConnectorId(edge.source); if (sourceNode.component.type != types.ruleNodeType.INPUT.value) { - ruleChainService.getRuleNodeSupportedLinks(sourceNode.component.clazz).then( - (labels) => { - vm.isEditingRuleNode = false; - vm.editingRuleNode = null; - vm.editingRuleNodeLinkLabels = labels; - vm.isEditingRuleNodeLink = true; - vm.editingRuleNodeLinkIndex = vm.ruleChainModel.edges.indexOf(edge); - vm.editingRuleNodeLink = angular.copy(edge); - } - ); + vm.isEditingRuleNode = false; + vm.editingRuleNode = null; + vm.editingRuleNodeLinkLabels = ruleChainService.getRuleNodeSupportedLinks(sourceNode.component); + vm.isEditingRuleNodeLink = true; + vm.editingRuleNodeLinkIndex = vm.ruleChainModel.edges.indexOf(edge); + vm.editingRuleNodeLink = angular.copy(edge); } }, nodeCallbacks: { @@ -267,16 +266,10 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, deferred.resolve(edge); } } else { - ruleChainService.getRuleNodeSupportedLinks(sourceNode.component.clazz).then( - (labels) => { - addRuleNodeLink(event, edge, labels).then( - (link) => { - deferred.resolve(link); - }, - () => { - deferred.reject(); - } - ); + var labels = ruleChainService.getRuleNodeSupportedLinks(sourceNode.component); + addRuleNodeLink(event, edge, labels).then( + (link) => { + deferred.resolve(link); }, () => { deferred.reject(); @@ -309,24 +302,19 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, y: 10+50*model.nodes.length, connectors: [] }; - if (componentType == types.ruleNodeType.RULE_CHAIN.value) { - node.connectors.push( - { - type: flowchartConstants.leftConnectorType, - id: model.nodes.length - } - ); - } else { + if (ruleNodeComponent.configurationDescriptor.nodeDefinition.inEnabled) { node.connectors.push( { type: flowchartConstants.leftConnectorType, - id: model.nodes.length*2 + id: model.nodes.length * 2 } ); + } + if (ruleNodeComponent.configurationDescriptor.nodeDefinition.outEnabled) { node.connectors.push( { type: flowchartConstants.rightConnectorType, - id: model.nodes.length*2+1 + id: model.nodes.length * 2 + 1 } ); } @@ -398,17 +386,24 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, name: ruleNode.name, nodeClass: vm.types.ruleNodeType[component.type].nodeClass, icon: vm.types.ruleNodeType[component.type].icon, - connectors: [ + connectors: [] + }; + if (component.configurationDescriptor.nodeDefinition.inEnabled) { + node.connectors.push( { type: flowchartConstants.leftConnectorType, id: vm.nextConnectorID++ - }, + } + ); + } + if (component.configurationDescriptor.nodeDefinition.outEnabled) { + node.connectors.push( { type: flowchartConstants.rightConnectorType, id: vm.nextConnectorID++ } - ] - }; + ); + } nodes.push(node); vm.ruleChainModel.nodes.push(node); } @@ -590,6 +585,9 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, } function addRuleNode($event, ruleNode) { + + ruleNode.configuration = angular.copy(ruleNode.component.configurationDescriptor.nodeDefinition.defaultConfiguration); + $mdDialog.show({ controller: 'AddRuleNodeController', controllerAs: 'vm', @@ -601,13 +599,15 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, }).then(function (ruleNode) { ruleNode.id = vm.nextNodeID++; ruleNode.connectors = []; - ruleNode.connectors.push( - { - id: vm.nextConnectorID++, - type: flowchartConstants.leftConnectorType - } - ); - if (ruleNode.component.type != types.ruleNodeType.RULE_CHAIN.value) { + if (ruleNode.component.configurationDescriptor.nodeDefinition.inEnabled) { + ruleNode.connectors.push( + { + id: vm.nextConnectorID++, + type: flowchartConstants.leftConnectorType + } + ); + } + if (ruleNode.component.configurationDescriptor.nodeDefinition.outEnabled) { ruleNode.connectors.push( { id: vm.nextConnectorID++, diff --git a/ui/src/app/rulechain/rulenode-fieldset.tpl.html b/ui/src/app/rulechain/rulenode-fieldset.tpl.html index 0d16e45405..30cf0752cd 100644 --- a/ui/src/app/rulechain/rulenode-fieldset.tpl.html +++ b/ui/src/app/rulechain/rulenode-fieldset.tpl.html @@ -38,6 +38,11 @@ ng-model="ruleNode.debugMode">{{ 'rulenode.debug-mode' | translate }} + + diff --git a/ui/src/app/rulechain/rulenode.directive.js b/ui/src/app/rulechain/rulenode.directive.js index 998e9981c6..be3e9c36b5 100644 --- a/ui/src/app/rulechain/rulenode.directive.js +++ b/ui/src/app/rulechain/rulenode.directive.js @@ -14,6 +14,8 @@ * limitations under the License. */ +import './rulenode.scss'; + /* eslint-disable import/no-unresolved, import/default */ import ruleNodeFieldsetTemplate from './rulenode-fieldset.tpl.html'; diff --git a/ui/src/app/rulechain/rulenode.scss b/ui/src/app/rulechain/rulenode.scss new file mode 100644 index 0000000000..febc637a37 --- /dev/null +++ b/ui/src/app/rulechain/rulenode.scss @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +.tb-rulenode { + tb-json-object-edit.tb-rule-node-configuration-json { + height: 300px; + display: block; + } +} \ No newline at end of file diff --git a/ui/src/app/rulechain/rulenode.tpl.html b/ui/src/app/rulechain/rulenode.tpl.html index 5a521a8333..ffc8a0f20d 100644 --- a/ui/src/app/rulechain/rulenode.tpl.html +++ b/ui/src/app/rulechain/rulenode.tpl.html @@ -19,7 +19,7 @@ id="{{node.id}}" ng-attr-style="position: absolute; top: {{ node.y }}px; left: {{ node.x }}px;" ng-dblclick="callbacks.doubleClick($event, node)" - ng-mouseover="callbacks.mouseOver($event, node)" + ng-mousedown="callbacks.mouseDown($event, node)" ng-mouseenter="callbacks.mouseEnter($event, node)" ng-mouseleave="callbacks.mouseLeave($event, node)">
diff --git a/ui/src/scss/main.scss b/ui/src/scss/main.scss index 93ff3205c0..6aa662c58c 100644 --- a/ui/src/scss/main.scss +++ b/ui/src/scss/main.scss @@ -203,6 +203,12 @@ md-sidenav { * THINGSBOARD SPECIFIC ***********************/ +$swift-ease-out-duration: 0.4s !default; +$swift-ease-out-timing-function: cubic-bezier(0.25, 0.8, 0.25, 1) !default; + +$input-label-float-offset: 6px !default; +$input-label-float-scale: 0.75 !default; + label { &.tb-title { pointer-events: none; @@ -213,6 +219,18 @@ label { &.no-padding { padding-bottom: 0px; } + &.tb-required:after { + content: ' *'; + font-size: 13px; + vertical-align: top; + color: rgba(0,0,0,0.54); + } + &.tb-error { + color: rgb(221,44,0); + &.tb-required:after { + color: rgb(221,44,0); + } + } } }