From c4cd601dcbbbec90a83dfe8508766f7dd94e5d53 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Mon, 28 Aug 2017 12:54:55 +0300 Subject: [PATCH] Improvements to Tenant rules and plugins startup sequence --- .../server/actors/ActorSystemContext.java | 3 + .../server/actors/app/AppActor.java | 2 +- .../shared/plugin/TenantPluginManager.java | 7 ++ .../actors/shared/rule/RuleManager.java | 10 ++- .../actors/shared/rule/TenantRuleManager.java | 7 ++ .../server/actors/tenant/TenantActor.java | 11 +-- .../server/controller/AlarmController.java | 14 ---- .../server/controller/AssetController.java | 2 +- .../server/controller/DeviceController.java | 2 +- .../src/main/resources/thingsboard.yml | 2 + .../controller/AbstractControllerTest.java | 74 +++++++++---------- .../common/data}/asset/AssetSearchQuery.java | 5 +- .../data}/device/DeviceSearchQuery.java | 2 +- .../server/dao/asset/AssetService.java | 1 + .../server/dao/asset/BaseAssetService.java | 1 + .../server/dao/device/DeviceService.java | 1 + .../server/dao/device/DeviceServiceImpl.java | 1 + .../transport/mqtt/MqttTransportHandler.java | 66 ++++++++++------- 18 files changed, 114 insertions(+), 97 deletions(-) rename {dao/src/main/java/org/thingsboard/server/dao => common/data/src/main/java/org/thingsboard/server/common/data}/asset/AssetSearchQuery.java (94%) rename dao/src/main/java/org/thingsboard/server/{dao => common/data}/device/DeviceSearchQuery.java (97%) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 5dd738b029..6fe4f6d762 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -136,6 +136,9 @@ public class ActorSystemContext { @Value("${actors.statistics.persist_frequency}") @Getter private long statisticsPersistFrequency; + @Value("${actors.tenant.create_components_on_init}") + @Getter private boolean tenantComponentsInitEnabled; + @Getter @Setter private ActorSystem actorSystem; @Getter @Setter private ActorRef appActor; diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 2ef2ca7487..bb2107b192 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -174,7 +174,7 @@ public class AppActor extends ContextAwareActor { TenantId tenantId = toDeviceActorMsg.getTenantId(); ActorRef tenantActor = getOrCreateTenantActor(tenantId); if (toDeviceActorMsg.getPayload().getMsgType().requiresRulesProcessing()) { - tenantActor.tell(new RuleChainDeviceMsg(toDeviceActorMsg, ruleManager.getRuleChain()), context().self()); + tenantActor.tell(new RuleChainDeviceMsg(toDeviceActorMsg, ruleManager.getRuleChain(this.context())), context().self()); } else { tenantActor.tell(toDeviceActorMsg, context().self()); } diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java index 89f9efec7c..dde1af69d8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.actors.shared.plugin; +import akka.actor.ActorContext; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.common.data.id.TenantId; @@ -30,6 +31,12 @@ public class TenantPluginManager extends PluginManager { this.tenantId = tenantId; } + public void init(ActorContext context) { + if (systemContext.isTenantComponentsInitEnabled()) { + super.init(context); + } + } + @Override FetchFunction getFetchPluginsFunction() { return link -> pluginService.findTenantPlugins(tenantId, link); diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java index 1e00a6de1b..c2fc24ae91 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java @@ -25,7 +25,6 @@ import org.thingsboard.server.actors.rule.RuleActorChain; import org.thingsboard.server.actors.rule.RuleActorMetaData; import org.thingsboard.server.actors.rule.SimpleRuleActorChain; import org.thingsboard.server.actors.service.ContextAwareActor; -import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.common.data.id.RuleId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageDataIterable; @@ -72,6 +71,9 @@ public abstract class RuleManager { } public Optional update(ActorContext context, RuleId ruleId, ComponentLifecycleEvent event) { + if (ruleMap == null) { + init(context); + } RuleMetaData rule; if (event != ComponentLifecycleEvent.DELETED) { rule = systemContext.getRuleService().findRuleById(ruleId); @@ -111,11 +113,13 @@ public abstract class RuleManager { .withDispatcher(getDispatcherName()), rId.toString())); } - public RuleActorChain getRuleChain() { + public RuleActorChain getRuleChain(ActorContext context) { + if (ruleMap == null) { + init(context); + } return ruleChain; } - private void refreshRuleChain() { Set activeRuleSet = new HashSet<>(); for (Map.Entry rule : ruleMap.entrySet()) { diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java index 05a1c7495d..12278bb09a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.actors.shared.rule; +import akka.actor.ActorContext; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.common.data.id.TenantId; @@ -27,6 +28,12 @@ public class TenantRuleManager extends RuleManager { super(systemContext, tenantId); } + public void init(ActorContext context) { + if (systemContext.isTenantComponentsInitEnabled()) { + super.init(context); + } + } + @Override FetchFunction getFetchRulesFunction() { return link -> ruleService.findTenantRules(tenantId, link); diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 9cc325254d..2fbe1accd7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -151,18 +151,13 @@ public class TenantActor extends ContextAwareActor { private void process(RuleChainDeviceMsg msg) { ToDeviceActorMsg toDeviceActorMsg = msg.getToDeviceActorMsg(); ActorRef deviceActor = getOrCreateDeviceActor(toDeviceActorMsg.getDeviceId()); - RuleActorChain chain = new ComplexRuleActorChain(msg.getRuleChain(), ruleManager.getRuleChain()); + RuleActorChain chain = new ComplexRuleActorChain(msg.getRuleChain(), ruleManager.getRuleChain(this.context())); deviceActor.tell(new RuleChainDeviceMsg(toDeviceActorMsg, chain), context().self()); } private ActorRef getOrCreateDeviceActor(DeviceId deviceId) { - ActorRef deviceActor = deviceActors.get(deviceId); - if (deviceActor == null) { - deviceActor = context().actorOf(Props.create(new DeviceActor.ActorCreator(systemContext, tenantId, deviceId)) - .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), deviceId.toString()); - deviceActors.put(deviceId, deviceActor); - } - return deviceActor; + return deviceActors.computeIfAbsent(deviceId, k -> context().actorOf(Props.create(new DeviceActor.ActorCreator(systemContext, tenantId, deviceId)) + .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), deviceId.toString())); } public static class ActorCreator extends ContextBasedCreator { diff --git a/application/src/main/java/org/thingsboard/server/controller/AlarmController.java b/application/src/main/java/org/thingsboard/server/controller/AlarmController.java index e30113871d..3b21611138 100644 --- a/application/src/main/java/org/thingsboard/server/controller/AlarmController.java +++ b/application/src/main/java/org/thingsboard/server/controller/AlarmController.java @@ -15,30 +15,16 @@ */ package org.thingsboard.server.controller; -import com.google.common.util.concurrent.ListenableFuture; import org.apache.commons.lang3.StringUtils; import org.springframework.http.HttpStatus; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.*; -import org.thingsboard.server.common.data.Customer; -import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.data.alarm.*; -import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.*; -import org.thingsboard.server.common.data.page.TextPageData; -import org.thingsboard.server.common.data.page.TextPageLink; import org.thingsboard.server.common.data.page.TimePageData; import org.thingsboard.server.common.data.page.TimePageLink; -import org.thingsboard.server.dao.asset.AssetSearchQuery; -import org.thingsboard.server.dao.exception.IncorrectParameterException; -import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.exception.ThingsboardErrorCode; import org.thingsboard.server.exception.ThingsboardException; -import org.thingsboard.server.service.security.model.SecurityUser; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; @RestController @RequestMapping("/api") diff --git a/application/src/main/java/org/thingsboard/server/controller/AssetController.java b/application/src/main/java/org/thingsboard/server/controller/AssetController.java index 2857874da0..5809a7fe7e 100644 --- a/application/src/main/java/org/thingsboard/server/controller/AssetController.java +++ b/application/src/main/java/org/thingsboard/server/controller/AssetController.java @@ -27,7 +27,7 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; -import org.thingsboard.server.dao.asset.AssetSearchQuery; +import org.thingsboard.server.common.data.asset.AssetSearchQuery; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.exception.ThingsboardException; diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index 68b29b85e8..637a760a00 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -28,7 +28,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; import org.thingsboard.server.common.data.security.DeviceCredentials; -import org.thingsboard.server.dao.device.DeviceSearchQuery; +import org.thingsboard.server.common.data.device.DeviceSearchQuery; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.exception.ThingsboardException; diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 6831cc6361..e1bad01cce 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -159,6 +159,8 @@ cassandra: # Actor system parameters actors: + tenant: + create_components_on_init: true session: sync: # Default timeout for processing request using synchronous session (HTTP, CoAP) in milliseconds diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java index 94cef57ea1..0fb12a2b34 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java @@ -97,28 +97,28 @@ public abstract class AbstractControllerTest { protected static final String SYS_ADMIN_EMAIL = "sysadmin@thingsboard.org"; private static final String SYS_ADMIN_PASSWORD = "sysadmin"; - + protected static final String TENANT_ADMIN_EMAIL = "testtenant@thingsboard.org"; private static final String TENANT_ADMIN_PASSWORD = "tenant"; protected static final String CUSTOMER_USER_EMAIL = "testcustomer@thingsboard.org"; private static final String CUSTOMER_USER_PASSWORD = "customer"; - + protected MediaType contentType = new MediaType(MediaType.APPLICATION_JSON.getType(), MediaType.APPLICATION_JSON.getSubtype(), Charset.forName("utf8")); protected MockMvc mockMvc; - + protected String token; protected String refreshToken; protected String username; private TenantId tenantId; - + @SuppressWarnings("rawtypes") private HttpMessageConverter mappingJackson2HttpMessageConverter; - + @Autowired private WebApplicationContext webApplicationContext; @@ -132,7 +132,7 @@ public abstract class AbstractControllerTest { log.info("Finished test: {}", description.getMethodName()); } }; - + @Autowired void setConverters(HttpMessageConverter[] converters) { @@ -144,7 +144,7 @@ public abstract class AbstractControllerTest { Assert.assertNotNull("the JSON message converter must not be null", this.mappingJackson2HttpMessageConverter); } - + @Before public void setup() throws Exception { log.info("Executing setup"); @@ -188,7 +188,7 @@ public abstract class AbstractControllerTest { public void teardown() throws Exception { log.info("Executing teardown"); loginSysAdmin(); - doDelete("/api/tenant/"+tenantId.getId().toString()) + doDelete("/api/tenant/" + tenantId.getId().toString()) .andExpect(status().isOk()); log.info("Executed teardown"); } @@ -196,7 +196,7 @@ public abstract class AbstractControllerTest { protected void loginSysAdmin() throws Exception { login(SYS_ADMIN_EMAIL, SYS_ADMIN_PASSWORD); } - + protected void loginTenantAdmin() throws Exception { login(TENANT_ADMIN_EMAIL, TENANT_ADMIN_PASSWORD); } @@ -204,13 +204,13 @@ public abstract class AbstractControllerTest { protected void loginCustomerUser() throws Exception { login(CUSTOMER_USER_EMAIL, CUSTOMER_USER_PASSWORD); } - + protected User createUserAndLogin(User user, String password) throws Exception { User savedUser = doPost("/api/user", user, User.class); logout(); doGet("/api/noauth/activate?activateToken={activateToken}", TestMailService.currentActivateToken) - .andExpect(status().isSeeOther()) - .andExpect(header().string(HttpHeaders.LOCATION, "/login/createPassword?activateToken=" + TestMailService.currentActivateToken)); + .andExpect(status().isSeeOther()) + .andExpect(header().string(HttpHeaders.LOCATION, "/login/createPassword?activateToken=" + TestMailService.currentActivateToken)); JsonNode tokenInfo = readResponse(doPost("/api/noauth/activate", "activateToken", TestMailService.currentActivateToken, "password", password).andExpect(status().isOk()), JsonNode.class); validateAndSetJwtToken(tokenInfo, user.getEmail()); return savedUser; @@ -247,14 +247,14 @@ public abstract class AbstractControllerTest { Assert.assertNotNull(token); Assert.assertFalse(token.isEmpty()); int i = token.lastIndexOf('.'); - Assert.assertTrue(i>0); - String withoutSignature = token.substring(0, i+1); - Jwt jwsClaims = Jwts.parser().parseClaimsJwt(withoutSignature); + Assert.assertTrue(i > 0); + String withoutSignature = token.substring(0, i + 1); + Jwt jwsClaims = Jwts.parser().parseClaimsJwt(withoutSignature); Claims claims = jwsClaims.getBody(); String subject = claims.getSubject(); Assert.assertEquals(username, subject); } - + protected void logout() throws Exception { this.token = null; this.refreshToken = null; @@ -266,24 +266,24 @@ public abstract class AbstractControllerTest { request.header(ThingsboardSecurityConfiguration.JWT_TOKEN_HEADER_PARAM, "Bearer " + this.token); } } - + protected ResultActions doGet(String urlTemplate, Object... urlVariables) throws Exception { MockHttpServletRequestBuilder getRequest = get(urlTemplate, urlVariables); setJwtToken(getRequest); return mockMvc.perform(getRequest); } - + protected T doGet(String urlTemplate, Class responseClass, Object... urlVariables) throws Exception { return readResponse(doGet(urlTemplate, urlVariables).andExpect(status().isOk()), responseClass); } - + protected T doGetTyped(String urlTemplate, TypeReference responseType, Object... urlVariables) throws Exception { return readResponse(doGet(urlTemplate, urlVariables).andExpect(status().isOk()), responseType); } - + protected T doGetTypedWithPageLink(String urlTemplate, TypeReference responseType, - TextPageLink pageLink, - Object... urlVariables) throws Exception { + TextPageLink pageLink, + Object... urlVariables) throws Exception { List pageLinkVariables = new ArrayList<>(); urlTemplate += "limit={limit}"; pageLinkVariables.add(pageLink.getLimit()); @@ -299,18 +299,18 @@ public abstract class AbstractControllerTest { urlTemplate += "&textOffset={textOffset}"; pageLinkVariables.add(pageLink.getTextOffset()); } - - Object[] vars = new Object[urlVariables.length + pageLinkVariables.size()]; + + Object[] vars = new Object[urlVariables.length + pageLinkVariables.size()]; System.arraycopy(urlVariables, 0, vars, 0, urlVariables.length); System.arraycopy(pageLinkVariables.toArray(), 0, vars, urlVariables.length, pageLinkVariables.size()); - + return readResponse(doGet(urlTemplate, vars).andExpect(status().isOk()), responseType); } - + protected T doPost(String urlTemplate, Class responseClass, String... params) throws Exception { return readResponse(doPost(urlTemplate, params).andExpect(status().isOk()), responseClass); } - + protected T doPost(String urlTemplate, T content, Class responseClass, String... params) throws Exception { return readResponse(doPost(urlTemplate, content, params).andExpect(status().isOk()), responseClass); } @@ -318,15 +318,15 @@ public abstract class AbstractControllerTest { protected T doDelete(String urlTemplate, Class responseClass, String... params) throws Exception { return readResponse(doDelete(urlTemplate, params).andExpect(status().isOk()), responseClass); } - + protected ResultActions doPost(String urlTemplate, String... params) throws Exception { MockHttpServletRequestBuilder postRequest = post(urlTemplate); setJwtToken(postRequest); populateParams(postRequest, params); return mockMvc.perform(postRequest); } - - protected ResultActions doPost(String urlTemplate, T content, String... params) throws Exception { + + protected ResultActions doPost(String urlTemplate, T content, String... params) throws Exception { MockHttpServletRequestBuilder postRequest = post(urlTemplate); setJwtToken(postRequest); String json = json(content); @@ -334,25 +334,25 @@ public abstract class AbstractControllerTest { populateParams(postRequest, params); return mockMvc.perform(postRequest); } - + protected ResultActions doDelete(String urlTemplate, String... params) throws Exception { MockHttpServletRequestBuilder deleteRequest = delete(urlTemplate); setJwtToken(deleteRequest); populateParams(deleteRequest, params); return mockMvc.perform(deleteRequest); } - + protected void populateParams(MockHttpServletRequestBuilder request, String... params) { if (params != null && params.length > 0) { Assert.assertEquals(params.length % 2, 0); MultiValueMap paramsMap = new LinkedMultiValueMap(); - for (int i=0;i T readResponse(ResultActions result, Class responseClass) throws Exception { byte[] content = result.andReturn().getResponse().getContentAsByteArray(); MockHttpInputMessage mockHttpInputMessage = new MockHttpInputMessage(content); return (T) this.mappingJackson2HttpMessageConverter.read(responseClass, mockHttpInputMessage); } - + protected T readResponse(ResultActions result, TypeReference type) throws Exception { byte[] content = result.andReturn().getResponse().getContentAsByteArray(); ObjectMapper mapper = new ObjectMapper(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/AssetSearchQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/asset/AssetSearchQuery.java similarity index 94% rename from dao/src/main/java/org/thingsboard/server/dao/asset/AssetSearchQuery.java rename to common/data/src/main/java/org/thingsboard/server/common/data/asset/AssetSearchQuery.java index 974845b6a1..b955fdbc4f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/asset/AssetSearchQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/asset/AssetSearchQuery.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.asset; +package org.thingsboard.server.common.data.asset; import lombok.Data; import org.thingsboard.server.common.data.EntityType; @@ -22,7 +22,6 @@ import org.thingsboard.server.common.data.relation.EntityRelationsQuery; import org.thingsboard.server.common.data.relation.EntityTypeFilter; import org.thingsboard.server.common.data.relation.RelationsSearchParameters; -import javax.annotation.Nullable; import java.util.Collections; import java.util.List; @@ -33,9 +32,7 @@ import java.util.List; public class AssetSearchQuery { private RelationsSearchParameters parameters; - @Nullable private String relationType; - @Nullable private List assetTypes; public EntityRelationsQuery toEntitySearchQuery() { diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceSearchQuery.java b/dao/src/main/java/org/thingsboard/server/common/data/device/DeviceSearchQuery.java similarity index 97% rename from dao/src/main/java/org/thingsboard/server/dao/device/DeviceSearchQuery.java rename to dao/src/main/java/org/thingsboard/server/common/data/device/DeviceSearchQuery.java index fece90bb0d..76b696fdae 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceSearchQuery.java +++ b/dao/src/main/java/org/thingsboard/server/common/data/device/DeviceSearchQuery.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.device; +package org.thingsboard.server.common.data.device; import lombok.Data; import org.thingsboard.server.common.data.EntityType; diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/AssetService.java b/dao/src/main/java/org/thingsboard/server/dao/asset/AssetService.java index 214ef15b48..7b64b20d48 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/asset/AssetService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/asset/AssetService.java @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.asset; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.asset.AssetSearchQuery; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java index 9ea6448bd4..14a9f21283 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.asset.AssetSearchQuery; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java index cd56fefbdb..3b0c5ec124 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.device; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntitySubtype; +import org.thingsboard.server.common.data.device.DeviceSearchQuery; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java index 717aea08bd..76be99697c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java @@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.*; +import org.thingsboard.server.common.data.device.DeviceSearchQuery; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 9c4dd40417..3fd1696988 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -16,6 +16,7 @@ package org.thingsboard.server.transport.mqtt; import com.fasterxml.jackson.databind.JsonNode; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.mqtt.*; @@ -45,6 +46,8 @@ import org.thingsboard.server.transport.mqtt.util.SslUtil; import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.cert.X509Certificate; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.ArrayList; import java.util.List; @@ -71,6 +74,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private final RelationService relationService; private final SslHandler sslHandler; private volatile boolean connected; + private volatile InetSocketAddress address; private volatile GatewaySessionCtx gatewaySessionCtx; public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, @@ -94,30 +98,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { - deviceSessionCtx.setChannel(ctx); - switch (msg.fixedHeader().messageType()) { - case CONNECT: - processConnect(ctx, (MqttConnectMessage) msg); - break; - case PUBLISH: - processPublish(ctx, (MqttPublishMessage) msg); - break; - case SUBSCRIBE: - processSubscribe(ctx, (MqttSubscribeMessage) msg); - break; - case UNSUBSCRIBE: - processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); - break; - case PINGREQ: - if (checkConnected(ctx)) { - ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); - } - break; - case DISCONNECT: - if (checkConnected(ctx)) { - processDisconnect(ctx); - } - break; + address = (InetSocketAddress) ctx.channel().remoteAddress(); + if (msg.fixedHeader() == null) { + log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort()); + processDisconnect(ctx); + } else { + deviceSessionCtx.setChannel(ctx); + switch (msg.fixedHeader().messageType()) { + case CONNECT: + processConnect(ctx, (MqttConnectMessage) msg); + break; + case PUBLISH: + processPublish(ctx, (MqttPublishMessage) msg); + break; + case SUBSCRIBE: + processSubscribe(ctx, (MqttSubscribeMessage) msg); + break; + case UNSUBSCRIBE: + processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); + break; + case PINGREQ: + if (checkConnected(ctx)) { + ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); + } + break; + case DISCONNECT: + if (checkConnected(ctx)) { + processDisconnect(ctx); + } + break; + } } } @@ -313,9 +323,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void processDisconnect(ChannelHandlerContext ctx) { ctx.close(); - processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId())); - if (gatewaySessionCtx != null) { - gatewaySessionCtx.onGatewayDisconnect(); + if (connected) { + processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId())); + if (gatewaySessionCtx != null) { + gatewaySessionCtx.onGatewayDisconnect(); + } } }