|
|
|
@ -15,25 +15,43 @@ |
|
|
|
*/ |
|
|
|
package org.thingsboard.server.transport.mqtt.session; |
|
|
|
|
|
|
|
import io.netty.handler.codec.mqtt.MqttMessage; |
|
|
|
import com.google.gson.Gson; |
|
|
|
import com.google.gson.JsonElement; |
|
|
|
import com.google.gson.JsonObject; |
|
|
|
import io.netty.buffer.ByteBuf; |
|
|
|
import io.netty.buffer.ByteBufAllocator; |
|
|
|
import io.netty.buffer.UnpooledByteBufAllocator; |
|
|
|
import io.netty.handler.codec.mqtt.*; |
|
|
|
import org.thingsboard.server.common.data.Device; |
|
|
|
import org.thingsboard.server.common.data.id.SessionId; |
|
|
|
import org.thingsboard.server.common.msg.core.AttributesUpdateNotification; |
|
|
|
import org.thingsboard.server.common.msg.core.ResponseMsg; |
|
|
|
import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; |
|
|
|
import org.thingsboard.server.common.msg.kv.AttributesKVMsg; |
|
|
|
import org.thingsboard.server.common.msg.session.*; |
|
|
|
import org.thingsboard.server.common.msg.session.ex.SessionException; |
|
|
|
import org.thingsboard.server.common.transport.adaptor.JsonConverter; |
|
|
|
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; |
|
|
|
import org.thingsboard.server.transport.mqtt.MqttTopics; |
|
|
|
import org.thingsboard.server.transport.mqtt.MqttTransportHandler; |
|
|
|
|
|
|
|
import java.nio.charset.Charset; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.concurrent.atomic.AtomicInteger; |
|
|
|
|
|
|
|
/** |
|
|
|
* Created by ashvayka on 19.01.17. |
|
|
|
*/ |
|
|
|
public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { |
|
|
|
|
|
|
|
private static final Gson GSON = new Gson(); |
|
|
|
private static final Charset UTF8 = Charset.forName("UTF-8"); |
|
|
|
private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); |
|
|
|
|
|
|
|
private GatewaySessionCtx parent; |
|
|
|
private final MqttSessionId sessionId; |
|
|
|
private volatile boolean closed; |
|
|
|
private AtomicInteger msgIdSeq = new AtomicInteger(0); |
|
|
|
|
|
|
|
public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) { |
|
|
|
super(parent.getProcessor(), parent.getAuthService(), device); |
|
|
|
@ -70,6 +88,12 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { |
|
|
|
} |
|
|
|
} |
|
|
|
break; |
|
|
|
case ATTRIBUTES_UPDATE_NOTIFICATION: |
|
|
|
AttributesUpdateNotification notification = (AttributesUpdateNotification) msg; |
|
|
|
return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, notification.getData())); |
|
|
|
case TO_DEVICE_RPC_REQUEST: |
|
|
|
ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg; |
|
|
|
return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_RPC_TOPIC, rpcRequest)); |
|
|
|
} |
|
|
|
return Optional.empty(); |
|
|
|
} |
|
|
|
@ -92,4 +116,28 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { |
|
|
|
public long getTimeout() { |
|
|
|
return 0; |
|
|
|
} |
|
|
|
|
|
|
|
private MqttMessage createMqttPublishMsg(String topic, AttributesKVMsg data) { |
|
|
|
JsonObject result = new JsonObject(); |
|
|
|
result.addProperty("device", device.getName()); |
|
|
|
result.add("data", JsonConverter.toJson(data, false)); |
|
|
|
return createMqttPublishMsg(topic, result); |
|
|
|
} |
|
|
|
|
|
|
|
private MqttMessage createMqttPublishMsg(String topic, ToDeviceRpcRequestMsg data) { |
|
|
|
JsonObject result = new JsonObject(); |
|
|
|
result.addProperty("device", device.getName()); |
|
|
|
result.add("data", JsonConverter.toJson(data, true)); |
|
|
|
return createMqttPublishMsg(topic, result); |
|
|
|
} |
|
|
|
|
|
|
|
private MqttPublishMessage createMqttPublishMsg(String topic, JsonElement json) { |
|
|
|
MqttFixedHeader mqttFixedHeader = |
|
|
|
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0); |
|
|
|
MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, msgIdSeq.incrementAndGet()); |
|
|
|
ByteBuf payload = ALLOCATOR.buffer(); |
|
|
|
payload.writeBytes(GSON.toJson(json).getBytes(UTF8)); |
|
|
|
return new MqttPublishMessage(mqttFixedHeader, header, payload); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|