|
|
|
@ -20,9 +20,6 @@ import com.google.common.util.concurrent.ListenableFuture; |
|
|
|
import com.google.common.util.concurrent.MoreExecutors; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.beans.factory.annotation.Qualifier; |
|
|
|
import org.springframework.boot.context.properties.ConfigurationProperties; |
|
|
|
import org.springframework.context.annotation.Bean; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
import org.springframework.transaction.annotation.Transactional; |
|
|
|
import org.springframework.transaction.event.TransactionalEventListener; |
|
|
|
@ -108,7 +105,7 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe |
|
|
|
public static final String INCORRECT_DEVICE_ID = "Incorrect deviceId "; |
|
|
|
public static final String INCORRECT_EDGE_ID = "Incorrect edgeId "; |
|
|
|
public static final String PAYLOAD = "\"{temperature:25}\""; |
|
|
|
|
|
|
|
public static final String NOT_SUPPORTED = "Not supported"; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private DeviceDao deviceDao; |
|
|
|
@ -128,45 +125,8 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe |
|
|
|
@Autowired |
|
|
|
private EntityCountService countService; |
|
|
|
|
|
|
|
@Bean |
|
|
|
@ConfigurationProperties(prefix = "transport.mqtt") |
|
|
|
public DeviceConnectivityConfiguration mqttDeviceConnectivityProperties() { |
|
|
|
return new DeviceConnectivityConfiguration (); |
|
|
|
} |
|
|
|
|
|
|
|
@Bean |
|
|
|
@ConfigurationProperties(prefix = "transport.mqtt.ssl") |
|
|
|
public DeviceConnectivityConfiguration mqttsDeviceConnectivityProperties() { |
|
|
|
return new DeviceConnectivityConfiguration (); |
|
|
|
} |
|
|
|
|
|
|
|
@Bean |
|
|
|
@ConfigurationProperties(prefix = "transport.coap") |
|
|
|
public DeviceConnectivityConfiguration coapDeviceConnectivityProperties() { |
|
|
|
return new DeviceConnectivityConfiguration (); |
|
|
|
} |
|
|
|
|
|
|
|
@Bean |
|
|
|
@ConfigurationProperties(prefix = "transport.coap.dtls") |
|
|
|
public DeviceConnectivityConfiguration coapsDeviceConnectivityProperties() { |
|
|
|
return new DeviceConnectivityConfiguration (); |
|
|
|
} |
|
|
|
|
|
|
|
@Autowired |
|
|
|
@Qualifier("mqttDeviceConnectivityProperties") |
|
|
|
private DeviceConnectivityConfiguration mqttProperties; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
@Qualifier("mqttsDeviceConnectivityProperties") |
|
|
|
private DeviceConnectivityConfiguration mqttsProperties; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
@Qualifier("coapDeviceConnectivityProperties") |
|
|
|
private DeviceConnectivityConfiguration coapProperties; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
@Qualifier("coapsDeviceConnectivityProperties") |
|
|
|
private DeviceConnectivityConfiguration coapsProperties; |
|
|
|
private DeviceConnectivityConfiguration deviceConnectivityConfiguration; |
|
|
|
|
|
|
|
@Override |
|
|
|
public DeviceInfo findDeviceInfoById(TenantId tenantId, DeviceId deviceId) { |
|
|
|
@ -191,24 +151,11 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe |
|
|
|
|
|
|
|
switch (transportType) { |
|
|
|
case DEFAULT: |
|
|
|
switch (credentialsType) { |
|
|
|
case ACCESS_TOKEN: |
|
|
|
commands.put("http", getHttpPublishCommand(baseUrl, creds)); |
|
|
|
commands.put("mqtt", getMqttPublishCommand(mqttProperties.getDeviceConnectivityHost(), mqttProperties.getDeviceConnectivityPort(), creds)); |
|
|
|
commands.put("mqtts", getMqttPublishCommand(mqttsProperties.getDeviceConnectivityHost(), mqttsProperties.getDeviceConnectivityPort(), creds)); |
|
|
|
commands.put("coap", getCoapPublishCommand(coapProperties.getDeviceConnectivityHost(), coapProperties.getDeviceConnectivityPort(), creds)); |
|
|
|
commands.put("coaps", getCoapPublishCommand(coapsProperties.getDeviceConnectivityHost(), coapsProperties.getDeviceConnectivityPort(), creds)); break; |
|
|
|
case MQTT_BASIC: |
|
|
|
commands.put("mqtt", getMqttPublishCommand(mqttProperties.getDeviceConnectivityHost(), mqttProperties.getDeviceConnectivityPort(), creds)); |
|
|
|
commands.put("mqtts", getMqttPublishCommand(mqttsProperties.getDeviceConnectivityHost(), mqttsProperties.getDeviceConnectivityPort(), creds)); |
|
|
|
break; |
|
|
|
case X509_CERTIFICATE: |
|
|
|
commands.put("mqtt", getMqttPublishCommand(mqttProperties.getDeviceConnectivityHost(), mqttProperties.getDeviceConnectivityPort(), creds)); |
|
|
|
commands.put("mqtts", getMqttPublishCommand(mqttsProperties.getDeviceConnectivityHost(), mqttsProperties.getDeviceConnectivityPort(), creds)); |
|
|
|
commands.put("coap", getCoapPublishCommand(coapProperties.getDeviceConnectivityHost(), coapProperties.getDeviceConnectivityPort(), creds)); |
|
|
|
commands.put("coaps", getCoapPublishCommand(coapsProperties.getDeviceConnectivityHost(), coapsProperties.getDeviceConnectivityPort(), creds)); |
|
|
|
break; |
|
|
|
} |
|
|
|
Optional.ofNullable(getHttpPublishCommand(baseUrl, creds)).ifPresent(v -> commands.put("http", v)); |
|
|
|
Optional.ofNullable(getMqttPublishCommand(creds)).ifPresent(v -> commands.put("mqtt", v)); |
|
|
|
Optional.ofNullable(getMqttsPublishCommand(creds)).ifPresent(v -> commands.put("mqtts", v)); |
|
|
|
Optional.ofNullable(getCoapPublishCommand(creds)).ifPresent(v -> commands.put("coap", v)); |
|
|
|
Optional.ofNullable(getCoapsPublishCommand(creds)).ifPresent(v -> commands.put("coaps", v)); |
|
|
|
break; |
|
|
|
case MQTT: |
|
|
|
MqttDeviceProfileTransportConfiguration transportConfiguration = |
|
|
|
@ -217,25 +164,22 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe |
|
|
|
TransportPayloadType payloadType = transportConfiguration.getTransportPayloadTypeConfiguration().getTransportPayloadType(); |
|
|
|
String payload = (payloadType == TransportPayloadType.PROTOBUF) ? " -f protobufFileName" : " -m " + PAYLOAD; |
|
|
|
|
|
|
|
commands.put("mqtt", getMqttPublishCommand(mqttProperties.getDeviceConnectivityHost(), mqttProperties.getDeviceConnectivityPort(), |
|
|
|
topicName, creds, payload)); |
|
|
|
commands.put("mqtts", getMqttPublishCommand(mqttProperties.getDeviceConnectivityHost(), mqttProperties.getDeviceConnectivityPort(), |
|
|
|
topicName, creds, payload)); |
|
|
|
Optional.ofNullable(getMqttPublishCommand(topicName, creds, payload)).ifPresent(v -> commands.put("mqtt", v)); |
|
|
|
Optional.ofNullable(getMqttsPublishCommand(topicName, creds, payload)).ifPresent(v -> commands.put("mqtts", v)); |
|
|
|
break; |
|
|
|
case COAP: |
|
|
|
CoapDeviceProfileTransportConfiguration coapTransportConfiguration = |
|
|
|
(CoapDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration(); |
|
|
|
CoapDeviceTypeConfiguration coapConfiguration = coapTransportConfiguration.getCoapDeviceTypeConfiguration(); |
|
|
|
if (coapConfiguration instanceof DefaultCoapDeviceTypeConfiguration) { |
|
|
|
commands.put("coap", getCoapPublishCommand(coapProperties.getDeviceConnectivityHost(), coapProperties.getDeviceConnectivityPort(), creds)); |
|
|
|
commands.put("coaps", getCoapPublishCommand(coapsProperties.getDeviceConnectivityHost(), coapsProperties.getDeviceConnectivityPort(), creds)); |
|
|
|
Optional.ofNullable(getCoapPublishCommand(creds)).ifPresent(v -> commands.put("coap", v)); |
|
|
|
Optional.ofNullable(getCoapsPublishCommand(creds)).ifPresent(v -> commands.put("coaps", v)); |
|
|
|
} else if (coapConfiguration instanceof EfentoCoapDeviceTypeConfiguration) { |
|
|
|
commands.put("coap", "Not supported"); |
|
|
|
commands.put("coaps", "Not supported"); |
|
|
|
commands.put("coap for efento", "Not supported"); |
|
|
|
} |
|
|
|
break; |
|
|
|
default: |
|
|
|
commands.put(transportType.name(), "Not supported"); |
|
|
|
commands.put(transportType.name(), NOT_SUPPORTED); |
|
|
|
} |
|
|
|
return commands; |
|
|
|
} |
|
|
|
@ -800,18 +744,61 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe |
|
|
|
} |
|
|
|
|
|
|
|
private String getHttpPublishCommand(String baseurl, DeviceCredentials deviceCredentials) { |
|
|
|
return String.format("curl -v -X POST %s/api/v1/%s/telemetry --header Content-Type:application/json --data " + PAYLOAD, |
|
|
|
baseurl, deviceCredentials.getCredentialsId()); |
|
|
|
if (deviceCredentials.getCredentialsType() == DeviceCredentialsType.ACCESS_TOKEN) { |
|
|
|
return String.format("curl -v -X POST %s/api/v1/%s/telemetry --header Content-Type:application/json --data " + PAYLOAD, |
|
|
|
baseurl, deviceCredentials.getCredentialsId()); |
|
|
|
} |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
|
private String getMqttPublishCommand(String host, Integer port, DeviceCredentials deviceCredentials) { |
|
|
|
return getMqttPublishCommand(host, port, "v1/devices/me/telemetry", deviceCredentials, " -m " + PAYLOAD); |
|
|
|
private String getMqttPublishCommand(DeviceCredentials deviceCredentials) { |
|
|
|
return getMqttPublishCommand("v1/devices/me/telemetry", deviceCredentials, " -m " + PAYLOAD); |
|
|
|
} |
|
|
|
|
|
|
|
private String getMqttPublishCommand(String host, Integer port, String deviceTelemetryTopic, DeviceCredentials deviceCredentials, String payload) { |
|
|
|
private String getMqttPublishCommand(String deviceTelemetryTopic, DeviceCredentials deviceCredentials, String payload) { |
|
|
|
DeviceConnectivityInfo mqttProps = deviceConnectivityConfiguration.getConnectivity().get("mqtt"); |
|
|
|
|
|
|
|
StringBuilder command = new StringBuilder("mosquitto_pub -d -q 1"); |
|
|
|
command.append(" -h ").append(host); |
|
|
|
command.append(" -p ").append(port); |
|
|
|
command.append(" -h ").append(mqttProps.getHost()); |
|
|
|
command.append(" -p ").append(mqttProps.getPort()); |
|
|
|
command.append(" -t ").append(deviceTelemetryTopic); |
|
|
|
|
|
|
|
switch (deviceCredentials.getCredentialsType()) { |
|
|
|
case ACCESS_TOKEN: |
|
|
|
command.append(" -u ").append(deviceCredentials.getCredentialsId()); |
|
|
|
break; |
|
|
|
case MQTT_BASIC: |
|
|
|
BasicMqttCredentials credentials = JacksonUtil.fromString(deviceCredentials.getCredentialsValue(), |
|
|
|
BasicMqttCredentials.class); |
|
|
|
if (credentials != null) { |
|
|
|
if (credentials.getClientId() != null) { |
|
|
|
command.append(" -i ").append(credentials.getClientId()); |
|
|
|
} |
|
|
|
if (credentials.getUserName() != null) { |
|
|
|
command.append(" -u ").append(credentials.getUserName()); |
|
|
|
} |
|
|
|
if (credentials.getPassword() != null) { |
|
|
|
command.append(" -P ").append(credentials.getPassword()); |
|
|
|
} |
|
|
|
} |
|
|
|
break; |
|
|
|
default: |
|
|
|
return null; |
|
|
|
} |
|
|
|
command.append(payload); |
|
|
|
return command.toString(); |
|
|
|
} |
|
|
|
|
|
|
|
private String getMqttsPublishCommand(DeviceCredentials deviceCredentials) { |
|
|
|
return getMqttsPublishCommand("v1/devices/me/telemetry", deviceCredentials, " -m " + PAYLOAD); |
|
|
|
} |
|
|
|
|
|
|
|
private String getMqttsPublishCommand(String deviceTelemetryTopic, DeviceCredentials deviceCredentials, String payload) { |
|
|
|
DeviceConnectivityInfo mqttsProps = deviceConnectivityConfiguration.getConnectivity().get("mqtts"); |
|
|
|
|
|
|
|
StringBuilder command = new StringBuilder("mosquitto_pub --cafile tb-server-chain.pem -d -q 1"); |
|
|
|
command.append(" -h ").append(mqttsProps.getHost()); |
|
|
|
command.append(" -p ").append(mqttsProps.getPort()); |
|
|
|
command.append(" -t ").append(deviceTelemetryTopic); |
|
|
|
|
|
|
|
switch (deviceCredentials.getCredentialsType()) { |
|
|
|
@ -831,25 +818,40 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe |
|
|
|
if (credentials.getPassword() != null) { |
|
|
|
command.append(" -P ").append(credentials.getPassword()); |
|
|
|
} |
|
|
|
} else { |
|
|
|
return null; |
|
|
|
} |
|
|
|
break; |
|
|
|
case X509_CERTIFICATE: |
|
|
|
command.append(" --cafile server.pem --key key.pem --cert cert.pem"); |
|
|
|
command.append(" --key key.pem --cert cert.pem"); |
|
|
|
break; |
|
|
|
default: |
|
|
|
return null; |
|
|
|
} |
|
|
|
command.append(payload); |
|
|
|
return command.toString(); |
|
|
|
} |
|
|
|
|
|
|
|
private String getCoapPublishCommand(String host, Integer port, DeviceCredentials deviceCredentials) { |
|
|
|
private String getCoapPublishCommand(DeviceCredentials deviceCredentials) { |
|
|
|
DeviceConnectivityInfo coapProperties = deviceConnectivityConfiguration.getConnectivity().get("coap"); |
|
|
|
if (deviceCredentials.getCredentialsType() == DeviceCredentialsType.ACCESS_TOKEN) { |
|
|
|
return String.format("coap-client -m POST coap://%s:%s/api/v1/%s/telemetry -t json -e %s", |
|
|
|
coapProperties.getHost(), coapProperties.getPort(), deviceCredentials.getCredentialsId(), PAYLOAD); |
|
|
|
} |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
|
private String getCoapsPublishCommand(DeviceCredentials deviceCredentials) { |
|
|
|
DeviceConnectivityInfo coapsProperties = deviceConnectivityConfiguration.getConnectivity().get("coaps"); |
|
|
|
switch (deviceCredentials.getCredentialsType()) { |
|
|
|
case ACCESS_TOKEN: |
|
|
|
return String.format("coap-client -m post coap://%s:%s/api/v1/%s/telemetry -t json -e %s", |
|
|
|
host, port, deviceCredentials.getCredentialsId(), PAYLOAD); |
|
|
|
return String.format("coap-client-openssl -v 9 -m POST coaps://%s:%s/api/v1/%s/telemetry -t json -e %s ", |
|
|
|
coapsProperties.getHost(), coapsProperties.getPort(), deviceCredentials.getCredentialsId(), PAYLOAD); |
|
|
|
case X509_CERTIFICATE: |
|
|
|
return String.format("coap-client-openssl -v 9 -c cert.pem -j key.pem -m POST -t json -e %s " + |
|
|
|
"coaps://%s:%s/api/v1/telemetry", PAYLOAD, host, port); |
|
|
|
return String.format("coap-client-openssl -v 9 -c cert.pem -j key.pem -m POST " + |
|
|
|
"coaps://%s:%s/api/v1/telemetry -t json -e %s ", coapsProperties.getHost(), coapsProperties.getPort(), PAYLOAD); |
|
|
|
default: |
|
|
|
return "Not supported"; |
|
|
|
return null; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|