Browse Source

Improvements to Tenant rules and plugins startup sequence

pull/266/head
Andrew Shvayka 9 years ago
parent
commit
c4cd601dcb
  1. 3
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 2
      application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
  3. 7
      application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java
  4. 10
      application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java
  5. 7
      application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java
  6. 11
      application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
  7. 14
      application/src/main/java/org/thingsboard/server/controller/AlarmController.java
  8. 2
      application/src/main/java/org/thingsboard/server/controller/AssetController.java
  9. 2
      application/src/main/java/org/thingsboard/server/controller/DeviceController.java
  10. 2
      application/src/main/resources/thingsboard.yml
  11. 74
      application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
  12. 5
      common/data/src/main/java/org/thingsboard/server/common/data/asset/AssetSearchQuery.java
  13. 2
      dao/src/main/java/org/thingsboard/server/common/data/device/DeviceSearchQuery.java
  14. 1
      dao/src/main/java/org/thingsboard/server/dao/asset/AssetService.java
  15. 1
      dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java
  16. 1
      dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
  17. 1
      dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
  18. 66
      transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

3
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -136,6 +136,9 @@ public class ActorSystemContext {
@Value("${actors.statistics.persist_frequency}")
@Getter private long statisticsPersistFrequency;
@Value("${actors.tenant.create_components_on_init}")
@Getter private boolean tenantComponentsInitEnabled;
@Getter @Setter private ActorSystem actorSystem;
@Getter @Setter private ActorRef appActor;

2
application/src/main/java/org/thingsboard/server/actors/app/AppActor.java

@ -174,7 +174,7 @@ public class AppActor extends ContextAwareActor {
TenantId tenantId = toDeviceActorMsg.getTenantId();
ActorRef tenantActor = getOrCreateTenantActor(tenantId);
if (toDeviceActorMsg.getPayload().getMsgType().requiresRulesProcessing()) {
tenantActor.tell(new RuleChainDeviceMsg(toDeviceActorMsg, ruleManager.getRuleChain()), context().self());
tenantActor.tell(new RuleChainDeviceMsg(toDeviceActorMsg, ruleManager.getRuleChain(this.context())), context().self());
} else {
tenantActor.tell(toDeviceActorMsg, context().self());
}

7
application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.actors.shared.plugin;
import akka.actor.ActorContext;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.id.TenantId;
@ -30,6 +31,12 @@ public class TenantPluginManager extends PluginManager {
this.tenantId = tenantId;
}
public void init(ActorContext context) {
if (systemContext.isTenantComponentsInitEnabled()) {
super.init(context);
}
}
@Override
FetchFunction<PluginMetaData> getFetchPluginsFunction() {
return link -> pluginService.findTenantPlugins(tenantId, link);

10
application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java

@ -25,7 +25,6 @@ import org.thingsboard.server.actors.rule.RuleActorChain;
import org.thingsboard.server.actors.rule.RuleActorMetaData;
import org.thingsboard.server.actors.rule.SimpleRuleActorChain;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
@ -72,6 +71,9 @@ public abstract class RuleManager {
}
public Optional<ActorRef> update(ActorContext context, RuleId ruleId, ComponentLifecycleEvent event) {
if (ruleMap == null) {
init(context);
}
RuleMetaData rule;
if (event != ComponentLifecycleEvent.DELETED) {
rule = systemContext.getRuleService().findRuleById(ruleId);
@ -111,11 +113,13 @@ public abstract class RuleManager {
.withDispatcher(getDispatcherName()), rId.toString()));
}
public RuleActorChain getRuleChain() {
public RuleActorChain getRuleChain(ActorContext context) {
if (ruleMap == null) {
init(context);
}
return ruleChain;
}
private void refreshRuleChain() {
Set<RuleActorMetaData> activeRuleSet = new HashSet<>();
for (Map.Entry<RuleMetaData, RuleActorMetaData> rule : ruleMap.entrySet()) {

7
application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.actors.shared.rule;
import akka.actor.ActorContext;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.id.TenantId;
@ -27,6 +28,12 @@ public class TenantRuleManager extends RuleManager {
super(systemContext, tenantId);
}
public void init(ActorContext context) {
if (systemContext.isTenantComponentsInitEnabled()) {
super.init(context);
}
}
@Override
FetchFunction<RuleMetaData> getFetchRulesFunction() {
return link -> ruleService.findTenantRules(tenantId, link);

11
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

@ -151,18 +151,13 @@ public class TenantActor extends ContextAwareActor {
private void process(RuleChainDeviceMsg msg) {
ToDeviceActorMsg toDeviceActorMsg = msg.getToDeviceActorMsg();
ActorRef deviceActor = getOrCreateDeviceActor(toDeviceActorMsg.getDeviceId());
RuleActorChain chain = new ComplexRuleActorChain(msg.getRuleChain(), ruleManager.getRuleChain());
RuleActorChain chain = new ComplexRuleActorChain(msg.getRuleChain(), ruleManager.getRuleChain(this.context()));
deviceActor.tell(new RuleChainDeviceMsg(toDeviceActorMsg, chain), context().self());
}
private ActorRef getOrCreateDeviceActor(DeviceId deviceId) {
ActorRef deviceActor = deviceActors.get(deviceId);
if (deviceActor == null) {
deviceActor = context().actorOf(Props.create(new DeviceActor.ActorCreator(systemContext, tenantId, deviceId))
.withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), deviceId.toString());
deviceActors.put(deviceId, deviceActor);
}
return deviceActor;
return deviceActors.computeIfAbsent(deviceId, k -> context().actorOf(Props.create(new DeviceActor.ActorCreator(systemContext, tenantId, deviceId))
.withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), deviceId.toString()));
}
public static class ActorCreator extends ContextBasedCreator<TenantActor> {

14
application/src/main/java/org/thingsboard/server/controller/AlarmController.java

@ -15,30 +15,16 @@
*/
package org.thingsboard.server.controller;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.data.alarm.*;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.page.TimePageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.dao.asset.AssetSearchQuery;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.exception.ThingsboardErrorCode;
import org.thingsboard.server.exception.ThingsboardException;
import org.thingsboard.server.service.security.model.SecurityUser;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@RestController
@RequestMapping("/api")

2
application/src/main/java/org/thingsboard/server/controller/AssetController.java

@ -27,7 +27,7 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.dao.asset.AssetSearchQuery;
import org.thingsboard.server.common.data.asset.AssetSearchQuery;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.exception.ThingsboardException;

2
application/src/main/java/org/thingsboard/server/controller/DeviceController.java

@ -28,7 +28,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.dao.device.DeviceSearchQuery;
import org.thingsboard.server.common.data.device.DeviceSearchQuery;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.exception.ThingsboardException;

2
application/src/main/resources/thingsboard.yml

@ -159,6 +159,8 @@ cassandra:
# Actor system parameters
actors:
tenant:
create_components_on_init: true
session:
sync:
# Default timeout for processing request using synchronous session (HTTP, CoAP) in milliseconds

74
application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java

@ -97,28 +97,28 @@ public abstract class AbstractControllerTest {
protected static final String SYS_ADMIN_EMAIL = "sysadmin@thingsboard.org";
private static final String SYS_ADMIN_PASSWORD = "sysadmin";
protected static final String TENANT_ADMIN_EMAIL = "testtenant@thingsboard.org";
private static final String TENANT_ADMIN_PASSWORD = "tenant";
protected static final String CUSTOMER_USER_EMAIL = "testcustomer@thingsboard.org";
private static final String CUSTOMER_USER_PASSWORD = "customer";
protected MediaType contentType = new MediaType(MediaType.APPLICATION_JSON.getType(),
MediaType.APPLICATION_JSON.getSubtype(),
Charset.forName("utf8"));
protected MockMvc mockMvc;
protected String token;
protected String refreshToken;
protected String username;
private TenantId tenantId;
@SuppressWarnings("rawtypes")
private HttpMessageConverter mappingJackson2HttpMessageConverter;
@Autowired
private WebApplicationContext webApplicationContext;
@ -132,7 +132,7 @@ public abstract class AbstractControllerTest {
log.info("Finished test: {}", description.getMethodName());
}
};
@Autowired
void setConverters(HttpMessageConverter<?>[] converters) {
@ -144,7 +144,7 @@ public abstract class AbstractControllerTest {
Assert.assertNotNull("the JSON message converter must not be null",
this.mappingJackson2HttpMessageConverter);
}
@Before
public void setup() throws Exception {
log.info("Executing setup");
@ -188,7 +188,7 @@ public abstract class AbstractControllerTest {
public void teardown() throws Exception {
log.info("Executing teardown");
loginSysAdmin();
doDelete("/api/tenant/"+tenantId.getId().toString())
doDelete("/api/tenant/" + tenantId.getId().toString())
.andExpect(status().isOk());
log.info("Executed teardown");
}
@ -196,7 +196,7 @@ public abstract class AbstractControllerTest {
protected void loginSysAdmin() throws Exception {
login(SYS_ADMIN_EMAIL, SYS_ADMIN_PASSWORD);
}
protected void loginTenantAdmin() throws Exception {
login(TENANT_ADMIN_EMAIL, TENANT_ADMIN_PASSWORD);
}
@ -204,13 +204,13 @@ public abstract class AbstractControllerTest {
protected void loginCustomerUser() throws Exception {
login(CUSTOMER_USER_EMAIL, CUSTOMER_USER_PASSWORD);
}
protected User createUserAndLogin(User user, String password) throws Exception {
User savedUser = doPost("/api/user", user, User.class);
logout();
doGet("/api/noauth/activate?activateToken={activateToken}", TestMailService.currentActivateToken)
.andExpect(status().isSeeOther())
.andExpect(header().string(HttpHeaders.LOCATION, "/login/createPassword?activateToken=" + TestMailService.currentActivateToken));
.andExpect(status().isSeeOther())
.andExpect(header().string(HttpHeaders.LOCATION, "/login/createPassword?activateToken=" + TestMailService.currentActivateToken));
JsonNode tokenInfo = readResponse(doPost("/api/noauth/activate", "activateToken", TestMailService.currentActivateToken, "password", password).andExpect(status().isOk()), JsonNode.class);
validateAndSetJwtToken(tokenInfo, user.getEmail());
return savedUser;
@ -247,14 +247,14 @@ public abstract class AbstractControllerTest {
Assert.assertNotNull(token);
Assert.assertFalse(token.isEmpty());
int i = token.lastIndexOf('.');
Assert.assertTrue(i>0);
String withoutSignature = token.substring(0, i+1);
Jwt<Header,Claims> jwsClaims = Jwts.parser().parseClaimsJwt(withoutSignature);
Assert.assertTrue(i > 0);
String withoutSignature = token.substring(0, i + 1);
Jwt<Header, Claims> jwsClaims = Jwts.parser().parseClaimsJwt(withoutSignature);
Claims claims = jwsClaims.getBody();
String subject = claims.getSubject();
Assert.assertEquals(username, subject);
}
protected void logout() throws Exception {
this.token = null;
this.refreshToken = null;
@ -266,24 +266,24 @@ public abstract class AbstractControllerTest {
request.header(ThingsboardSecurityConfiguration.JWT_TOKEN_HEADER_PARAM, "Bearer " + this.token);
}
}
protected ResultActions doGet(String urlTemplate, Object... urlVariables) throws Exception {
MockHttpServletRequestBuilder getRequest = get(urlTemplate, urlVariables);
setJwtToken(getRequest);
return mockMvc.perform(getRequest);
}
protected <T> T doGet(String urlTemplate, Class<T> responseClass, Object... urlVariables) throws Exception {
return readResponse(doGet(urlTemplate, urlVariables).andExpect(status().isOk()), responseClass);
}
protected <T> T doGetTyped(String urlTemplate, TypeReference<T> responseType, Object... urlVariables) throws Exception {
return readResponse(doGet(urlTemplate, urlVariables).andExpect(status().isOk()), responseType);
}
protected <T> T doGetTypedWithPageLink(String urlTemplate, TypeReference<T> responseType,
TextPageLink pageLink,
Object... urlVariables) throws Exception {
TextPageLink pageLink,
Object... urlVariables) throws Exception {
List<Object> pageLinkVariables = new ArrayList<>();
urlTemplate += "limit={limit}";
pageLinkVariables.add(pageLink.getLimit());
@ -299,18 +299,18 @@ public abstract class AbstractControllerTest {
urlTemplate += "&textOffset={textOffset}";
pageLinkVariables.add(pageLink.getTextOffset());
}
Object[] vars = new Object[urlVariables.length + pageLinkVariables.size()];
Object[] vars = new Object[urlVariables.length + pageLinkVariables.size()];
System.arraycopy(urlVariables, 0, vars, 0, urlVariables.length);
System.arraycopy(pageLinkVariables.toArray(), 0, vars, urlVariables.length, pageLinkVariables.size());
return readResponse(doGet(urlTemplate, vars).andExpect(status().isOk()), responseType);
}
protected <T> T doPost(String urlTemplate, Class<T> responseClass, String... params) throws Exception {
return readResponse(doPost(urlTemplate, params).andExpect(status().isOk()), responseClass);
}
protected <T> T doPost(String urlTemplate, T content, Class<T> responseClass, String... params) throws Exception {
return readResponse(doPost(urlTemplate, content, params).andExpect(status().isOk()), responseClass);
}
@ -318,15 +318,15 @@ public abstract class AbstractControllerTest {
protected <T> T doDelete(String urlTemplate, Class<T> responseClass, String... params) throws Exception {
return readResponse(doDelete(urlTemplate, params).andExpect(status().isOk()), responseClass);
}
protected ResultActions doPost(String urlTemplate, String... params) throws Exception {
MockHttpServletRequestBuilder postRequest = post(urlTemplate);
setJwtToken(postRequest);
populateParams(postRequest, params);
return mockMvc.perform(postRequest);
}
protected <T> ResultActions doPost(String urlTemplate, T content, String... params) throws Exception {
protected <T> ResultActions doPost(String urlTemplate, T content, String... params) throws Exception {
MockHttpServletRequestBuilder postRequest = post(urlTemplate);
setJwtToken(postRequest);
String json = json(content);
@ -334,25 +334,25 @@ public abstract class AbstractControllerTest {
populateParams(postRequest, params);
return mockMvc.perform(postRequest);
}
protected ResultActions doDelete(String urlTemplate, String... params) throws Exception {
MockHttpServletRequestBuilder deleteRequest = delete(urlTemplate);
setJwtToken(deleteRequest);
populateParams(deleteRequest, params);
return mockMvc.perform(deleteRequest);
}
protected void populateParams(MockHttpServletRequestBuilder request, String... params) {
if (params != null && params.length > 0) {
Assert.assertEquals(params.length % 2, 0);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<String, String>();
for (int i=0;i<params.length;i+=2) {
paramsMap.add(params[i], params[i+1]);
for (int i = 0; i < params.length; i += 2) {
paramsMap.add(params[i], params[i + 1]);
}
request.params(paramsMap);
}
}
@SuppressWarnings("unchecked")
protected String json(Object o) throws IOException {
MockHttpOutputMessage mockHttpOutputMessage = new MockHttpOutputMessage();
@ -360,14 +360,14 @@ public abstract class AbstractControllerTest {
o, MediaType.APPLICATION_JSON, mockHttpOutputMessage);
return mockHttpOutputMessage.getBodyAsString();
}
@SuppressWarnings("unchecked")
protected <T> T readResponse(ResultActions result, Class<T> responseClass) throws Exception {
byte[] content = result.andReturn().getResponse().getContentAsByteArray();
MockHttpInputMessage mockHttpInputMessage = new MockHttpInputMessage(content);
return (T) this.mappingJackson2HttpMessageConverter.read(responseClass, mockHttpInputMessage);
}
protected <T> T readResponse(ResultActions result, TypeReference<T> type) throws Exception {
byte[] content = result.andReturn().getResponse().getContentAsByteArray();
ObjectMapper mapper = new ObjectMapper();

5
dao/src/main/java/org/thingsboard/server/dao/asset/AssetSearchQuery.java → common/data/src/main/java/org/thingsboard/server/common/data/asset/AssetSearchQuery.java

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.asset;
package org.thingsboard.server.common.data.asset;
import lombok.Data;
import org.thingsboard.server.common.data.EntityType;
@ -22,7 +22,6 @@ import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.EntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
@ -33,9 +32,7 @@ import java.util.List;
public class AssetSearchQuery {
private RelationsSearchParameters parameters;
@Nullable
private String relationType;
@Nullable
private List<String> assetTypes;
public EntityRelationsQuery toEntitySearchQuery() {

2
dao/src/main/java/org/thingsboard/server/dao/device/DeviceSearchQuery.java → dao/src/main/java/org/thingsboard/server/common/data/device/DeviceSearchQuery.java

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.device;
package org.thingsboard.server.common.data.device;
import lombok.Data;
import org.thingsboard.server.common.data.EntityType;

1
dao/src/main/java/org/thingsboard/server/dao/asset/AssetService.java

@ -18,6 +18,7 @@ package org.thingsboard.server.dao.asset;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetSearchQuery;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.TenantId;

1
dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java

@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetSearchQuery;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;

1
dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java

@ -18,6 +18,7 @@ package org.thingsboard.server.dao.device;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.device.DeviceSearchQuery;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;

1
dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java

@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.*;
import org.thingsboard.server.common.data.device.DeviceSearchQuery;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;

66
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -16,6 +16,7 @@
package org.thingsboard.server.transport.mqtt;
import com.fasterxml.jackson.databind.JsonNode;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*;
@ -45,6 +46,8 @@ import org.thingsboard.server.transport.mqtt.util.SslUtil;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
@ -71,6 +74,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final RelationService relationService;
private final SslHandler sslHandler;
private volatile boolean connected;
private volatile InetSocketAddress address;
private volatile GatewaySessionCtx gatewaySessionCtx;
public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
@ -94,30 +98,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
deviceSessionCtx.setChannel(ctx);
switch (msg.fixedHeader().messageType()) {
case CONNECT:
processConnect(ctx, (MqttConnectMessage) msg);
break;
case PUBLISH:
processPublish(ctx, (MqttPublishMessage) msg);
break;
case SUBSCRIBE:
processSubscribe(ctx, (MqttSubscribeMessage) msg);
break;
case UNSUBSCRIBE:
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
case PINGREQ:
if (checkConnected(ctx)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
}
break;
case DISCONNECT:
if (checkConnected(ctx)) {
processDisconnect(ctx);
}
break;
address = (InetSocketAddress) ctx.channel().remoteAddress();
if (msg.fixedHeader() == null) {
log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
processDisconnect(ctx);
} else {
deviceSessionCtx.setChannel(ctx);
switch (msg.fixedHeader().messageType()) {
case CONNECT:
processConnect(ctx, (MqttConnectMessage) msg);
break;
case PUBLISH:
processPublish(ctx, (MqttPublishMessage) msg);
break;
case SUBSCRIBE:
processSubscribe(ctx, (MqttSubscribeMessage) msg);
break;
case UNSUBSCRIBE:
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
case PINGREQ:
if (checkConnected(ctx)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
}
break;
case DISCONNECT:
if (checkConnected(ctx)) {
processDisconnect(ctx);
}
break;
}
}
}
@ -313,9 +323,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processDisconnect(ChannelHandlerContext ctx) {
ctx.close();
processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
if (gatewaySessionCtx != null) {
gatewaySessionCtx.onGatewayDisconnect();
if (connected) {
processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
if (gatewaySessionCtx != null) {
gatewaySessionCtx.onGatewayDisconnect();
}
}
}

Loading…
Cancel
Save