@ -39,8 +39,8 @@ import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
import org.thingsboard.server.gen.transport.TransportProtos ;
import org.thingsboard.server.gen.transport.coap.ConfigProtos ;
import org.thingsboard.server.gen.transport.coap.DeviceInfoProtos ;
import org.thingsboard.server.gen.transport.coap.MeasurementTypeProtos ;
import org.thingsboard.server.gen.transport.coap.MeasurementsProtos ;
import org.thingsboard.server.gen.transport.coap.MeasurementsProtos.ProtoChannel ;
import org.thingsboard.server.transport.coap.AbstractCoapTransportResource ;
import org.thingsboard.server.transport.coap.CoapTransportContext ;
import org.thingsboard.server.transport.coap.callback.CoapDeviceAuthCallback ;
@ -49,19 +49,21 @@ import org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils;
import java.nio.ByteBuffer ;
import java.text.SimpleDateFormat ;
import java.util.ArrayList ;
import java.util.Date ;
import java.util.List ;
import java.util.Map ;
import java.util.TreeMap ;
import java.util.UUID ;
import java.util.concurrent.TimeUnit ;
import java.util.stream.Collectors ;
import static com.google.gson.JsonParser.parseString ;
import static org.thingsboard.server.transport.coap.CoapTransportService.CONFIGURATION ;
import static org.thingsboard.server.transport.coap.CoapTransportService.CURRENT_TIMESTAMP ;
import static org.thingsboard.server.transport.coap.CoapTransportService.DEVICE_INFO ;
import static org.thingsboard.server.transport.coap.CoapTransportService.MEASUREMENTS ;
import static org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils.isBinarySensor ;
import static org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils.isSensorError ;
@Slf4j
public class CoapEfentoTransportResource extends AbstractCoapTransportResource {
@ -220,127 +222,186 @@ public class CoapEfentoTransportResource extends AbstractCoapTransportResource {
}
}
private List < EfentoTelemetry > getEfentoMeasurements ( MeasurementsProtos . ProtoMeasurements protoMeasurements , UUID sessionId ) {
List < EfentoTelemetry > getEfentoMeasurements ( MeasurementsProtos . ProtoMeasurements protoMeasurements , UUID sessionId ) {
String serialNumber = CoapEfentoUtils . convertByteArrayToString ( protoMeasurements . getSerialNum ( ) . toByteArray ( ) ) ;
boolean batteryStatus = protoMeasurements . getBatteryStatus ( ) ;
int measurementPeriodBase = protoMeasurements . getMeasurementPeriodBase ( ) ;
int measurementPeriodFactor = protoMeasurements . getMeasurementPeriodFactor ( ) ;
int signal = protoMeasurements . getSignal ( ) ;
List < MeasurementsProtos . ProtoChannel > channelsList = protoMeasurements . getChannelsList ( ) ;
long nextTransmissionAtMillis = TimeUnit . SECONDS . toMillis ( protoMeasurements . getNextTransmissionAt ( ) ) ;
List < ProtoChannel > channelsList = protoMeasurements . getChannelsList ( ) ;
if ( CollectionUtils . isEmpty ( channelsList ) ) {
throw new IllegalStateException ( "[" + sessionId + "]: Failed to get Efento measurements, reason: channels list is empty!" ) ;
}
Map < Long , JsonObject > valuesMap = new TreeMap < > ( ) ;
if ( ! CollectionUtils . isEmpty ( channelsList ) ) {
int channel = 0 ;
JsonObject values ;
for ( MeasurementsProtos . ProtoChannel protoChannel : channelsList ) {
channel + + ;
boolean isBinarySensor = false ;
MeasurementTypeProtos . MeasurementType measurementType = protoChannel . getType ( ) ;
String measurementTypeName = measurementType . name ( ) ;
if ( measurementType . equals ( MeasurementTypeProtos . MeasurementType . OK_ALARM )
| | measurementType . equals ( MeasurementTypeProtos . MeasurementType . FLOODING ) ) {
isBinarySensor = true ;
}
if ( measurementPeriodFactor = = 0 & & isBinarySensor ) {
measurementPeriodFactor = 14 ;
} else {
measurementPeriodFactor = 1 ;
for ( int channel = 0 ; channel < channelsList . size ( ) ; channel + + ) {
ProtoChannel protoChannel = channelsList . get ( channel ) ;
List < Integer > sampleOffsetsList = protoChannel . getSampleOffsetsList ( ) ;
if ( CollectionUtils . isEmpty ( sampleOffsetsList ) ) {
log . trace ( "[{}][{}] sampleOffsetsList list is empty!" , sessionId , protoChannel . getType ( ) . name ( ) ) ;
continue ;
}
boolean isBinarySensor = isBinarySensor ( protoChannel . getType ( ) ) ;
int channelPeriodFactor = ( measurementPeriodFactor = = 0 ? ( isBinarySensor ? 14 : 1 ) : measurementPeriodFactor ) ;
int measurementPeriod = measurementPeriodBase * channelPeriodFactor ;
long measurementPeriodMillis = TimeUnit . SECONDS . toMillis ( measurementPeriod ) ;
long startTimestampMillis = TimeUnit . SECONDS . toMillis ( protoChannel . getTimestamp ( ) ) ;
for ( int i = 0 ; i < sampleOffsetsList . size ( ) ; i + + ) {
int sampleOffset = sampleOffsetsList . get ( i ) ;
if ( isSensorError ( sampleOffset ) ) {
log . warn ( "[{}],[{}] Sensor error value! Ignoring." , sessionId , sampleOffset ) ;
continue ;
}
int measurementPeriod = measurementPeriodBase * measurementPeriodFactor ;
long measurementPeriodMillis = TimeUnit . SECONDS . toMillis ( measurementPeriod ) ;
long nextTransmissionAtMillis = TimeUnit . SECONDS . toMillis ( protoMeasurements . getNextTransmissionAt ( ) ) ;
int startPoint = protoChannel . getStartPoint ( ) ;
int startTimestamp = protoChannel . getTimestamp ( ) ;
long startTimestampMillis = TimeUnit . SECONDS . toMillis ( startTimestamp ) ;
List < Integer > sampleOffsetsList = protoChannel . getSampleOffsetsList ( ) ;
if ( ! CollectionUtils . isEmpty ( sampleOffsetsList ) ) {
int sampleOfssetsListSize = sampleOffsetsList . size ( ) ;
for ( int i = 0 ; i < sampleOfssetsListSize ; i + + ) {
int sampleOffset = sampleOffsetsList . get ( i ) ;
Integer previousSampleOffset = isBinarySensor & & i > 0 ? sampleOffsetsList . get ( i - 1 ) : null ;
if ( sampleOffset = = - 32768 ) {
log . warn ( "[{}],[{}] Sensor error value! Ignoring." , sessionId , sampleOffset ) ;
} else {
switch ( measurementType ) {
case TEMPERATURE :
values = valuesMap . computeIfAbsent ( startTimestampMillis , k - >
CoapEfentoUtils . setDefaultMeasurements ( serialNumber , batteryStatus , measurementPeriod , nextTransmissionAtMillis , signal , k ) ) ;
values . addProperty ( "temperature_" + channel , ( ( double ) ( startPoint + sampleOffset ) ) / 10f ) ;
startTimestampMillis = startTimestampMillis + measurementPeriodMillis ;
break ;
case WATER_METER :
values = valuesMap . computeIfAbsent ( startTimestampMillis , k - >
CoapEfentoUtils . setDefaultMeasurements ( serialNumber , batteryStatus , measurementPeriod , nextTransmissionAtMillis , signal , k ) ) ;
values . addProperty ( "pulse_counter_water_" + channel , ( ( double ) ( startPoint + sampleOffset ) ) ) ;
startTimestampMillis = startTimestampMillis + measurementPeriodMillis ;
break ;
case HUMIDITY :
values = valuesMap . computeIfAbsent ( startTimestampMillis , k - >
CoapEfentoUtils . setDefaultMeasurements ( serialNumber , batteryStatus , measurementPeriod , nextTransmissionAtMillis , signal , k ) ) ;
values . addProperty ( "humidity_" + channel , ( double ) ( startPoint + sampleOffset ) ) ;
startTimestampMillis = startTimestampMillis + measurementPeriodMillis ;
break ;
case ATMOSPHERIC_PRESSURE :
values = valuesMap . computeIfAbsent ( startTimestampMillis , k - >
CoapEfentoUtils . setDefaultMeasurements ( serialNumber , batteryStatus , measurementPeriod , nextTransmissionAtMillis , signal , k ) ) ;
values . addProperty ( "pressure_" + channel , ( double ) ( startPoint + sampleOffset ) / 10f ) ;
startTimestampMillis = startTimestampMillis + measurementPeriodMillis ;
break ;
case DIFFERENTIAL_PRESSURE :
values = valuesMap . computeIfAbsent ( startTimestampMillis , k - >
CoapEfentoUtils . setDefaultMeasurements ( serialNumber , batteryStatus , measurementPeriod , nextTransmissionAtMillis , signal , k ) ) ;
values . addProperty ( "pressure_diff_" + channel , ( double ) ( startPoint + sampleOffset ) ) ;
startTimestampMillis = startTimestampMillis + measurementPeriodMillis ;
break ;
case OK_ALARM :
boolean currentIsOk = sampleOffset < 0 ;
if ( previousSampleOffset ! = null ) {
boolean previousIsOk = previousSampleOffset < 0 ;
boolean isOk = previousIsOk & & currentIsOk ;
boolean isAlarm = ! previousIsOk & & ! currentIsOk ;
if ( isOk | | isAlarm ) {
break ;
}
}
String data = currentIsOk ? "OK" : "ALARM" ;
long sampleOffsetMillis = TimeUnit . SECONDS . toMillis ( sampleOffset ) ;
long measurementTimestamp = startTimestampMillis + Math . abs ( sampleOffsetMillis ) ;
values = valuesMap . computeIfAbsent ( measurementTimestamp - 1000 , k - >
CoapEfentoUtils . setDefaultMeasurements ( serialNumber , batteryStatus , measurementPeriod , nextTransmissionAtMillis , signal , k ) ) ;
values . addProperty ( "ok_alarm_" + channel , data ) ;
break ;
case PULSE_CNT :
values = valuesMap . computeIfAbsent ( startTimestampMillis , k - >
CoapEfentoUtils . setDefaultMeasurements ( serialNumber , batteryStatus , measurementPeriod , nextTransmissionAtMillis , signal , k ) ) ;
values . addProperty ( "pulse_cnt_" + channel , ( double ) ( startPoint + sampleOffset ) ) ;
startTimestampMillis = startTimestampMillis + measurementPeriodMillis ;
break ;
case NO_SENSOR :
case UNRECOGNIZED :
log . trace ( "[{}][{}] Sensor error value! Ignoring." , sessionId , measurementTypeName ) ;
break ;
default :
log . trace ( "[{}],[{}] Unsupported measurementType! Ignoring." , sessionId , measurementTypeName ) ;
break ;
}
JsonObject values ;
if ( isBinarySensor ) {
boolean currentIsOk = sampleOffset < 0 ;
Integer previousSampleOffset = i > 0 ? sampleOffsetsList . get ( i - 1 ) : null ;
if ( previousSampleOffset ! = null ) { //compare with previous value
boolean previousIsOk = previousSampleOffset < 0 ;
if ( currentIsOk = = previousIsOk ) {
break ;
}
}
long sampleOffsetMillis = TimeUnit . SECONDS . toMillis ( sampleOffset ) ;
long measurementTimestamp = startTimestampMillis + Math . abs ( sampleOffsetMillis ) ;
values = valuesMap . computeIfAbsent ( measurementTimestamp - 1000 , k - >
CoapEfentoUtils . setDefaultMeasurements ( serialNumber , batteryStatus , measurementPeriod , nextTransmissionAtMillis , signal , k ) ) ;
addBinarySample ( protoChannel , currentIsOk , values , channel + 1 , sessionId ) ;
} else {
log . trace ( "[{}][{}] sampleOffsetsList list is empty!" , sessionId , measurementTypeName ) ;
long timestampMillis = startTimestampMillis + i * measurementPeriodMillis ;
values = valuesMap . computeIfAbsent ( timestampMillis , k - > CoapEfentoUtils . setDefaultMeasurements (
serialNumber , batteryStatus , measurementPeriod , nextTransmissionAtMillis , signal , k ) ) ;
addContinuesSample ( protoChannel , sampleOffset , values , channel + 1 , sessionId ) ;
}
}
} else {
throw new IllegalStateException ( "[" + sessionId + "]: Failed to get Efento measurements, reason: channels list is empty!" ) ;
}
if ( ! CollectionUtils . isEmpty ( valuesMap ) ) {
List < EfentoTelemetry > efentoMeasurements = new ArrayList < > ( ) ;
for ( Long ts : valuesMap . keySet ( ) ) {
EfentoTelemetry measurement = new EfentoTelemetry ( ts , valuesMap . get ( ts ) ) ;
efentoMeasurements . add ( measurement ) ;
}
return efentoMeasurements ;
} else {
if ( CollectionUtils . isEmpty ( valuesMap ) ) {
throw new IllegalStateException ( "[" + sessionId + "]: Failed to collect Efento measurements, reason, values map is empty!" ) ;
}
return valuesMap . entrySet ( ) . stream ( )
. map ( entry - > new EfentoTelemetry ( entry . getKey ( ) , entry . getValue ( ) ) )
. collect ( Collectors . toList ( ) ) ;
}
private void addContinuesSample ( ProtoChannel protoChannel , int sampleOffset , JsonObject values , int channelNumber , UUID sessionId ) {
int startPoint = protoChannel . getStartPoint ( ) ;
switch ( protoChannel . getType ( ) ) {
case MEASUREMENT_TYPE_TEMPERATURE :
values . addProperty ( "temperature_" + channelNumber , ( ( double ) ( startPoint + sampleOffset ) ) / 10f ) ;
break ;
case MEASUREMENT_TYPE_WATER_METER :
values . addProperty ( "pulse_counter_water_" + channelNumber , ( ( double ) ( startPoint + sampleOffset ) ) ) ;
break ;
case MEASUREMENT_TYPE_HUMIDITY :
values . addProperty ( "humidity_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_ATMOSPHERIC_PRESSURE :
values . addProperty ( "pressure_" + channelNumber , ( double ) ( startPoint + sampleOffset ) / 10f ) ;
break ;
case MEASUREMENT_TYPE_DIFFERENTIAL_PRESSURE :
values . addProperty ( "pressure_diff_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_PULSE_CNT :
values . addProperty ( "pulse_cnt_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_IAQ :
values . addProperty ( "iaq_" + channelNumber , ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_ELECTRICITY_METER :
values . addProperty ( "watt_hour_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_SOIL_MOISTURE :
values . addProperty ( "soil_moisture_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_AMBIENT_LIGHT :
values . addProperty ( "ambient_light_" + channelNumber , ( double ) ( startPoint + sampleOffset ) / 10f ) ;
break ;
case MEASUREMENT_TYPE_HIGH_PRESSURE :
values . addProperty ( "high_pressure_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_DISTANCE_MM :
values . addProperty ( "distance_mm_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_WATER_METER_ACC_MINOR :
values . addProperty ( "acc_counter_water_minor_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_WATER_METER_ACC_MAJOR :
values . addProperty ( "acc_counter_water_major_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_HUMIDITY_ACCURATE :
values . addProperty ( "humidity_relative_" + channelNumber , ( double ) ( startPoint + sampleOffset ) / 10f ) ;
break ;
case MEASUREMENT_TYPE_STATIC_IAQ :
values . addProperty ( "static_iaq_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_CO2_EQUIVALENT :
values . addProperty ( "co2_ppm_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_BREATH_VOC :
values . addProperty ( "breath_voc_ppm_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_PERCENTAGE :
values . addProperty ( "percentage_" + channelNumber , ( double ) ( startPoint + sampleOffset ) / 100f ) ;
break ;
case MEASUREMENT_TYPE_VOLTAGE :
values . addProperty ( "voltage_" + channelNumber , ( double ) ( startPoint + sampleOffset ) / 10f ) ;
break ;
case MEASUREMENT_TYPE_CURRENT :
values . addProperty ( "current_" + channelNumber , ( double ) ( startPoint + sampleOffset ) / 100f ) ;
break ;
case MEASUREMENT_TYPE_PULSE_CNT_ACC_MINOR :
values . addProperty ( "pulse_cnt_minor_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_PULSE_CNT_ACC_MAJOR :
values . addProperty ( "pulse_cnt_major_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_ELEC_METER_ACC_MINOR :
values . addProperty ( "elec_meter_minor_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_ELEC_METER_ACC_MAJOR :
values . addProperty ( "elec_meter_major_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MINOR :
values . addProperty ( "pulse_cnt_wide_minor_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MAJOR :
values . addProperty ( "pulse_cnt_wide_major_" + channelNumber , ( double ) ( startPoint + sampleOffset ) ) ;
break ;
case MEASUREMENT_TYPE_CURRENT_PRECISE :
values . addProperty ( "current_precise_" + channelNumber , ( double ) ( startPoint + sampleOffset ) / 1000f ) ;
break ;
case MEASUREMENT_TYPE_NO_SENSOR :
case UNRECOGNIZED :
log . trace ( "[{}][{}] Sensor error value! Ignoring." , sessionId , protoChannel . getType ( ) . name ( ) ) ;
break ;
default :
log . trace ( "[{}],[{}] Unsupported measurementType! Ignoring." , sessionId , protoChannel . getType ( ) . name ( ) ) ;
break ;
}
}
private void addBinarySample ( ProtoChannel protoChannel , boolean valueIsOk , JsonObject values , int channel , UUID sessionId ) {
switch ( protoChannel . getType ( ) ) {
case MEASUREMENT_TYPE_OK_ALARM :
values . addProperty ( "ok_alarm_" + channel , valueIsOk ? "OK" : "ALARM" ) ;
break ;
case MEASUREMENT_TYPE_FLOODING :
values . addProperty ( "flooding_" + channel , valueIsOk ? "OK" : "WATER_DETECTED" ) ;
break ;
case MEASUREMENT_TYPE_OUTPUT_CONTROL :
values . addProperty ( "output_control_" + channel , valueIsOk ? "OFF" : "ON" ) ;
break ;
default :
log . trace ( "[{}],[{}] Unsupported binary measurementType! Ignoring." , sessionId , protoChannel . getType ( ) . name ( ) ) ;
break ;
}
}
private EfentoTelemetry getEfentoDeviceInfo ( DeviceInfoProtos . ProtoDeviceInfo protoDeviceInfo ) {