diff --git a/application/src/main/data/upgrade/1.4.0/schema_update.cql b/application/src/main/data/upgrade/1.4.0/schema_update.cql new file mode 100644 index 0000000000..c5b656feef --- /dev/null +++ b/application/src/main/data/upgrade/1.4.0/schema_update.cql @@ -0,0 +1,89 @@ +-- +-- Copyright © 2016-2017 The Thingsboard Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +CREATE TABLE IF NOT EXISTS thingsboard.audit_log_by_entity_id ( + tenant_id timeuuid, + id timeuuid, + customer_id timeuuid, + entity_id timeuuid, + entity_type text, + entity_name text, + user_id timeuuid, + user_name text, + action_type text, + action_data text, + action_status text, + action_failure_details text, + PRIMARY KEY ((tenant_id, entity_id, entity_type), id) +); + +CREATE TABLE IF NOT EXISTS thingsboard.audit_log_by_customer_id ( + tenant_id timeuuid, + id timeuuid, + customer_id timeuuid, + entity_id timeuuid, + entity_type text, + entity_name text, + user_id timeuuid, + user_name text, + action_type text, + action_data text, + action_status text, + action_failure_details text, + PRIMARY KEY ((tenant_id, customer_id), id) +); + +CREATE TABLE IF NOT EXISTS thingsboard.audit_log_by_user_id ( + tenant_id timeuuid, + id timeuuid, + customer_id timeuuid, + entity_id timeuuid, + entity_type text, + entity_name text, + user_id timeuuid, + user_name text, + action_type text, + action_data text, + action_status text, + action_failure_details text, + PRIMARY KEY ((tenant_id, user_id), id) +); + + + +CREATE TABLE IF NOT EXISTS thingsboard.audit_log_by_tenant_id ( + tenant_id timeuuid, + id timeuuid, + partition bigint, + customer_id timeuuid, + entity_id timeuuid, + entity_type text, + entity_name text, + user_id timeuuid, + user_name text, + action_type text, + action_data text, + action_status text, + action_failure_details text, + PRIMARY KEY ((tenant_id, partition), id) +); + +CREATE TABLE IF NOT EXISTS thingsboard.audit_log_by_tenant_id_partitions ( + tenant_id timeuuid, + partition bigint, + PRIMARY KEY (( tenant_id ), partition) +) WITH CLUSTERING ORDER BY ( partition ASC ) +AND compaction = { 'class' : 'LeveledCompactionStrategy' }; diff --git a/application/src/main/data/upgrade/1.4.0/schema_update.sql b/application/src/main/data/upgrade/1.4.0/schema_update.sql new file mode 100644 index 0000000000..0d031bf0a4 --- /dev/null +++ b/application/src/main/data/upgrade/1.4.0/schema_update.sql @@ -0,0 +1,31 @@ +-- +-- Copyright © 2016-2017 The Thingsboard Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +CREATE TABLE IF NOT EXISTS audit_log ( + id varchar(31) NOT NULL CONSTRAINT audit_log_pkey PRIMARY KEY, + tenant_id varchar(31), + customer_id varchar(31), + entity_id varchar(31), + entity_type varchar(255), + entity_name varchar(255), + user_id varchar(31), + user_name varchar(255), + action_type varchar(255), + action_data varchar(255), + action_status varchar(255), + action_failure_details varchar +); + diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java index d2d9b7ae10..3f0d39f99e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java @@ -429,12 +429,12 @@ public final class PluginProcessingContext implements PluginContext { @Override public ListenableFuture> findByFromAndType(EntityId from, String relationType) { - return this.pluginCtx.relationService.findByFromAndType(from, relationType, RelationTypeGroup.COMMON); + return this.pluginCtx.relationService.findByFromAndTypeAsync(from, relationType, RelationTypeGroup.COMMON); } @Override public ListenableFuture> findByToAndType(EntityId from, String relationType) { - return this.pluginCtx.relationService.findByToAndType(from, relationType, RelationTypeGroup.COMMON); + return this.pluginCtx.relationService.findByToAndTypeAsync(from, relationType, RelationTypeGroup.COMMON); } @Override diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 8ae04357a4..96343cbbd0 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -130,6 +130,10 @@ public abstract class BaseController { @Autowired protected AuditLogService auditLogService; + @ExceptionHandler(Exception.class) + public void handleException(Exception ex, HttpServletResponse response) { + errorResponseHandler.handle(ex, response); + } @ExceptionHandler(ThingsboardException.class) public void handleThingsboardException(ThingsboardException ex, HttpServletResponse response) { diff --git a/application/src/main/java/org/thingsboard/server/controller/EntityRelationController.java b/application/src/main/java/org/thingsboard/server/controller/EntityRelationController.java index ec7be1276a..5705ed9773 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EntityRelationController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EntityRelationController.java @@ -120,7 +120,7 @@ public class EntityRelationController extends BaseController { checkEntityId(fromId); checkEntityId(toId); RelationTypeGroup typeGroup = parseRelationTypeGroup(strRelationTypeGroup, RelationTypeGroup.COMMON); - return checkNotNull(relationService.getRelation(fromId, toId, strRelationType, typeGroup).get()); + return checkNotNull(relationService.getRelation(fromId, toId, strRelationType, typeGroup)); } catch (Exception e) { throw handleException(e); } @@ -138,7 +138,7 @@ public class EntityRelationController extends BaseController { checkEntityId(entityId); RelationTypeGroup typeGroup = parseRelationTypeGroup(strRelationTypeGroup, RelationTypeGroup.COMMON); try { - return checkNotNull(relationService.findByFrom(entityId, typeGroup).get()); + return checkNotNull(relationService.findByFrom(entityId, typeGroup)); } catch (Exception e) { throw handleException(e); } @@ -176,7 +176,7 @@ public class EntityRelationController extends BaseController { checkEntityId(entityId); RelationTypeGroup typeGroup = parseRelationTypeGroup(strRelationTypeGroup, RelationTypeGroup.COMMON); try { - return checkNotNull(relationService.findByFromAndType(entityId, strRelationType, typeGroup).get()); + return checkNotNull(relationService.findByFromAndType(entityId, strRelationType, typeGroup)); } catch (Exception e) { throw handleException(e); } @@ -194,7 +194,7 @@ public class EntityRelationController extends BaseController { checkEntityId(entityId); RelationTypeGroup typeGroup = parseRelationTypeGroup(strRelationTypeGroup, RelationTypeGroup.COMMON); try { - return checkNotNull(relationService.findByTo(entityId, typeGroup).get()); + return checkNotNull(relationService.findByTo(entityId, typeGroup)); } catch (Exception e) { throw handleException(e); } @@ -232,7 +232,7 @@ public class EntityRelationController extends BaseController { checkEntityId(entityId); RelationTypeGroup typeGroup = parseRelationTypeGroup(strRelationTypeGroup, RelationTypeGroup.COMMON); try { - return checkNotNull(relationService.findByToAndType(entityId, strRelationType, typeGroup).get()); + return checkNotNull(relationService.findByToAndType(entityId, strRelationType, typeGroup)); } catch (Exception e) { throw handleException(e); } diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index bec78095e2..d918b22bfc 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -81,6 +81,8 @@ public class ThingsboardInstallService { case "1.3.1": log.info("Upgrading ThingsBoard from version 1.3.1 to 1.4.0 ..."); + databaseUpgradeService.upgradeDatabase("1.3.1"); + log.info("Updating system data..."); systemDataLoaderService.deleteSystemWidgetBundle("charts"); diff --git a/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java index 5b32374f60..dacf453605 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java @@ -159,6 +159,12 @@ public class CassandraDatabaseUpgradeService implements DatabaseUpgradeService { break; case "1.3.0": break; + case "1.3.1": + log.info("Updating schema ..."); + schemaUpdateFile = Paths.get(this.dataDir, "upgrade", "1.4.0", SCHEMA_UPDATE_CQL); + loadCql(schemaUpdateFile); + log.info("Schema updated."); + break; default: throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index a8ffa99a46..098b4fe970 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -61,6 +61,15 @@ public class SqlDatabaseUpgradeService implements DatabaseUpgradeService { } log.info("Schema updated."); break; + case "1.3.1": + log.info("Updating schema ..."); + schemaUpdateFile = Paths.get(this.dataDir, "upgrade", "1.4.0", SCHEMA_UPDATE_SQL); + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { + String sql = new String(Files.readAllBytes(schemaUpdateFile), Charset.forName("UTF-8")); + conn.createStatement().execute(sql); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script + } + log.info("Schema updated."); + break; default: throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 83ff3a9848..ca7fbdf7ed 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -111,6 +111,27 @@ coap: adaptor: "${COAP_ADAPTOR_NAME:JsonCoapAdaptor}" timeout: "${COAP_TIMEOUT:10000}" +#Quota parameters +quota: + host: + # Max allowed number of API requests in interval for single host + limit: "${QUOTA_HOST_LIMIT:10000}" + # Interval duration + intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}" + # Maximum silence duration for host after which Host removed from QuotaService. Must be bigger than intervalMs + ttlMs: "${QUOTA_HOST_TTL_MS:60000}" + # Interval for scheduled task that cleans expired records. TTL is used for expiring + cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}" + # Enable Host API Limits + enabled: "${QUOTA_HOST_ENABLED:false}" + # Array of whitelist hosts + whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}" + # Array of blacklist hosts + blacklist: "${QUOTA_HOST_BLACKLIST:}" + log: + topSize: 10 + intervalMin: 2 + database: type: "${DATABASE_TYPE:sql}" # cassandra OR sql @@ -193,26 +214,11 @@ actors: enabled: "${ACTORS_STATISTICS_ENABLED:true}" persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:3600000}" -# Cache parameters cache: - # Enable/disable cache functionality. - enabled: "${CACHE_ENABLED:false}" - device_credentials: - # Default time to store device credentials in cache, in seconds - time_to_live: "${CACHE_DEVICE_CREDENTIAL_TTL:3600}" - # Maximum size of the map. When maximum size is reached, the map is evicted based on the policy defined. - max_size: - # Max size policy options: - # PER_NODE: Maximum number of map entries in each JVM. - # PER_PARTITION: Maximum number of map entries within each partition. - # USED_HEAP_SIZE: Maximum used heap size in megabytes for each JVM. - # USED_HEAP_PERCENTAGE: Maximum used heap size percentage for each JVM. - # FREE_HEAP_SIZE: Minimum free heap size in megabytes for each JVM. - # FREE_HEAP_PERCENTAGE: Minimum free heap size percentage for each JVM. - policy: "${CACHE_DEVICE_CREDENTIAL_MAX_SIZE_POLICY:PER_NODE}" - size: "${CACHE_DEVICE_CREDENTIAL_MAX_SIZE_SIZE:1000000}" + # caffeine or redis + type: "${CACHE_TYPE:caffeine}" -caching: +caffeine: specs: relations: timeToLiveInMinutes: 1440 @@ -224,6 +230,15 @@ caching: timeToLiveInMinutes: 1440 maxSize: 100000 +redis: + # standalone or cluster + connection: + type: standalone + host: "${REDIS_HOST:localhost}" + port: "${REDIS_PORT:6379}" + db: "${REDIS_DB:0}" + password: "${REDIS_PASSWORD:}" + # Check new version updates parameters updates: # Enable/disable updates checking. diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/ContactBased.java b/common/data/src/main/java/org/thingsboard/server/common/data/ContactBased.java index d5252a0ce1..da0c56b3a7 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/ContactBased.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/ContactBased.java @@ -19,7 +19,7 @@ import lombok.EqualsAndHashCode; import org.thingsboard.server.common.data.id.UUIDBased; @EqualsAndHashCode(callSuper = true) -public abstract class ContactBased extends SearchTextBased { +public abstract class ContactBased extends SearchTextBasedWithAdditionalInfo { private static final long serialVersionUID = 5047448057830660988L; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/Customer.java b/common/data/src/main/java/org/thingsboard/server/common/data/Customer.java index 7754e88ecd..1ee9cab68e 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/Customer.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/Customer.java @@ -29,8 +29,7 @@ public class Customer extends ContactBased implements HasName { private String title; private TenantId tenantId; - private transient JsonNode additionalInfo; - + public Customer() { super(); } @@ -43,7 +42,6 @@ public class Customer extends ContactBased implements HasName { super(customer); this.tenantId = customer.getTenantId(); this.title = customer.getTitle(); - this.additionalInfo = customer.getAdditionalInfo(); } public TenantId getTenantId() { @@ -65,7 +63,7 @@ public class Customer extends ContactBased implements HasName { @JsonIgnore public boolean isPublic() { if (getAdditionalInfo() != null && getAdditionalInfo().has("isPublic")) { - return additionalInfo.get("isPublic").asBoolean(); + return getAdditionalInfo().get("isPublic").asBoolean(); } return false; @@ -77,14 +75,6 @@ public class Customer extends ContactBased implements HasName { return title; } - public JsonNode getAdditionalInfo() { - return additionalInfo; - } - - public void setAdditionalInfo(JsonNode additionalInfo) { - this.additionalInfo = additionalInfo; - } - @Override public String getSearchText() { return getTitle(); @@ -94,7 +84,6 @@ public class Customer extends ContactBased implements HasName { public int hashCode() { final int prime = 31; int result = super.hashCode(); - result = prime * result + ((additionalInfo == null) ? 0 : additionalInfo.hashCode()); result = prime * result + ((tenantId == null) ? 0 : tenantId.hashCode()); result = prime * result + ((title == null) ? 0 : title.hashCode()); return result; @@ -109,11 +98,6 @@ public class Customer extends ContactBased implements HasName { if (getClass() != obj.getClass()) return false; Customer other = (Customer) obj; - if (additionalInfo == null) { - if (other.additionalInfo != null) - return false; - } else if (!additionalInfo.equals(other.additionalInfo)) - return false; if (tenantId == null) { if (other.tenantId != null) return false; @@ -135,7 +119,7 @@ public class Customer extends ContactBased implements HasName { builder.append(", tenantId="); builder.append(tenantId); builder.append(", additionalInfo="); - builder.append(additionalInfo); + builder.append(getAdditionalInfo()); builder.append(", country="); builder.append(country); builder.append(", state="); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/Device.java b/common/data/src/main/java/org/thingsboard/server/common/data/Device.java index 131a3100c5..9019960127 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/Device.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/Device.java @@ -23,7 +23,7 @@ import org.thingsboard.server.common.data.id.TenantId; import com.fasterxml.jackson.databind.JsonNode; @EqualsAndHashCode(callSuper = true) -public class Device extends SearchTextBased implements HasName { +public class Device extends SearchTextBasedWithAdditionalInfo implements HasName { private static final long serialVersionUID = 2807343040519543363L; @@ -31,7 +31,6 @@ public class Device extends SearchTextBased implements HasName { private CustomerId customerId; private String name; private String type; - private transient JsonNode additionalInfo; public Device() { super(); @@ -47,7 +46,6 @@ public class Device extends SearchTextBased implements HasName { this.customerId = device.getCustomerId(); this.name = device.getName(); this.type = device.getType(); - this.additionalInfo = device.getAdditionalInfo(); } public TenantId getTenantId() { @@ -83,14 +81,6 @@ public class Device extends SearchTextBased implements HasName { this.type = type; } - public JsonNode getAdditionalInfo() { - return additionalInfo; - } - - public void setAdditionalInfo(JsonNode additionalInfo) { - this.additionalInfo = additionalInfo; - } - @Override public String getSearchText() { return getName(); @@ -108,7 +98,7 @@ public class Device extends SearchTextBased implements HasName { builder.append(", type="); builder.append(type); builder.append(", additionalInfo="); - builder.append(additionalInfo); + builder.append(getAdditionalInfo()); builder.append(", createdTime="); builder.append(createdTime); builder.append(", id="); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/HasAdditionalInfo.java b/common/data/src/main/java/org/thingsboard/server/common/data/HasAdditionalInfo.java new file mode 100644 index 0000000000..3239df01b6 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/HasAdditionalInfo.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data; + +import com.fasterxml.jackson.databind.JsonNode; + +public interface HasAdditionalInfo { + + JsonNode getAdditionalInfo(); + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/SearchTextBasedWithAdditionalInfo.java b/common/data/src/main/java/org/thingsboard/server/common/data/SearchTextBasedWithAdditionalInfo.java new file mode 100644 index 0000000000..9dc077a07a --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/SearchTextBasedWithAdditionalInfo.java @@ -0,0 +1,105 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.UUIDBased; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; +import java.util.function.Supplier; +import java.util.function.Consumer; + +/** + * Created by ashvayka on 19.02.18. + */ +@Slf4j +public abstract class SearchTextBasedWithAdditionalInfo extends SearchTextBased implements HasAdditionalInfo { + + private transient JsonNode additionalInfo; + @JsonIgnore + private byte[] additionalInfoBytes; + + public SearchTextBasedWithAdditionalInfo() { + super(); + } + + public SearchTextBasedWithAdditionalInfo(I id) { + super(id); + } + + public SearchTextBasedWithAdditionalInfo(SearchTextBasedWithAdditionalInfo searchTextBased) { + super(searchTextBased); + setAdditionalInfo(searchTextBased.getAdditionalInfo()); + } + + @Override + public JsonNode getAdditionalInfo() { + return getJson(() -> additionalInfo, () -> additionalInfoBytes); + } + + public void setAdditionalInfo(JsonNode addInfo) { + setJson(addInfo, json -> this.additionalInfo = json, bytes -> this.additionalInfoBytes = bytes); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + SearchTextBasedWithAdditionalInfo that = (SearchTextBasedWithAdditionalInfo) o; + return Arrays.equals(additionalInfoBytes, that.additionalInfoBytes); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), additionalInfoBytes); + } + + public static JsonNode getJson(Supplier jsonData, Supplier binaryData) { + JsonNode json = jsonData.get(); + if (json != null) { + return json; + } else { + byte[] data = binaryData.get(); + if (data != null) { + try { + return new ObjectMapper().readTree(new ByteArrayInputStream(data)); + } catch (IOException e) { + log.warn("Can't deserialize json data: ", e); + return null; + } + } else { + return null; + } + } + } + + public static void setJson(JsonNode json, Consumer jsonConsumer, Consumer bytesConsumer) { + jsonConsumer.accept(json); + try { + bytesConsumer.accept(new ObjectMapper().writeValueAsBytes(json)); + } catch (JsonProcessingException e) { + log.warn("Can't serialize json data: ", e); + } + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/Tenant.java b/common/data/src/main/java/org/thingsboard/server/common/data/Tenant.java index bb209f1011..334ec4d038 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/Tenant.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/Tenant.java @@ -28,8 +28,7 @@ public class Tenant extends ContactBased implements HasName { private String title; private String region; - private transient JsonNode additionalInfo; - + public Tenant() { super(); } @@ -42,7 +41,6 @@ public class Tenant extends ContactBased implements HasName { super(tenant); this.title = tenant.getTitle(); this.region = tenant.getRegion(); - this.additionalInfo = tenant.getAdditionalInfo(); } public String getTitle() { @@ -67,14 +65,6 @@ public class Tenant extends ContactBased implements HasName { this.region = region; } - public JsonNode getAdditionalInfo() { - return additionalInfo; - } - - public void setAdditionalInfo(JsonNode additionalInfo) { - this.additionalInfo = additionalInfo; - } - @Override public String getSearchText() { return getTitle(); @@ -88,7 +78,7 @@ public class Tenant extends ContactBased implements HasName { builder.append(", region="); builder.append(region); builder.append(", additionalInfo="); - builder.append(additionalInfo); + builder.append(getAdditionalInfo()); builder.append(", country="); builder.append(country); builder.append(", state="); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/User.java b/common/data/src/main/java/org/thingsboard/server/common/data/User.java index a6488995a9..a6f13d0803 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/User.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/User.java @@ -25,7 +25,7 @@ import org.thingsboard.server.common.data.security.Authority; import com.fasterxml.jackson.databind.JsonNode; @EqualsAndHashCode(callSuper = true) -public class User extends SearchTextBased implements HasName { +public class User extends SearchTextBasedWithAdditionalInfo implements HasName { private static final long serialVersionUID = 8250339805336035966L; @@ -35,7 +35,6 @@ public class User extends SearchTextBased implements HasName { private Authority authority; private String firstName; private String lastName; - private transient JsonNode additionalInfo; public User() { super(); @@ -53,7 +52,6 @@ public class User extends SearchTextBased implements HasName { this.authority = user.getAuthority(); this.firstName = user.getFirstName(); this.lastName = user.getLastName(); - this.additionalInfo = user.getAdditionalInfo(); } public TenantId getTenantId() { @@ -110,14 +108,6 @@ public class User extends SearchTextBased implements HasName { this.lastName = lastName; } - public JsonNode getAdditionalInfo() { - return additionalInfo; - } - - public void setAdditionalInfo(JsonNode additionalInfo) { - this.additionalInfo = additionalInfo; - } - @Override public String getSearchText() { return getEmail(); @@ -139,7 +129,7 @@ public class User extends SearchTextBased implements HasName { builder.append(", lastName="); builder.append(lastName); builder.append(", additionalInfo="); - builder.append(additionalInfo); + builder.append(getAdditionalInfo()); builder.append(", createdTime="); builder.append(createdTime); builder.append(", id="); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java b/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java index 382a2d72e2..2ed7f5d9fe 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java @@ -17,14 +17,16 @@ package org.thingsboard.server.common.data.asset; import com.fasterxml.jackson.databind.JsonNode; import lombok.EqualsAndHashCode; +import org.thingsboard.server.common.data.HasAdditionalInfo; import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.SearchTextBased; +import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; @EqualsAndHashCode(callSuper = true) -public class Asset extends SearchTextBased implements HasName { +public class Asset extends SearchTextBasedWithAdditionalInfo implements HasName { private static final long serialVersionUID = 2807343040519543363L; @@ -32,7 +34,6 @@ public class Asset extends SearchTextBased implements HasName { private CustomerId customerId; private String name; private String type; - private transient JsonNode additionalInfo; public Asset() { super(); @@ -48,7 +49,6 @@ public class Asset extends SearchTextBased implements HasName { this.customerId = asset.getCustomerId(); this.name = asset.getName(); this.type = asset.getType(); - this.additionalInfo = asset.getAdditionalInfo(); } public TenantId getTenantId() { @@ -84,14 +84,6 @@ public class Asset extends SearchTextBased implements HasName { this.type = type; } - public JsonNode getAdditionalInfo() { - return additionalInfo; - } - - public void setAdditionalInfo(JsonNode additionalInfo) { - this.additionalInfo = additionalInfo; - } - @Override public String getSearchText() { return getName(); @@ -109,7 +101,7 @@ public class Asset extends SearchTextBased implements HasName { builder.append(", type="); builder.append(type); builder.append(", additionalInfo="); - builder.append(additionalInfo); + builder.append(getAdditionalInfo()); builder.append(", createdTime="); builder.append(createdTime); builder.append(", id="); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/PluginMetaData.java b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/PluginMetaData.java index 755cf71bab..4b91752b4b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/PluginMetaData.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/PluginMetaData.java @@ -15,16 +15,24 @@ */ package org.thingsboard.server.common.data.plugin; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.HasName; -import org.thingsboard.server.common.data.SearchTextBased; +import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; import org.thingsboard.server.common.data.id.PluginId; import org.thingsboard.server.common.data.id.TenantId; import com.fasterxml.jackson.databind.JsonNode; +import java.io.ByteArrayInputStream; +import java.io.IOException; + @EqualsAndHashCode(callSuper = true) -public class PluginMetaData extends SearchTextBased implements HasName { +@Slf4j +public class PluginMetaData extends SearchTextBasedWithAdditionalInfo implements HasName { private static final long serialVersionUID = 1L; @@ -35,7 +43,9 @@ public class PluginMetaData extends SearchTextBased implements HasName private boolean publicAccess; private ComponentLifecycleState state; private transient JsonNode configuration; - private transient JsonNode additionalInfo; + @JsonIgnore + private byte[] configurationBytes; + public PluginMetaData() { super(); @@ -54,7 +64,6 @@ public class PluginMetaData extends SearchTextBased implements HasName this.publicAccess = plugin.isPublicAccess(); this.state = plugin.getState(); this.configuration = plugin.getConfiguration(); - this.additionalInfo = plugin.getAdditionalInfo(); } @Override @@ -96,11 +105,11 @@ public class PluginMetaData extends SearchTextBased implements HasName } public JsonNode getConfiguration() { - return configuration; + return getJson(() -> configuration, () -> configurationBytes); } - public void setConfiguration(JsonNode configuration) { - this.configuration = configuration; + public void setConfiguration(JsonNode data) { + setJson(data, json -> this.configuration = json, bytes -> this.configurationBytes = bytes); } public boolean isPublicAccess() { @@ -119,14 +128,6 @@ public class PluginMetaData extends SearchTextBased implements HasName return state; } - public JsonNode getAdditionalInfo() { - return additionalInfo; - } - - public void setAdditionalInfo(JsonNode additionalInfo) { - this.additionalInfo = additionalInfo; - } - @Override public String toString() { return "PluginMetaData [apiToken=" + apiToken + ", tenantId=" + tenantId + ", name=" + name + ", clazz=" + clazz + ", publicAccess=" + publicAccess diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java b/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java index f9d70fa874..dcb7f2fd71 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java @@ -15,10 +15,20 @@ */ package org.thingsboard.server.common.data.relation; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; import org.thingsboard.server.common.data.id.EntityId; -public class EntityRelation { +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.Serializable; + +@Slf4j +public class EntityRelation implements Serializable { private static final long serialVersionUID = 2807343040519543363L; @@ -29,7 +39,9 @@ public class EntityRelation { private EntityId to; private String type; private RelationTypeGroup typeGroup; - private JsonNode additionalInfo; + private transient JsonNode additionalInfo; + @JsonIgnore + private byte[] additionalInfoBytes; public EntityRelation() { super(); @@ -92,11 +104,11 @@ public class EntityRelation { } public JsonNode getAdditionalInfo() { - return additionalInfo; + return SearchTextBasedWithAdditionalInfo.getJson(() -> additionalInfo, () -> additionalInfoBytes); } - public void setAdditionalInfo(JsonNode additionalInfo) { - this.additionalInfo = additionalInfo; + public void setAdditionalInfo(JsonNode addInfo) { + SearchTextBasedWithAdditionalInfo.setJson(addInfo, json -> this.additionalInfo = json, bytes -> this.additionalInfoBytes = bytes); } @Override diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java index 1fd72c2246..d2d6419138 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java @@ -15,18 +15,23 @@ */ package org.thingsboard.server.common.data.rule; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.HasName; -import org.thingsboard.server.common.data.SearchTextBased; +import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; import org.thingsboard.server.common.data.id.RuleId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; @Data @EqualsAndHashCode(callSuper = true) -public class RuleMetaData extends SearchTextBased implements HasName { +@Slf4j +public class RuleMetaData extends SearchTextBasedWithAdditionalInfo implements HasName { private static final long serialVersionUID = -5656679015122935465L; @@ -38,7 +43,13 @@ public class RuleMetaData extends SearchTextBased implements HasName { private transient JsonNode filters; private transient JsonNode processor; private transient JsonNode action; - private transient JsonNode additionalInfo; + @JsonIgnore + private byte[] filtersBytes; + @JsonIgnore + private byte[] processorBytes; + @JsonIgnore + private byte[] actionBytes; + public RuleMetaData() { super(); @@ -55,10 +66,9 @@ public class RuleMetaData extends SearchTextBased implements HasName { this.state = rule.getState(); this.weight = rule.getWeight(); this.pluginToken = rule.getPluginToken(); - this.filters = rule.getFilters(); - this.processor = rule.getProcessor(); - this.action = rule.getAction(); - this.additionalInfo = rule.getAdditionalInfo(); + this.setFilters(rule.getFilters()); + this.setProcessor(rule.getProcessor()); + this.setAction(rule.getAction()); } @Override @@ -71,4 +81,29 @@ public class RuleMetaData extends SearchTextBased implements HasName { return name; } + public JsonNode getFilters() { + return SearchTextBasedWithAdditionalInfo.getJson(() -> filters, () -> filtersBytes); + } + + public JsonNode getProcessor() { + return SearchTextBasedWithAdditionalInfo.getJson(() -> processor, () -> processorBytes); + } + + public JsonNode getAction() { + return SearchTextBasedWithAdditionalInfo.getJson(() -> action, () -> actionBytes); + } + + public void setFilters(JsonNode data) { + setJson(data, json -> this.filters = json, bytes -> this.filtersBytes = bytes); + } + + public void setProcessor(JsonNode data) { + setJson(data, json -> this.processor = json, bytes -> this.processorBytes = bytes); + } + + public void setAction(JsonNode data) { + setJson(data, json -> this.action = json, bytes -> this.actionBytes = bytes); + } + + } diff --git a/common/transport/pom.xml b/common/transport/pom.xml index 884f455e3b..f360c0330f 100644 --- a/common/transport/pom.xml +++ b/common/transport/pom.xml @@ -74,6 +74,18 @@ mockito-all test + + org.springframework + spring-context + + + com.google.guava + guava + + + org.apache.commons + commons-lang3 + diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/Clock.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/Clock.java new file mode 100644 index 0000000000..e832354b1e --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/Clock.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public final class Clock { + + private static long time = 0L; + + private Clock() { + } + + + public static long millis() { + return time == 0 ? System.currentTimeMillis() : time; + } + + public static void setMillis(long millis) { + time = millis; + } + + public static void shift(long delta) { + time += delta; + } + + public static void reset() { + time = 0; + } +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicy.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicy.java new file mode 100644 index 0000000000..83d664123e --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicy.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +@Component +public class HostRequestLimitPolicy { + + private final long limit; + + public HostRequestLimitPolicy(@Value("${quota.host.limit}") long limit) { + this.limit = limit; + } + + public boolean isValid(long currentValue) { + return currentValue <= limit; + } + +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaService.java new file mode 100644 index 0000000000..0914dd6d44 --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaService.java @@ -0,0 +1,76 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.transport.quota.inmemory.HostRequestIntervalRegistry; +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner; +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +@Service +@Slf4j +public class HostRequestsQuotaService implements QuotaService { + + private final HostRequestIntervalRegistry requestRegistry; + private final HostRequestLimitPolicy requestsPolicy; + private final IntervalRegistryCleaner registryCleaner; + private final IntervalRegistryLogger registryLogger; + private final boolean enabled; + + public HostRequestsQuotaService(HostRequestIntervalRegistry requestRegistry, HostRequestLimitPolicy requestsPolicy, + IntervalRegistryCleaner registryCleaner, IntervalRegistryLogger registryLogger, + @Value("${quota.host.enabled}") boolean enabled) { + this.requestRegistry = requestRegistry; + this.requestsPolicy = requestsPolicy; + this.registryCleaner = registryCleaner; + this.registryLogger = registryLogger; + this.enabled = enabled; + } + + @PostConstruct + public void init() { + if (enabled) { + registryCleaner.schedule(); + registryLogger.schedule(); + } + } + + @PreDestroy + public void close() { + if (enabled) { + registryCleaner.stop(); + registryLogger.stop(); + } + } + + @Override + public boolean isQuotaExceeded(String key) { + if (enabled) { + long count = requestRegistry.tick(key); + return !requestsPolicy.isValid(count); + } + return false; + } +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/QuotaService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/QuotaService.java new file mode 100644 index 0000000000..cea5db63d1 --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/QuotaService.java @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public interface QuotaService { + + boolean isQuotaExceeded(String key); +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java new file mode 100644 index 0000000000..b35ce47515 --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java @@ -0,0 +1,83 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +@Component +@Slf4j +public class HostRequestIntervalRegistry { + + private final Map hostCounts = new ConcurrentHashMap<>(); + private final long intervalDurationMs; + private final long ttlMs; + private final Set whiteList; + private final Set blackList; + + public HostRequestIntervalRegistry(@Value("${quota.host.intervalMs}") long intervalDurationMs, + @Value("${quota.host.ttlMs}") long ttlMs, + @Value("${quota.host.whitelist}") String whiteList, + @Value("${quota.host.blacklist}") String blackList) { + this.intervalDurationMs = intervalDurationMs; + this.ttlMs = ttlMs; + this.whiteList = Sets.newHashSet(StringUtils.split(whiteList, ',')); + this.blackList = Sets.newHashSet(StringUtils.split(blackList, ',')); + } + + @PostConstruct + public void init() { + if (ttlMs < intervalDurationMs) { + log.warn("TTL for IntervalRegistry [{}] smaller than interval duration [{}]", ttlMs, intervalDurationMs); + } + log.info("Start Host Quota Service with whitelist {}", whiteList); + log.info("Start Host Quota Service with blacklist {}", blackList); + } + + public long tick(String clientHostId) { + if (whiteList.contains(clientHostId)) { + return 0; + } else if (blackList.contains(clientHostId)) { + return Long.MAX_VALUE; + } + IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs)); + return intervalCount.resetIfExpiredAndTick(); + } + + public void clean() { + hostCounts.entrySet().removeIf(entry -> entry.getValue().silenceDuration() > ttlMs); + } + + public Map getContent() { + return hostCounts.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + interval -> interval.getValue().getCount())); + } +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCount.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCount.java new file mode 100644 index 0000000000..8301b8e6f1 --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCount.java @@ -0,0 +1,68 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + + +import org.thingsboard.server.common.transport.quota.Clock; + +import java.util.concurrent.atomic.LongAdder; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class IntervalCount { + + private final LongAdder adder = new LongAdder(); + private final long intervalDurationMs; + private volatile long startTime; + private volatile long lastTickTime; + + public IntervalCount(long intervalDurationMs) { + this.intervalDurationMs = intervalDurationMs; + startTime = Clock.millis(); + } + + public long resetIfExpiredAndTick() { + if (isExpired()) { + reset(); + } + tick(); + return adder.sum(); + } + + public long silenceDuration() { + return Clock.millis() - lastTickTime; + } + + public long getCount() { + return adder.sum(); + } + + private void tick() { + adder.add(1); + lastTickTime = Clock.millis(); + } + + private void reset() { + adder.reset(); + startTime = Clock.millis(); + } + + private boolean isExpired() { + return (Clock.millis() - startTime) > intervalDurationMs; + } +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java new file mode 100644 index 0000000000..1e2076a0b0 --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java @@ -0,0 +1,66 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +@Component +@Slf4j +public class IntervalRegistryCleaner { + + private final HostRequestIntervalRegistry intervalRegistry; + private final long cleanPeriodMs; + private ScheduledExecutorService executor; + + public IntervalRegistryCleaner(HostRequestIntervalRegistry intervalRegistry, @Value("${quota.host.cleanPeriodMs}") long cleanPeriodMs) { + this.intervalRegistry = intervalRegistry; + this.cleanPeriodMs = cleanPeriodMs; + } + + public void schedule() { + if (executor != null) { + throw new IllegalStateException("Registry Cleaner already scheduled"); + } + executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate(this::clean, cleanPeriodMs, cleanPeriodMs, TimeUnit.MILLISECONDS); + } + + public void stop() { + if (executor != null) { + executor.shutdown(); + } + } + + public void clean() { + try { + intervalRegistry.clean(); + } catch (RuntimeException ex) { + log.error("Could not clear Interval Registry", ex); + } + } + +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java new file mode 100644 index 0000000000..8afc7b7f10 --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java @@ -0,0 +1,95 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + +import com.google.common.collect.MinMaxPriorityQueue; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.Comparator; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +@Component +@Slf4j +public class IntervalRegistryLogger { + + private final int topSize; + private final HostRequestIntervalRegistry intervalRegistry; + private final long logIntervalMin; + private ScheduledExecutorService executor; + + public IntervalRegistryLogger(@Value("${quota.log.topSize}") int topSize, @Value("${quota.log.intervalMin}") long logIntervalMin, + HostRequestIntervalRegistry intervalRegistry) { + this.topSize = topSize; + this.logIntervalMin = logIntervalMin; + this.intervalRegistry = intervalRegistry; + } + + public void schedule() { + if (executor != null) { + throw new IllegalStateException("Registry Cleaner already scheduled"); + } + executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate(this::logStatistic, logIntervalMin, logIntervalMin, TimeUnit.MINUTES); + } + + public void stop() { + if (executor != null) { + executor.shutdown(); + } + } + + public void logStatistic() { + Map registryContent = intervalRegistry.getContent(); + int uniqHosts = registryContent.size(); + long requestsCount = registryContent.values().stream().mapToLong(i -> i).sum(); + Map top = getTopElements(registryContent); + log(top, uniqHosts, requestsCount); + } + + protected Map getTopElements(Map countMap) { + MinMaxPriorityQueue> topQueue = MinMaxPriorityQueue + .orderedBy(Comparator.comparing((Function, Long>) Map.Entry::getValue).reversed()) + .maximumSize(topSize) + .create(countMap.entrySet()); + + return topQueue.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private void log(Map top, int uniqHosts, long requestsCount) { + long rps = requestsCount / TimeUnit.MINUTES.toSeconds(logIntervalMin); + StringBuilder builder = new StringBuilder("Quota Statistic : "); + builder.append("uniqHosts : ").append(uniqHosts).append("; "); + builder.append("requestsCount : ").append(requestsCount).append("; "); + builder.append("RPS : ").append(rps).append(" "); + builder.append("top -> "); + for (Map.Entry host : top.entrySet()) { + builder.append(host.getKey()).append(" : ").append(host.getValue()).append("; "); + } + + log.info(builder.toString()); + } +} diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/ClockTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/ClockTest.java new file mode 100644 index 0000000000..6ed5445aec --- /dev/null +++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/ClockTest.java @@ -0,0 +1,66 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class ClockTest { + + @Before + public void init() { + Clock.reset(); + } + + @After + public void clear() { + Clock.reset(); + } + + @Test + public void defaultClockUseSystemTime() { + assertFalse(Clock.millis() > System.currentTimeMillis()); + } + + @Test + public void timeCanBeSet() { + Clock.setMillis(100L); + assertEquals(100L, Clock.millis()); + } + + @Test + public void clockCanBeReseted() { + Clock.setMillis(100L); + assertEquals(100L, Clock.millis()); + Clock.reset(); + assertFalse(Clock.millis() > System.currentTimeMillis()); + } + + @Test + public void timeIsShifted() { + Clock.setMillis(100L); + Clock.shift(50L); + assertEquals(150L, Clock.millis()); + } + +} \ No newline at end of file diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java new file mode 100644 index 0000000000..f28d17c5f9 --- /dev/null +++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java @@ -0,0 +1,46 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class HostRequestLimitPolicyTest { + + private HostRequestLimitPolicy limitPolicy = new HostRequestLimitPolicy(10L); + + @Test + public void ifCurrentValueLessThenLimitItIsValid() { + assertTrue(limitPolicy.isValid(9)); + } + + @Test + public void ifCurrentValueEqualsToLimitItIsValid() { + assertTrue(limitPolicy.isValid(10)); + } + + @Test + public void ifCurrentValueGreaterThenLimitItIsValid() { + assertFalse(limitPolicy.isValid(11)); + } + +} \ No newline at end of file diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java new file mode 100644 index 0000000000..b8c9284fda --- /dev/null +++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java @@ -0,0 +1,76 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +import org.junit.Before; +import org.junit.Test; +import org.thingsboard.server.common.transport.quota.inmemory.HostRequestIntervalRegistry; +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner; +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class HostRequestsQuotaServiceTest { + + private HostRequestsQuotaService quotaService; + + private HostRequestIntervalRegistry requestRegistry = mock(HostRequestIntervalRegistry.class); + private HostRequestLimitPolicy requestsPolicy = mock(HostRequestLimitPolicy.class); + private IntervalRegistryCleaner registryCleaner = mock(IntervalRegistryCleaner.class); + private IntervalRegistryLogger registryLogger = mock(IntervalRegistryLogger.class); + + @Before + public void init() { + quotaService = new HostRequestsQuotaService(requestRegistry, requestsPolicy, registryCleaner, registryLogger, true); + } + + @Test + public void quotaExceededIfRequestCountBiggerThanAllowed() { + when(requestRegistry.tick("key")).thenReturn(10L); + when(requestsPolicy.isValid(10L)).thenReturn(false); + + assertTrue(quotaService.isQuotaExceeded("key")); + + verify(requestRegistry).tick("key"); + verify(requestsPolicy).isValid(10L); + verifyNoMoreInteractions(requestRegistry, requestsPolicy); + } + + @Test + public void quotaNotExceededIfRequestCountLessThanAllowed() { + when(requestRegistry.tick("key")).thenReturn(10L); + when(requestsPolicy.isValid(10L)).thenReturn(true); + + assertFalse(quotaService.isQuotaExceeded("key")); + + verify(requestRegistry).tick("key"); + verify(requestsPolicy).isValid(10L); + verifyNoMoreInteractions(requestRegistry, requestsPolicy); + } + + @Test + public void serviceCanBeDisabled() { + quotaService = new HostRequestsQuotaService(requestRegistry, requestsPolicy, registryCleaner, registryLogger, false); + assertFalse(quotaService.isQuotaExceeded("key")); + verifyNoMoreInteractions(requestRegistry, requestsPolicy); + } +} \ No newline at end of file diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java new file mode 100644 index 0000000000..ff1e525f91 --- /dev/null +++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java @@ -0,0 +1,85 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + +import com.google.common.collect.Sets; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; + +import static org.junit.Assert.assertEquals; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class HostRequestIntervalRegistryTest { + + private HostRequestIntervalRegistry registry; + + @Before + public void init() { + registry = new HostRequestIntervalRegistry(10000L, 100L,"g1,g2", "b1"); + } + + @Test + public void newHostCreateNewInterval() { + assertEquals(1L, registry.tick("host1")); + } + + @Test + public void existingHostUpdated() { + registry.tick("aaa"); + assertEquals(1L, registry.tick("bbb")); + assertEquals(2L, registry.tick("aaa")); + } + + @Test + public void expiredIntervalsCleaned() throws InterruptedException { + registry.tick("aaa"); + Thread.sleep(150L); + registry.tick("bbb"); + registry.clean(); + assertEquals(1L, registry.tick("aaa")); + assertEquals(2L, registry.tick("bbb")); + } + + @Test + public void domainFromWhitelistNotCounted(){ + assertEquals(0L, registry.tick("g1")); + assertEquals(0L, registry.tick("g1")); + assertEquals(0L, registry.tick("g2")); + } + + @Test + public void domainFromBlackListReturnMaxValue(){ + assertEquals(Long.MAX_VALUE, registry.tick("b1")); + assertEquals(Long.MAX_VALUE, registry.tick("b1")); + } + + @Test + public void emptyWhitelistParsedOk(){ + registry = new HostRequestIntervalRegistry(10000L, 100L,"", "b1"); + assertEquals(1L, registry.tick("aaa")); + } + + @Test + public void emptyBlacklistParsedOk(){ + registry = new HostRequestIntervalRegistry(10000L, 100L,"", ""); + assertEquals(1L, registry.tick("aaa")); + } +} \ No newline at end of file diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCountTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCountTest.java new file mode 100644 index 0000000000..7bdcafde3c --- /dev/null +++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCountTest.java @@ -0,0 +1,65 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.thingsboard.server.common.transport.quota.Clock; + +import static org.junit.Assert.assertEquals; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class IntervalCountTest { + + @Before + public void init() { + Clock.setMillis(1000L); + } + + @After + public void clear() { + Clock.reset(); + } + + @Test + public void ticksInSameIntervalAreSummed() { + IntervalCount intervalCount = new IntervalCount(100L); + assertEquals(1L, intervalCount.resetIfExpiredAndTick()); + Clock.shift(100); + assertEquals(2L, intervalCount.resetIfExpiredAndTick()); + } + + @Test + public void oldDataCleanedWhenIntervalExpired() { + IntervalCount intervalCount = new IntervalCount(100L); + assertEquals(1L, intervalCount.resetIfExpiredAndTick()); + Clock.shift(101); + assertEquals(1L, intervalCount.resetIfExpiredAndTick()); + } + + @Test + public void silenceDurationCalculatedFromLastTick() { + IntervalCount intervalCount = new IntervalCount(100L); + assertEquals(1L, intervalCount.resetIfExpiredAndTick()); + Clock.shift(10L); + assertEquals(10L, intervalCount.silenceDuration()); + } + +} \ No newline at end of file diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java new file mode 100644 index 0000000000..cc25b4c402 --- /dev/null +++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + +import com.google.common.collect.ImmutableMap; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class IntervalRegistryLoggerTest { + + private IntervalRegistryLogger logger; + + private HostRequestIntervalRegistry requestRegistry = mock(HostRequestIntervalRegistry.class); + + @Before + public void init() { + logger = new IntervalRegistryLogger(3, 10, requestRegistry); + } + + @Test + public void onlyMaxHostsCollected() { + Map map = ImmutableMap.of("a", 8L, "b", 3L, "c", 1L, "d", 3L); + Map actual = logger.getTopElements(map); + Map expected = ImmutableMap.of("a", 8L, "b", 3L, "d", 3L); + + assertEquals(expected, actual); + } + + @Test + public void emptyMapProcessedCorrectly() { + Map map = Collections.emptyMap(); + Map actual = logger.getTopElements(map); + Map expected = Collections.emptyMap(); + + assertEquals(expected, actual); + } + +} \ No newline at end of file diff --git a/dao/pom.xml b/dao/pom.xml index 75d69343e5..0e72a6c7d6 100644 --- a/dao/pom.xml +++ b/dao/pom.xml @@ -182,6 +182,14 @@ org.springframework spring-context-support + + org.springframework.data + spring-data-redis + + + redis.clients + jedis + diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java index be7e44f008..02bf108cff 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java @@ -337,7 +337,7 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ private void updateRelations(Alarm alarm, AlarmStatus oldStatus, AlarmStatus newStatus) { try { - List relations = relationService.findByTo(alarm.getId(), RelationTypeGroup.ALARM).get(); + List relations = relationService.findByToAsync(alarm.getId(), RelationTypeGroup.ALARM).get(); Set parents = relations.stream().map(EntityRelation::getFrom).collect(Collectors.toSet()); for (EntityId parentId : parents) { updateAlarmRelation(parentId, alarm.getId(), oldStatus, newStatus); diff --git a/dao/src/main/java/org/thingsboard/server/dao/cache/ServiceCacheConfiguration.java b/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineCacheConfiguration.java similarity index 90% rename from dao/src/main/java/org/thingsboard/server/dao/cache/ServiceCacheConfiguration.java rename to dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineCacheConfiguration.java index ba21cb204c..b65a54fa2f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cache/ServiceCacheConfiguration.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineCacheConfiguration.java @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Ticker; import lombok.Data; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cache.CacheManager; import org.springframework.cache.annotation.EnableCaching; @@ -33,10 +34,11 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Configuration -@ConfigurationProperties(prefix = "caching") +@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true) +@ConfigurationProperties(prefix = "caffeine") @EnableCaching @Data -public class ServiceCacheConfiguration { +public class CaffeineCacheConfiguration { private Map specs; diff --git a/dao/src/main/java/org/thingsboard/server/dao/cache/TBRedisCacheConfiguration.java b/dao/src/main/java/org/thingsboard/server/dao/cache/TBRedisCacheConfiguration.java new file mode 100644 index 0000000000..340e25f7a9 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/TBRedisCacheConfiguration.java @@ -0,0 +1,78 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.cache; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cache.CacheManager; +import org.springframework.cache.annotation.EnableCaching; +import org.springframework.cache.interceptor.KeyGenerator; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.cache.RedisCacheManager; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; + +@Configuration +@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis", matchIfMissing = false) +@EnableCaching +@Data +public class TBRedisCacheConfiguration { + + @Value("${redis.connection.host}") + private String host; + + @Value("${redis.connection.port}") + private Integer port; + + @Value("${redis.connection.db}") + private Integer db; + + @Value("${redis.connection.password}") + private String password; + + @Bean + public RedisConnectionFactory redisConnectionFactory() { + JedisConnectionFactory factory = new JedisConnectionFactory(); + factory.setHostName(host); + factory.setPort(port); + factory.setDatabase(db); + factory.setPassword(password); + return factory; + } + + @Bean + public RedisTemplate redisTemplate(RedisConnectionFactory cf) { + RedisTemplate redisTemplate = new RedisTemplate<>(); + redisTemplate.setConnectionFactory(cf); + return redisTemplate; + } + + @Bean + public CacheManager cacheManager(RedisTemplate redisTemplate) { + return new RedisCacheManager(redisTemplate); + } + + @Bean + public KeyGenerator previousDeviceCredentialsId() { + return new PreviousDeviceCredentialsIdKeyGenerator(); + } + + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index 6e3e75b7e9..8e3252d5ac 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -64,19 +64,28 @@ public class BaseRelationService implements RelationService { return relationDao.checkRelation(from, to, relationType, typeGroup); } - @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}") + @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}") @Override - public ListenableFuture getRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { + public EntityRelation getRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { + try { + return getRelationAsync(from, to, relationType, typeGroup).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public ListenableFuture getRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { log.trace("Executing EntityRelation [{}][{}][{}][{}]", from, to, relationType, typeGroup); validate(from, to, relationType, typeGroup); return relationDao.getRelation(from, to, relationType, typeGroup); } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.from"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}") + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}") }) @Override public boolean saveRelation(EntityRelation relation) { @@ -86,10 +95,10 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.from"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}") + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}") }) @Override public ListenableFuture saveRelationAsync(EntityRelation relation) { @@ -99,11 +108,11 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.from"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type}") + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}") }) @Override public boolean deleteRelation(EntityRelation relation) { @@ -117,7 +126,7 @@ public class BaseRelationService implements RelationService { @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type}") + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}") }) @Override public ListenableFuture deleteRelationAsync(EntityRelation relation) { @@ -218,9 +227,9 @@ public class BaseRelationService implements RelationService { ListenableFuture>> inboundRelationsTo = Futures.allAsList(inboundRelationsListTo); ListenableFuture> inboundDeletions = Futures.transform(inboundRelationsTo, (AsyncFunction>, List>) relations -> { - List> results = getListenableFutures(relations, cache, true); - return Futures.allAsList(results); - }); + List> results = getListenableFutures(relations, cache, true); + return Futures.allAsList(results); + }); ListenableFuture inboundFuture = Futures.transform(inboundDeletions, getListToBooleanFunction()); @@ -272,9 +281,18 @@ public class BaseRelationService implements RelationService { cache.evict(fromToAndType); } - @Cacheable(cacheNames = RELATIONS_CACHE, key = "#from") + @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup}") + @Override + public List findByFrom(EntityId from, RelationTypeGroup typeGroup) { + try { + return findByFromAsync(from, typeGroup).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + @Override - public ListenableFuture> findByFrom(EntityId from, RelationTypeGroup typeGroup) { + public ListenableFuture> findByFromAsync(EntityId from, RelationTypeGroup typeGroup) { log.trace("Executing findByFrom [{}][{}]", from, typeGroup); validate(from); validateTypeGroup(typeGroup); @@ -300,9 +318,18 @@ public class BaseRelationService implements RelationService { return relationsInfo; } - @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}") + @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}") + @Override + public List findByFromAndType(EntityId from, String relationType, RelationTypeGroup typeGroup) { + try { + return findByFromAndTypeAsync(from, relationType, typeGroup).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + @Override - public ListenableFuture> findByFromAndType(EntityId from, String relationType, RelationTypeGroup typeGroup) { + public ListenableFuture> findByFromAndTypeAsync(EntityId from, String relationType, RelationTypeGroup typeGroup) { log.trace("Executing findByFromAndType [{}][{}][{}]", from, relationType, typeGroup); validate(from); validateType(relationType); @@ -310,9 +337,18 @@ public class BaseRelationService implements RelationService { return relationDao.findAllByFromAndType(from, relationType, typeGroup); } - @Cacheable(cacheNames = RELATIONS_CACHE, key = "#to") + @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup}") + @Override + public List findByTo(EntityId to, RelationTypeGroup typeGroup) { + try { + return findByToAsync(to, typeGroup).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + @Override - public ListenableFuture> findByTo(EntityId to, RelationTypeGroup typeGroup) { + public ListenableFuture> findByToAsync(EntityId to, RelationTypeGroup typeGroup) { log.trace("Executing findByTo [{}][{}]", to, typeGroup); validate(to); validateTypeGroup(typeGroup); @@ -351,9 +387,18 @@ public class BaseRelationService implements RelationService { return entityRelationInfo; } - @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}") + @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}") + @Override + public List findByToAndType(EntityId to, String relationType, RelationTypeGroup typeGroup) { + try { + return findByToAndTypeAsync(to, relationType, typeGroup).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + @Override - public ListenableFuture> findByToAndType(EntityId to, String relationType, RelationTypeGroup typeGroup) { + public ListenableFuture> findByToAndTypeAsync(EntityId to, String relationType, RelationTypeGroup typeGroup) { log.trace("Executing findByToAndType [{}][{}][{}]", to, relationType, typeGroup); validate(to); validateType(relationType); @@ -527,9 +572,9 @@ public class BaseRelationService implements RelationService { private ListenableFuture> findRelations(final EntityId rootId, final EntitySearchDirection direction) { ListenableFuture> relations; if (direction == EntitySearchDirection.FROM) { - relations = findByFrom(rootId, RelationTypeGroup.COMMON); + relations = findByFromAsync(rootId, RelationTypeGroup.COMMON); } else { - relations = findByTo(rootId, RelationTypeGroup.COMMON); + relations = findByToAsync(rootId, RelationTypeGroup.COMMON); } return relations; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationService.java index cef93935e6..17a459a851 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationService.java @@ -31,7 +31,9 @@ public interface RelationService { ListenableFuture checkRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup); - ListenableFuture getRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup); + EntityRelation getRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup); + + ListenableFuture getRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup); boolean saveRelation(EntityRelation relation); @@ -49,17 +51,25 @@ public interface RelationService { ListenableFuture deleteEntityRelationsAsync(EntityId entity); - ListenableFuture> findByFrom(EntityId from, RelationTypeGroup typeGroup); + List findByFrom(EntityId from, RelationTypeGroup typeGroup); + + ListenableFuture> findByFromAsync(EntityId from, RelationTypeGroup typeGroup); ListenableFuture> findInfoByFrom(EntityId from, RelationTypeGroup typeGroup); - ListenableFuture> findByFromAndType(EntityId from, String relationType, RelationTypeGroup typeGroup); + List findByFromAndType(EntityId from, String relationType, RelationTypeGroup typeGroup); + + ListenableFuture> findByFromAndTypeAsync(EntityId from, String relationType, RelationTypeGroup typeGroup); - ListenableFuture> findByTo(EntityId to, RelationTypeGroup typeGroup); + List findByTo(EntityId to, RelationTypeGroup typeGroup); + + ListenableFuture> findByToAsync(EntityId to, RelationTypeGroup typeGroup); ListenableFuture> findInfoByTo(EntityId to, RelationTypeGroup typeGroup); - ListenableFuture> findByToAndType(EntityId to, String relationType, RelationTypeGroup typeGroup); + List findByToAndType(EntityId to, String relationType, RelationTypeGroup typeGroup); + + ListenableFuture> findByToAndTypeAsync(EntityId to, String relationType, RelationTypeGroup typeGroup); ListenableFuture> findByQuery(EntityRelationsQuery query); diff --git a/dao/src/main/resources/sql/schema.sql b/dao/src/main/resources/sql/schema.sql index 7c0f172432..6d6e82542e 100644 --- a/dao/src/main/resources/sql/schema.sql +++ b/dao/src/main/resources/sql/schema.sql @@ -59,7 +59,6 @@ CREATE TABLE IF NOT EXISTS audit_log ( action_type varchar(255), action_data varchar(255), action_status varchar(255), - search_text varchar(255), action_failure_details varchar ); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java index 6b58af8e19..d1c2ae2af1 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java @@ -122,7 +122,7 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { saveRelation(relationB1); saveRelation(relationB2); - List relations = relationService.findByFrom(parentA, RelationTypeGroup.COMMON).get(); + List relations = relationService.findByFrom(parentA, RelationTypeGroup.COMMON); Assert.assertEquals(2, relations.size()); for (EntityRelation relation : relations) { Assert.assertEquals(EntityRelation.CONTAINS_TYPE, relation.getType()); @@ -130,13 +130,13 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { Assert.assertTrue(childA.equals(relation.getTo()) || childB.equals(relation.getTo())); } - relations = relationService.findByFromAndType(parentA, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON).get(); + relations = relationService.findByFromAndType(parentA, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON); Assert.assertEquals(2, relations.size()); - relations = relationService.findByFromAndType(parentA, EntityRelation.MANAGES_TYPE, RelationTypeGroup.COMMON).get(); + relations = relationService.findByFromAndType(parentA, EntityRelation.MANAGES_TYPE, RelationTypeGroup.COMMON); Assert.assertEquals(0, relations.size()); - relations = relationService.findByFrom(parentB, RelationTypeGroup.COMMON).get(); + relations = relationService.findByFrom(parentB, RelationTypeGroup.COMMON); Assert.assertEquals(2, relations.size()); for (EntityRelation relation : relations) { Assert.assertEquals(EntityRelation.MANAGES_TYPE, relation.getType()); @@ -144,10 +144,10 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { Assert.assertTrue(childA.equals(relation.getTo()) || childB.equals(relation.getTo())); } - relations = relationService.findByFromAndType(parentB, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON).get(); + relations = relationService.findByFromAndType(parentB, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON); Assert.assertEquals(0, relations.size()); - relations = relationService.findByFromAndType(parentB, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON).get(); + relations = relationService.findByFromAndType(parentB, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON); Assert.assertEquals(0, relations.size()); } @@ -177,26 +177,26 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { // Data propagation to views is async Thread.sleep(3000); - List relations = relationService.findByTo(childA, RelationTypeGroup.COMMON).get(); + List relations = relationService.findByTo(childA, RelationTypeGroup.COMMON); Assert.assertEquals(2, relations.size()); for (EntityRelation relation : relations) { Assert.assertEquals(childA, relation.getTo()); Assert.assertTrue(parentA.equals(relation.getFrom()) || parentB.equals(relation.getFrom())); } - relations = relationService.findByToAndType(childA, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON).get(); + relations = relationService.findByToAndType(childA, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON); Assert.assertEquals(1, relations.size()); - relations = relationService.findByToAndType(childB, EntityRelation.MANAGES_TYPE, RelationTypeGroup.COMMON).get(); + relations = relationService.findByToAndType(childB, EntityRelation.MANAGES_TYPE, RelationTypeGroup.COMMON); Assert.assertEquals(1, relations.size()); - relations = relationService.findByToAndType(parentA, EntityRelation.MANAGES_TYPE, RelationTypeGroup.COMMON).get(); + relations = relationService.findByToAndType(parentA, EntityRelation.MANAGES_TYPE, RelationTypeGroup.COMMON); Assert.assertEquals(0, relations.size()); - relations = relationService.findByToAndType(parentB, EntityRelation.MANAGES_TYPE, RelationTypeGroup.COMMON).get(); + relations = relationService.findByToAndType(parentB, EntityRelation.MANAGES_TYPE, RelationTypeGroup.COMMON); Assert.assertEquals(0, relations.size()); - relations = relationService.findByTo(childB, RelationTypeGroup.COMMON).get(); + relations = relationService.findByTo(childB, RelationTypeGroup.COMMON); Assert.assertEquals(2, relations.size()); for (EntityRelation relation : relations) { Assert.assertEquals(childB, relation.getTo()); diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties index 4a73d37f08..793f9b6038 100644 --- a/dao/src/test/resources/application-test.properties +++ b/dao/src/test/resources/application-test.properties @@ -1,8 +1,3 @@ -cache.enabled=false -cache.device_credentials.time_to_live=3600 -cache.device_credentials.max_size.size=1000000 -cache.device_credentials.max_size.policy=PER_NODE - zk.enabled=false zk.url=localhost:2181 zk.zk_dir=/thingsboard @@ -14,11 +9,22 @@ audit_log.exceptions.enabled=false audit_log.by_tenant_partitioning=MONTHS audit_log.default_query_period=30 -caching.specs.relations.timeToLiveInMinutes=1440 -caching.specs.relations.maxSize=100000 +cache.type=caffeine +#cache.type=redis + +caffeine.specs.relations.timeToLiveInMinutes=1440 +caffeine.specs.relations.maxSize=100000 -caching.specs.deviceCredentials.timeToLiveInMinutes=1440 -caching.specs.deviceCredentials.maxSize=100000 +caffeine.specs.deviceCredentials.timeToLiveInMinutes=1440 +caffeine.specs.deviceCredentials.maxSize=100000 + +caffeine.specs.devices.timeToLiveInMinutes=1440 +caffeine.specs.devices.maxSize=100000 caching.specs.devices.timeToLiveInMinutes=1440 caching.specs.devices.maxSize=100000 + +redis.connection.host=localhost +redis.connection.port=6379 +redis.connection.db=0 +redis.connection.password= diff --git a/docker/docker-compose-tests.yml b/docker/docker-compose-tests.yml new file mode 100644 index 0000000000..2ef4631a8b --- /dev/null +++ b/docker/docker-compose-tests.yml @@ -0,0 +1,27 @@ +# +# Copyright © 2016-2017 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '3.3' +services: + redis: + image: redis:4.0 + networks: + - core + ports: + - "6379:6379" + +networks: + core: diff --git a/pom.xml b/pom.xml index dd3cad9a6c..80e846d56f 100755 --- a/pom.xml +++ b/pom.xml @@ -32,6 +32,8 @@ 1.4.3.RELEASE 4.3.4.RELEASE 4.2.0.RELEASE + 1.8.10.RELEASE + 2.9.0 0.7.0 2.2.0 4.12 @@ -783,6 +785,16 @@ 2.2 test + + org.springframework.data + spring-data-redis + ${spring-data-redis.version} + + + redis.clients + jedis + ${jedis.version} + com.sun.winsw winsw diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 834a911e77..958c4a79c8 100644 --- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.msg.session.*; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy; import org.thingsboard.server.transport.coap.session.CoapSessionCtx; @@ -51,13 +52,16 @@ public class CoapTransportResource extends CoapResource { private final CoapTransportAdaptor adaptor; private final SessionMsgProcessor processor; private final DeviceAuthService authService; + private final QuotaService quotaService; private final Field observerField; private final long timeout; - public CoapTransportResource(SessionMsgProcessor processor, DeviceAuthService authService, CoapTransportAdaptor adaptor, String name, long timeout) { + public CoapTransportResource(SessionMsgProcessor processor, DeviceAuthService authService, CoapTransportAdaptor adaptor, String name, + long timeout, QuotaService quotaService) { super(name); this.processor = processor; this.authService = authService; + this.quotaService = quotaService; this.adaptor = adaptor; this.timeout = timeout; // This is important to turn off existing observable logic in @@ -70,6 +74,12 @@ public class CoapTransportResource extends CoapResource { @Override public void handleGET(CoapExchange exchange) { + if(quotaService.isQuotaExceeded(exchange.getSourceAddress().getHostAddress())) { + log.warn("COAP Quota exceeded for [{}:{}] . Disconnect", exchange.getSourceAddress().getHostAddress(), exchange.getSourcePort()); + exchange.respond(ResponseCode.BAD_REQUEST); + return; + } + Optional featureType = getFeatureType(exchange.advanced().getRequest()); if (!featureType.isPresent()) { log.trace("Missing feature type parameter"); diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java index 02a2706776..df8725aa87 100644 --- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java +++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java @@ -15,25 +15,25 @@ */ package org.thingsboard.server.transport.coap; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.CoapResource; import org.eclipse.californium.core.CoapServer; import org.eclipse.californium.core.network.CoapEndpoint; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.thingsboard.server.common.transport.SessionMsgProcessor; -import org.thingsboard.server.common.transport.auth.DeviceAuthService; -import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.transport.SessionMsgProcessor; +import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.quota.QuotaService; +import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; @Service("CoapTransportService") @ConditionalOnProperty(prefix = "coap", value = "enabled", havingValue = "true", matchIfMissing = true) @@ -54,6 +54,9 @@ public class CoapTransportService { @Autowired(required = false) private DeviceAuthService authService; + @Autowired(required = false) + private QuotaService quotaService; + @Value("${coap.bind_address}") private String host; @@ -83,7 +86,7 @@ public class CoapTransportService { private void createResources() { CoapResource api = new CoapResource(API); - api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout)); + api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout, quotaService)); server.add(api); } diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java index 706af80ec9..5ac589099a 100644 --- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java +++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java @@ -50,6 +50,7 @@ import org.thingsboard.server.common.msg.session.*; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.auth.DeviceAuthResult; import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.quota.QuotaService; import java.util.ArrayList; import java.util.List; @@ -131,6 +132,11 @@ public class CoapServerTest { } }; } + + @Bean + public static QuotaService quotaService() { + return key -> false; + } } @Autowired diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index 70767afdfc..c799dcf7b7 100644 --- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -35,10 +35,11 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.transport.http.session.HttpSessionCtx; +import javax.servlet.http.HttpServletRequest; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -59,11 +60,18 @@ public class DeviceApiController { @Autowired(required = false) private DeviceAuthService authService; + @Autowired(required = false) + private QuotaService quotaService; + @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json") public DeferredResult getDeviceAttributes(@PathVariable("deviceToken") String deviceToken, @RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys, - @RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys) { + @RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys, + HttpServletRequest httpRequest) { DeferredResult responseWriter = new DeferredResult(); + if (quotaExceeded(httpRequest, responseWriter)) { + return responseWriter; + } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); if (ctx.login(new DeviceTokenCredentials(deviceToken))) { GetAttributesRequest request; @@ -84,8 +92,11 @@ public class DeviceApiController { @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.POST) public DeferredResult postDeviceAttributes(@PathVariable("deviceToken") String deviceToken, - @RequestBody String json) { + @RequestBody String json, HttpServletRequest request) { DeferredResult responseWriter = new DeferredResult(); + if (quotaExceeded(request, responseWriter)) { + return responseWriter; + } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); if (ctx.login(new DeviceTokenCredentials(deviceToken))) { try { @@ -101,8 +112,11 @@ public class DeviceApiController { @RequestMapping(value = "/{deviceToken}/telemetry", method = RequestMethod.POST) public DeferredResult postTelemetry(@PathVariable("deviceToken") String deviceToken, - @RequestBody String json) { + @RequestBody String json, HttpServletRequest request) { DeferredResult responseWriter = new DeferredResult(); + if (quotaExceeded(request, responseWriter)) { + return responseWriter; + } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); if (ctx.login(new DeviceTokenCredentials(deviceToken))) { try { @@ -118,15 +132,20 @@ public class DeviceApiController { @RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.GET, produces = "application/json") public DeferredResult subscribeToCommands(@PathVariable("deviceToken") String deviceToken, - @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout) { - return subscribe(deviceToken, timeout, new RpcSubscribeMsg()); + @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout, + HttpServletRequest request) { + + return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request); } @RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST) public DeferredResult replyToCommand(@PathVariable("deviceToken") String deviceToken, @PathVariable("requestId") Integer requestId, - @RequestBody String json) { + @RequestBody String json, HttpServletRequest request) { DeferredResult responseWriter = new DeferredResult(); + if (quotaExceeded(request, responseWriter)) { + return responseWriter; + } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); if (ctx.login(new DeviceTokenCredentials(deviceToken))) { try { @@ -143,8 +162,11 @@ public class DeviceApiController { @RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.POST) public DeferredResult postRpcRequest(@PathVariable("deviceToken") String deviceToken, - @RequestBody String json) { + @RequestBody String json, HttpServletRequest httpRequest) { DeferredResult responseWriter = new DeferredResult(); + if (quotaExceeded(httpRequest, responseWriter)) { + return responseWriter; + } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); if (ctx.login(new DeviceTokenCredentials(deviceToken))) { try { @@ -163,12 +185,17 @@ public class DeviceApiController { @RequestMapping(value = "/{deviceToken}/attributes/updates", method = RequestMethod.GET, produces = "application/json") public DeferredResult subscribeToAttributes(@PathVariable("deviceToken") String deviceToken, - @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout) { - return subscribe(deviceToken, timeout, new AttributesSubscribeMsg()); + @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout, + HttpServletRequest httpRequest) { + + return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest); } - private DeferredResult subscribe(String deviceToken, long timeout, FromDeviceMsg msg) { + private DeferredResult subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) { DeferredResult responseWriter = new DeferredResult(); + if (quotaExceeded(httpRequest, responseWriter)) { + return responseWriter; + } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout); if (ctx.login(new DeviceTokenCredentials(deviceToken))) { try { @@ -195,4 +222,13 @@ public class DeviceApiController { processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg)); } + private boolean quotaExceeded(HttpServletRequest request, DeferredResult responseWriter) { + if (quotaService.isQuotaExceeded(request.getRemoteAddr())) { + log.warn("REST Quota exceeded for [{}] . Disconnect", request.getRemoteAddr()); + responseWriter.setResult(new ResponseEntity<>(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED)); + return true; + } + return false; + } + } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 7e4c2eac9c..d5f7c006fe 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -16,7 +16,6 @@ package org.thingsboard.server.transport.mqtt; import com.fasterxml.jackson.databind.JsonNode; -import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.mqtt.*; @@ -36,18 +35,18 @@ import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.dao.EncryptionUtil; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; -import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; +import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; import org.thingsboard.server.transport.mqtt.util.SslUtil; import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.cert.X509Certificate; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.ArrayList; import java.util.List; @@ -72,13 +71,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private final DeviceService deviceService; private final DeviceAuthService authService; private final RelationService relationService; + private final QuotaService quotaService; private final SslHandler sslHandler; private volatile boolean connected; private volatile InetSocketAddress address; private volatile GatewaySessionCtx gatewaySessionCtx; public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, - MqttTransportAdaptor adaptor, SslHandler sslHandler) { + MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) { this.processor = processor; this.deviceService = deviceService; this.relationService = relationService; @@ -87,6 +87,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor); this.sessionId = deviceSessionCtx.getSessionId().toUidStr(); this.sslHandler = sslHandler; + this.quotaService = quotaService; } @Override @@ -102,35 +103,43 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (msg.fixedHeader() == null) { log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort()); processDisconnect(ctx); - } else { - deviceSessionCtx.setChannel(ctx); - switch (msg.fixedHeader().messageType()) { - case CONNECT: - processConnect(ctx, (MqttConnectMessage) msg); - break; - case PUBLISH: - processPublish(ctx, (MqttPublishMessage) msg); - break; - case SUBSCRIBE: - processSubscribe(ctx, (MqttSubscribeMessage) msg); - break; - case UNSUBSCRIBE: - processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); - break; - case PINGREQ: - if (checkConnected(ctx)) { - ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); - } - break; - case DISCONNECT: - if (checkConnected(ctx)) { - processDisconnect(ctx); - } - break; - default: - break; - } + return; + } + + if (quotaService.isQuotaExceeded(address.getHostName())) { + log.warn("MQTT Quota exceeded for [{}:{}] . Disconnect", address.getHostName(), address.getPort()); + processDisconnect(ctx); + return; } + + deviceSessionCtx.setChannel(ctx); + switch (msg.fixedHeader().messageType()) { + case CONNECT: + processConnect(ctx, (MqttConnectMessage) msg); + break; + case PUBLISH: + processPublish(ctx, (MqttPublishMessage) msg); + break; + case SUBSCRIBE: + processSubscribe(ctx, (MqttSubscribeMessage) msg); + break; + case UNSUBSCRIBE: + processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); + break; + case PINGREQ: + if (checkConnected(ctx)) { + ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); + } + break; + case DISCONNECT: + if (checkConnected(ctx)) { + processDisconnect(ctx); + } + break; + default: + break; + } + } private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java index 1469290837..93812a80a8 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java @@ -15,27 +15,19 @@ */ package org.thingsboard.server.transport.mqtt; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; -import io.netty.handler.ssl.util.SelfSignedCertificate; -import org.springframework.beans.factory.annotation.Value; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; -import javax.net.ssl.SSLException; -import java.security.cert.CertificateException; - /** * @author Andrew Shvayka */ @@ -49,16 +41,18 @@ public class MqttTransportServerInitializer extends ChannelInitializer