|
|
|
@ -19,6 +19,7 @@ import com.google.gson.JsonParseException; |
|
|
|
import com.google.protobuf.Descriptors; |
|
|
|
import com.google.protobuf.DynamicMessage; |
|
|
|
import lombok.Data; |
|
|
|
import lombok.RequiredArgsConstructor; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.eclipse.californium.core.coap.CoAP; |
|
|
|
import org.eclipse.californium.core.coap.Request; |
|
|
|
@ -53,6 +54,9 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException; |
|
|
|
import org.thingsboard.server.common.transport.adaptor.JsonConverter; |
|
|
|
import org.thingsboard.server.gen.transport.TransportProtos; |
|
|
|
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; |
|
|
|
import org.thingsboard.server.transport.coap.callback.CoapDeviceAuthCallback; |
|
|
|
import org.thingsboard.server.transport.coap.callback.CoapNoOpCallback; |
|
|
|
import org.thingsboard.server.transport.coap.callback.CoapOkCallback; |
|
|
|
|
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
@ -81,9 +85,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
private final Set<UUID> rpcSubscriptions = ConcurrentHashMap.newKeySet(); |
|
|
|
private final Set<UUID> attributeSubscriptions = ConcurrentHashMap.newKeySet(); |
|
|
|
|
|
|
|
private ConcurrentMap<String, TbCoapDtlsSessionInfo> dtlsSessionIdMap; |
|
|
|
private long timeout; |
|
|
|
private long sessionReportTimeout; |
|
|
|
private final ConcurrentMap<String, TbCoapDtlsSessionInfo> dtlsSessionIdMap; |
|
|
|
private final long timeout; |
|
|
|
|
|
|
|
public CoapTransportResource(CoapTransportContext ctx, CoapServerService coapServerService, String name) { |
|
|
|
super(ctx, name); |
|
|
|
@ -91,7 +94,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
this.addObserver(new CoapResourceObserver()); |
|
|
|
this.dtlsSessionIdMap = coapServerService.getDtlsSessionsMap(); |
|
|
|
this.timeout = coapServerService.getTimeout(); |
|
|
|
this.sessionReportTimeout = ctx.getSessionReportTimeout(); |
|
|
|
long sessionReportTimeout = ctx.getSessionReportTimeout(); |
|
|
|
ctx.getScheduler().scheduleAtFixedRate(() -> { |
|
|
|
Set<CoapObserveSessionInfo> coapObserveSessionInfos = sessionInfoToObserveRelationMap.keySet(); |
|
|
|
Set<TransportProtos.SessionInfoProto> observeSessions = coapObserveSessionInfos |
|
|
|
@ -110,7 +113,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
return; // because request did not try to establish a relation
|
|
|
|
} |
|
|
|
if (CoAP.ResponseCode.isSuccess(response.getCode())) { |
|
|
|
|
|
|
|
if (!relation.isEstablished()) { |
|
|
|
relation.setEstablished(); |
|
|
|
addObserveRelation(relation); |
|
|
|
@ -280,8 +282,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
CoapObserveSessionInfo currentCoapObserveAttrSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request)); |
|
|
|
if (currentCoapObserveAttrSessionInfo == null) { |
|
|
|
attributeSubscriptions.add(sessionId); |
|
|
|
registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, |
|
|
|
transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); |
|
|
|
registerAsyncCoapSession(exchange, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), |
|
|
|
sessionInfo, getTokenFromRequest(request)); |
|
|
|
transportService.process(sessionInfo, |
|
|
|
TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), new CoapNoOpCallback(exchange)); |
|
|
|
transportService.process(sessionInfo, |
|
|
|
@ -305,11 +307,11 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
CoapObserveSessionInfo currentCoapObserveRpcSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request)); |
|
|
|
if (currentCoapObserveRpcSessionInfo == null) { |
|
|
|
rpcSubscriptions.add(sessionId); |
|
|
|
registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, |
|
|
|
transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); |
|
|
|
registerAsyncCoapSession(exchange, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder() |
|
|
|
, sessionInfo, getTokenFromRequest(request)); |
|
|
|
transportService.process(sessionInfo, |
|
|
|
TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), |
|
|
|
new CoapNoOpCallback(exchange) |
|
|
|
new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR) |
|
|
|
); |
|
|
|
} |
|
|
|
break; |
|
|
|
@ -359,14 +361,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
return tokenToCoapSessionInfoMap.remove(token); |
|
|
|
} |
|
|
|
|
|
|
|
private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) { |
|
|
|
private void registerAsyncCoapSession(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, |
|
|
|
DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo, String token) { |
|
|
|
tokenToCoapSessionInfoMap.putIfAbsent(token, new CoapObserveSessionInfo(sessionInfo)); |
|
|
|
transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo)); |
|
|
|
transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); |
|
|
|
} |
|
|
|
|
|
|
|
private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) { |
|
|
|
return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo); |
|
|
|
private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, |
|
|
|
DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) { |
|
|
|
return new CoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo); |
|
|
|
} |
|
|
|
|
|
|
|
private String getTokenFromRequest(Request request) { |
|
|
|
@ -448,22 +452,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@RequiredArgsConstructor |
|
|
|
private class CoapSessionListener implements SessionMsgListener { |
|
|
|
|
|
|
|
private final CoapTransportResource coapTransportResource; |
|
|
|
private final CoapExchange exchange; |
|
|
|
private final CoapTransportAdaptor coapTransportAdaptor; |
|
|
|
private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder; |
|
|
|
private final TransportProtos.SessionInfoProto sessionInfo; |
|
|
|
|
|
|
|
CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) { |
|
|
|
this.coapTransportResource = coapTransportResource; |
|
|
|
this.exchange = exchange; |
|
|
|
this.coapTransportAdaptor = coapTransportAdaptor; |
|
|
|
this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder; |
|
|
|
this.sessionInfo = sessionInfo; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg msg) { |
|
|
|
try { |
|
|
|
@ -497,7 +493,9 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) { |
|
|
|
log.trace("[{}] Received RPC command to device", sessionId); |
|
|
|
try { |
|
|
|
int requestId = respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder), exchange, sessionInfo); |
|
|
|
Response response = coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder); |
|
|
|
int requestId = getNextMsgId(); |
|
|
|
response.setMID(requestId); |
|
|
|
if (msg.getPersisted()) { |
|
|
|
transportContext.getRpcAwaitingAck().put(requestId, msg); |
|
|
|
transportContext.getScheduler().schedule(() -> { |
|
|
|
@ -507,6 +505,13 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
} |
|
|
|
}, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); |
|
|
|
} |
|
|
|
response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { |
|
|
|
TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); |
|
|
|
if (rpcRequestMsg != null) { |
|
|
|
transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY); |
|
|
|
} |
|
|
|
})); |
|
|
|
exchange.respond(response); |
|
|
|
} catch (AdaptorException e) { |
|
|
|
log.trace("Failed to reply due to error", e); |
|
|
|
closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR); |
|
|
|
@ -529,8 +534,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
} |
|
|
|
|
|
|
|
private void closeObserveRelationAndNotify(UUID sessionId, CoAP.ResponseCode responseCode) { |
|
|
|
Map<CoapObserveSessionInfo, ObserveRelation> sessionToObserveRelationMap = coapTransportResource.getCoapSessionInfoToObserveRelationMap(); |
|
|
|
if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) { |
|
|
|
Map<CoapObserveSessionInfo, ObserveRelation> sessionToObserveRelationMap = CoapTransportResource.this.getCoapSessionInfoToObserveRelationMap(); |
|
|
|
if (CoapTransportResource.this.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) { |
|
|
|
Optional<CoapObserveSessionInfo> observeSessionToClose = sessionToObserveRelationMap.keySet().stream().filter(coapObserveSessionInfo -> { |
|
|
|
TransportProtos.SessionInfoProto sessionToDelete = coapObserveSessionInfo.getSessionInfoProto(); |
|
|
|
UUID observeSessionId = new UUID(sessionToDelete.getSessionIdMSB(), sessionToDelete.getSessionIdLSB()); |
|
|
|
@ -539,16 +544,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
if (observeSessionToClose.isPresent()) { |
|
|
|
CoapObserveSessionInfo coapObserveSessionInfo = observeSessionToClose.get(); |
|
|
|
ObserveRelation observeRelation = sessionToObserveRelationMap.get(coapObserveSessionInfo); |
|
|
|
coapTransportResource.clearAndNotifyObserveRelation(observeRelation, responseCode); |
|
|
|
CoapTransportResource.this.clearAndNotifyObserveRelation(observeRelation, responseCode); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void closeAndDeregister() { |
|
|
|
Request request = exchange.advanced().getRequest(); |
|
|
|
String token = coapTransportResource.getTokenFromRequest(request); |
|
|
|
CoapObserveSessionInfo deleted = coapTransportResource.lookupAsyncSessionInfo(token); |
|
|
|
coapTransportResource.closeAndDeregister(deleted.getSessionInfoProto()); |
|
|
|
String token = CoapTransportResource.this.getTokenFromRequest(request); |
|
|
|
CoapObserveSessionInfo deleted = CoapTransportResource.this.lookupAsyncSessionInfo(token); |
|
|
|
CoapTransportResource.this.closeAndDeregister(deleted.getSessionInfoProto()); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|