Browse Source

Kafka, RabbitMQ and REST API call extensions + Docker image

pull/1/head
Volodymyr Babak 10 years ago
parent
commit
19d3afa650
  1. 38
      dao/src/main/java/org/thingsboard/server/dao/cache/PreviousDeviceCredentialsIdKeyGenerator.java
  2. 88
      dao/src/main/java/org/thingsboard/server/dao/cache/ServiceCacheConfiguration.java
  3. 5
      docker/db-schema.env
  4. 26
      docker/db-schema/Dockerfile
  5. 53
      docker/db-schema/install_schema.sh
  6. 31
      docker/deploy.sh
  7. 31
      docker/deploy_cassandra_zookeeper.sh
  8. 26
      docker/docker-compose.random.yml
  9. 26
      docker/docker-compose.static.yml
  10. 52
      docker/docker-compose.yml
  11. 8
      docker/thingsboard.env
  12. 23
      docker/thingsboard/Dockerfile
  13. 36
      docker/thingsboard/run_web_app.sh
  14. 97
      extensions/extension-kafka/pom.xml
  15. 39
      extensions/extension-kafka/src/assembly/extension.xml
  16. 28
      extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaActionMsg.java
  17. 34
      extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaActionPayload.java
  18. 45
      extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaPluginAction.java
  19. 26
      extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaPluginActionConfiguration.java
  20. 61
      extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaMsgHandler.java
  21. 93
      extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java
  22. 34
      extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPluginConfiguration.java
  23. 34
      extensions/extension-kafka/src/main/resources/KafkaActionDescriptor.json
  24. 80
      extensions/extension-kafka/src/main/resources/KafkaPluginDescriptor.json
  25. 83
      extensions/extension-kafka/src/test/java/org/thingsboard/server/extensions/kafka/KafkaDemoClient.java
  26. 139
      extensions/extension-rabbitmq/pom.xml
  27. 34
      extensions/extension-rabbitmq/src/assembly/extension.xml
  28. 31
      extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqActionMsg.java
  29. 39
      extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqActionPayload.java
  30. 49
      extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqPluginAction.java
  31. 32
      extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqPluginActionConfiguration.java
  32. 86
      extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqMsgHandler.java
  33. 109
      extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqPlugin.java
  34. 47
      extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqPluginConfiguration.java
  35. 78
      extensions/extension-rabbitmq/src/main/resources/RabbitMqActionDescriptor.json
  36. 79
      extensions/extension-rabbitmq/src/main/resources/RabbitMqPluginDescriptor.json
  37. 56
      extensions/extension-rabbitmq/src/test/java/org/thingsboard/server/extensions/rabbitmq/DemoClient.java
  38. 98
      extensions/extension-rest-api-call/pom.xml
  39. 34
      extensions/extension-rest-api-call/src/assembly/extension.xml
  40. 28
      extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallActionMsg.java
  41. 37
      extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallActionPayload.java
  42. 60
      extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallPluginAction.java
  43. 28
      extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallPluginActionConfiguration.java
  44. 67
      extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallMsgHandler.java
  45. 84
      extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallPlugin.java
  46. 30
      extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallPluginConfiguration.java
  47. 63
      extensions/extension-rest-api-call/src/main/resources/RestApiCallActionDescriptor.json
  48. 69
      extensions/extension-rest-api-call/src/main/resources/RestApiCallPluginDescriptor.json
  49. 70
      extensions/extension-rest-api-call/src/test/java/org/thingsboard/server/extensions/kafka/RestApiCallDemoClient.java
  50. 43
      extensions/pom.xml

38
dao/src/main/java/org/thingsboard/server/dao/cache/PreviousDeviceCredentialsIdKeyGenerator.java

@ -0,0 +1,38 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.cache;
import org.springframework.cache.interceptor.KeyGenerator;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
import java.lang.reflect.Method;
public class PreviousDeviceCredentialsIdKeyGenerator implements KeyGenerator {
@Override
public Object generate(Object o, Method method, Object... objects) {
DeviceCredentialsService deviceCredentialsService = (DeviceCredentialsService) o;
DeviceCredentials deviceCredentials = (DeviceCredentials) objects[0];
if (deviceCredentials.getDeviceId() != null) {
DeviceCredentials oldDeviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(deviceCredentials.getDeviceId());
if (oldDeviceCredentials != null) {
return oldDeviceCredentials.getCredentialsId();
}
}
return null;
}
}

88
dao/src/main/java/org/thingsboard/server/dao/cache/ServiceCacheConfiguration.java

@ -0,0 +1,88 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.cache;
import com.hazelcast.config.Config;
import com.hazelcast.config.DiscoveryStrategyConfig;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizeConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.GroupProperty;
import com.hazelcast.spring.cache.HazelcastCacheManager;
import com.hazelcast.zookeeper.ZookeeperDiscoveryProperties;
import com.hazelcast.zookeeper.ZookeeperDiscoveryStrategyFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.thingsboard.server.common.data.CacheConstants;
@Configuration
@EnableCaching
@ConditionalOnProperty(prefix = "cache", value = "enabled", havingValue = "true")
public class ServiceCacheConfiguration {
private static final String HAZELCAST_CLUSTER_NAME = "hazelcast";
@Value("${cache.device_credentials.max_size}")
private Integer deviceCredentialsCacheMaxSize;
@Value("${cache.device_credentials.time_to_live}")
private Integer deviceCredentialsCacheTTL;
@Value("${zk.enabled}")
private boolean zkEnabled;
@Value("${zk.url}")
private String zkUrl;
@Value("${zk.zk_dir}")
private String zkDir;
@Bean
public HazelcastInstance hazelcastInstance() {
Config config = new Config();
if (zkEnabled) {
config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
config.setProperty(GroupProperty.DISCOVERY_SPI_ENABLED.getName(), Boolean.TRUE.toString());
DiscoveryStrategyConfig discoveryStrategyConfig = new DiscoveryStrategyConfig(new ZookeeperDiscoveryStrategyFactory());
discoveryStrategyConfig.addProperty(ZookeeperDiscoveryProperties.ZOOKEEPER_URL.key(), zkUrl);
discoveryStrategyConfig.addProperty(ZookeeperDiscoveryProperties.ZOOKEEPER_PATH.key(), zkDir);
discoveryStrategyConfig.addProperty(ZookeeperDiscoveryProperties.GROUP.key(), HAZELCAST_CLUSTER_NAME);
config.getNetworkConfig().getJoin().getDiscoveryConfig().addDiscoveryStrategyConfig(discoveryStrategyConfig);
}
MapConfig deviceCredentialsCacheConfig = new MapConfig(CacheConstants.DEVICE_CREDENTIALS_CACHE);
deviceCredentialsCacheConfig.setTimeToLiveSeconds(deviceCredentialsCacheTTL);
deviceCredentialsCacheConfig.setMaxSizeConfig(new MaxSizeConfig(deviceCredentialsCacheMaxSize, MaxSizeConfig.MaxSizePolicy.PER_NODE));
config.addMapConfig(deviceCredentialsCacheConfig);
return Hazelcast.newHazelcastInstance(config);
}
@Bean
public KeyGenerator previousDeviceCredentialsId() {
return new PreviousDeviceCredentialsIdKeyGenerator();
}
@Bean
public CacheManager cacheManager() {
return new HazelcastCacheManager(hazelcastInstance());
}
}

5
docker/db-schema.env

@ -0,0 +1,5 @@
#Db schema configuration
SKIP_SCHEMA_CREATION=false
SKIP_SYSTEM_DATA=false
SKIP_DEMO_DATA=false

26
docker/db-schema/Dockerfile

@ -0,0 +1,26 @@
#
# Copyright © 2016 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM cassandra:3.9
ADD install_schema.sh /root/install_schema.sh
RUN apt-get update \
&& apt-get install -y nmap
RUN chmod +x /root/install_schema.sh
WORKDIR /root

53
docker/db-schema/install_schema.sh

@ -0,0 +1,53 @@
#!/bin/bash
#
# Copyright © 2016 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
until nmap db -p 9042 | grep "9042/tcp open"
do
echo "Wait for Cassandra..."
sleep 10
done
if [ "$SKIP_SCHEMA_CREATION" == "false" ]; then
echo "Creating 'Thingsboard' keyspace..."
cqlsh db -f /root/schema.cql
if [ "$?" -eq 0 ]; then
echo "'Thingsboard' keyspace was successfully created!"
else
echo "There were issues while creating 'Thingsboard' keyspace!"
fi
fi
if [ "$SKIP_SYSTEM_DATA" == "false" ]; then
echo "Adding system data..."
cqlsh db -f /root/system-data.cql
if [ "$?" -eq 0 ]; then
echo "System data was successfully added!"
else
echo "There were issues while adding System data!"
fi
fi
if [ "$SKIP_DEMO_DATA" == "false" ]; then
echo "Adding demo data..."
cqlsh db -f /root/demo-data.cql
if [ "$?" -eq 0 ]; then
echo "Demo data was successfully added!"
else
echo "There were issues while adding Demo data!"
fi
fi

31
docker/deploy.sh

@ -0,0 +1,31 @@
#!/bin/bash
#
# Copyright © 2016 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
command='docker-compose -f docker-compose.yml -f docker-compose.random.yml'
echo "stopping images.."
$command stop
echo "removing stopped images.."
$command rm -f
echo "building images.."
$command build
echo "starting images..."
$command up -d

31
docker/deploy_cassandra_zookeeper.sh

@ -0,0 +1,31 @@
#!/bin/bash
#
# Copyright © 2016 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
command='docker-compose -f docker-compose.yml -f docker-compose.static.yml'
echo "stopping images.."
$command stop
echo "removing stopped images.."
$command rm -f
echo "building images.."
$command build
echo "starting cassandra, zookeeper, db-schema images..."
$command up -d cassandra zookeeper db-schema

26
docker/docker-compose.random.yml

@ -0,0 +1,26 @@
#
# Copyright © 2016 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
version: '2'
services:
cassandra:
ports:
- "9042"
- "9160"
zookeeper:
ports:
- "2181"

26
docker/docker-compose.static.yml

@ -0,0 +1,26 @@
#
# Copyright © 2016 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
version: '2'
services:
cassandra:
ports:
- "9042:9042"
- "9160:9160"
zookeeper:
ports:
- "2181:2181"

52
docker/docker-compose.yml

@ -0,0 +1,52 @@
#
# Copyright © 2016 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
version: '2'
services:
thingsboard:
build: thingsboard
ports:
- "8080:8080"
- "1883:1883"
- "5683:5683"
links:
- cassandra:db
- zookeeper:zk
- db-schema:db-schema
volumes:
- "../application/target/thingsboard.deb:/root/thingsboard.deb"
env_file:
- thingsboard.env
entrypoint: ./run_web_app.sh
db-schema:
build: db-schema
links:
- cassandra:db
env_file:
- db-schema.env
volumes:
- "../dao/src/main/resources/schema.cql:/root/schema.cql"
- "../dao/src/main/resources/demo-data.cql:/root/demo-data.cql"
- "../dao/src/main/resources/system-data.cql:/root/system-data.cql"
entrypoint: ./install_schema.sh
cassandra:
image: "cassandra:3.9"
volumes:
- "${CASSANDRA_DATA_DIR}:/var/lib/cassandra"
zookeeper:
image: "zookeeper:3.4.9"
restart: always

8
docker/thingsboard.env

@ -0,0 +1,8 @@
#Thingsboard server configuration
CASSANDRA_URL=db:9042
ZOOKEEPER_URL=zk:2181
MQTT_BIND_ADDRESS=0.0.0.0
MQTT_BIND_PORT=1883
COAP_BIND_ADDRESS=0.0.0.0
COAP_BIND_PORT=5683

23
docker/thingsboard/Dockerfile

@ -0,0 +1,23 @@
#
# Copyright © 2016 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM java:8-jre
ADD run_web_app.sh /root/run_web_app.sh
RUN chmod +x /root/run_web_app.sh
WORKDIR /root

36
docker/thingsboard/run_web_app.sh

@ -0,0 +1,36 @@
#!/bin/bash
#
# Copyright © 2016 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
dpkg -i /root/thingsboard.deb
reachable=0
while [ $reachable -eq 0 ];
do
echo "db-schema container is still in progress. waiting until it completed..."
sleep 3
ping -q -c 1 db-schema > /dev/null 2>&1
if [ "$?" -ne 0 ];
then
echo "db-schema container completed!"
reachable=1
fi
done
echo "Starting 'Thingsboard' service..."
thingsboard start

97
extensions/extension-kafka/pom.xml

@ -0,0 +1,97 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2016 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard.server</groupId>
<version>0.0.1-SNAPSHOT</version>
<artifactId>extensions</artifactId>
</parent>
<groupId>org.thingsboard.server.extensions</groupId>
<artifactId>extension-kafka</artifactId>
<packaging>jar</packaging>
<name>Thingsboard Server Kafka Extension</name>
<url>http://thingsboard.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/../..</main.dir>
</properties>
<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.thingsboard.server</groupId>
<artifactId>extensions-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.thingsboard.server</groupId>
<artifactId>extensions-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity-tools</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/assembly/extension.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

39
extensions/extension-kafka/src/assembly/extension.xml

@ -0,0 +1,39 @@
<!--
Copyright © 2016 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
<id>extension</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
<excludes>
<exclude>org.apache.zookeeper:zookeeper</exclude>
<exclude>org.scala-lang:scala-library</exclude>
<exclude>io.netty:netty</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>

28
extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaActionMsg.java

@ -0,0 +1,28 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.kafka.action;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg;
public class KafkaActionMsg extends AbstractRuleToPluginMsg<KafkaActionPayload> {
public KafkaActionMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, KafkaActionPayload payload) {
super(tenantId, customerId, deviceId, payload);
}
}

34
extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaActionPayload.java

@ -0,0 +1,34 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.kafka.action;
import lombok.Builder;
import lombok.Data;
import org.thingsboard.server.common.msg.session.MsgType;
import java.io.Serializable;
@Data
@Builder
public class KafkaActionPayload implements Serializable {
private final String topic;
private final String msgBody;
private final boolean sync;
private final Integer requestId;
private final MsgType msgType;
}

45
extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaPluginAction.java

@ -0,0 +1,45 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.kafka.action;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
import org.thingsboard.server.extensions.api.component.Action;
import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
import org.thingsboard.server.extensions.api.rules.RuleContext;
import org.thingsboard.server.extensions.core.action.template.AbstractTemplatePluginAction;
import java.util.Optional;
@Action(name = "Kafka Plugin Action", descriptor = "KafkaActionDescriptor.json", configuration = KafkaPluginActionConfiguration.class)
@Slf4j
public class KafkaPluginAction extends AbstractTemplatePluginAction<KafkaPluginActionConfiguration> {
@Override
protected Optional<RuleToPluginMsg<?>> buildRuleToPluginMsg(RuleContext ctx, ToDeviceActorMsg msg, FromDeviceRequestMsg payload) {
KafkaActionPayload.KafkaActionPayloadBuilder builder = KafkaActionPayload.builder();
builder.msgType(payload.getMsgType());
builder.requestId(payload.getRequestId());
builder.sync(configuration.isSync());
builder.topic(configuration.getTopic());
builder.msgBody(getMsgBody(ctx, msg));
return Optional.of(new KafkaActionMsg(msg.getTenantId(),
msg.getCustomerId(),
msg.getDeviceId(),
builder.build()));
}
}

26
extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaPluginActionConfiguration.java

@ -0,0 +1,26 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.kafka.action;
import lombok.Data;
import org.thingsboard.server.extensions.core.action.template.TemplateActionConfiguration;
@Data
public class KafkaPluginActionConfiguration implements TemplateActionConfiguration {
private boolean sync;
private String topic;
private String template;
}

61
extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaMsgHandler.java

@ -0,0 +1,61 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.kafka.plugin;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
import org.thingsboard.server.extensions.api.rules.RuleException;
import org.thingsboard.server.extensions.kafka.action.KafkaActionMsg;
import org.thingsboard.server.extensions.kafka.action.KafkaActionPayload;
@RequiredArgsConstructor
public class KafkaMsgHandler implements RuleMsgHandler {
private final Producer<?, String> producer;
@Override
public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg<?> msg) throws RuleException {
if (!(msg instanceof KafkaActionMsg)) {
throw new RuleException("Unsupported message type " + msg.getClass().getName() + "!");
}
KafkaActionPayload payload = ((KafkaActionMsg) msg).getPayload();
try {
producer.send(new ProducerRecord<>(payload.getTopic(), payload.getMsgBody()),
(metadata, e) -> {
if (payload.isSync()) {
if (metadata != null) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId())));
} else {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onError(payload.getMsgType(), payload.getRequestId(), e)));
}
}
});
} catch (Exception e) {
throw new RuleException(e.getMessage(), e);
}
}
}

93
extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java

@ -0,0 +1,93 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.kafka.plugin;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.thingsboard.server.extensions.api.component.Plugin;
import org.thingsboard.server.extensions.api.plugins.AbstractPlugin;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
import org.thingsboard.server.extensions.kafka.action.KafkaPluginAction;
import java.util.Properties;
@Plugin(name = "Kafka Plugin", actions = {KafkaPluginAction.class},
descriptor = "KafkaPluginDescriptor.json", configuration = KafkaPluginConfiguration.class)
@Slf4j
public class KafkaPlugin extends AbstractPlugin<KafkaPluginConfiguration> {
private KafkaMsgHandler handler;
private Producer<?, String> producer;
private final Properties properties = new Properties();
@Override
public void init(KafkaPluginConfiguration configuration) {
properties.put("bootstrap.servers", configuration.getBootstrapServers());
properties.put("value.serializer", configuration.getValueSerializer());
properties.put("key.serializer", configuration.getKeySerializer());
properties.put("acks", String.valueOf(configuration.getAcks()));
properties.put("retries", configuration.getRetries());
properties.put("batch.size", configuration.getBatchSize());
properties.put("linger.ms", configuration.getLinger());
properties.put("buffer.memory", configuration.getBufferMemory());
if (configuration.getOtherProperties() != null) {
configuration.getOtherProperties()
.stream().forEach(p -> properties.put(p.getKey(), p.getValue()));
}
init();
}
private void init() {
try {
this.producer = new KafkaProducer<>(properties);
this.handler = new KafkaMsgHandler(producer);
} catch (Exception e) {
log.error("Failed to start kafka producer", e);
throw new RuntimeException(e);
}
}
private void destroy() {
try {
this.handler = null;
this.producer.close();
} catch (Exception e) {
log.error("Failed to close producer during destroy()", e);
}
}
@Override
protected RuleMsgHandler getRuleMsgHandler() {
return handler;
}
@Override
public void resume(PluginContext ctx) {
init();
}
@Override
public void suspend(PluginContext ctx) {
destroy();
}
@Override
public void stop(PluginContext ctx) {
destroy();
}
}

34
extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPluginConfiguration.java

@ -0,0 +1,34 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.kafka.plugin;
import lombok.Data;
import org.thingsboard.server.extensions.core.plugin.KeyValuePluginProperties;
import java.util.List;
@Data
public class KafkaPluginConfiguration {
private String bootstrapServers;
private int retries;
private int batchSize;
private int linger;
private int bufferMemory;
private int acks;
private String keySerializer;
private String valueSerializer;
private List<KeyValuePluginProperties> otherProperties;
}

34
extensions/extension-kafka/src/main/resources/KafkaActionDescriptor.json

@ -0,0 +1,34 @@
{
"schema": {
"title": "Kafka Action Configuration",
"type": "object",
"properties": {
"sync": {
"title": "Requires delivery confirmation",
"type": "boolean"
},
"topic": {
"title": "Topic Name",
"type": "string"
},
"template": {
"title": "Body Template",
"type": "string"
}
},
"required": [
"sync",
"topic",
"template"
]
},
"form": [
"sync",
"topic",
{
"key": "template",
"type": "textarea",
"rows": 5
}
]
}

80
extensions/extension-kafka/src/main/resources/KafkaPluginDescriptor.json

@ -0,0 +1,80 @@
{
"schema": {
"title": "Kafka Plugin Configuration",
"type": "object",
"properties": {
"bootstrapServers": {
"title": "Bootstrap Servers",
"type": "string",
"default": "localhost:9092"
},
"retries": {
"title": "Automatically Retry Times If Fails",
"type": "integer",
"default": 0
},
"batchSize": {
"title": "Producer Batch Size On Client",
"type": "integer",
"default": 16384
},
"linger": {
"title": "Time To Buffer Locally Before Sending To Kafka Broker (in ms)",
"type": "integer",
"default": 0
},
"bufferMemory": {
"title": "Buffer Max Size On Client",
"type": "integer",
"default": 33554432
},
"acks": {
"title": "Minimum Number Of Replicas That Must Acknowledge A Write (-1 for 'all')",
"type": "integer",
"default": -1
},
"keySerializer": {
"title": "Key Serializer",
"type": "string",
"default": "org.apache.kafka.common.serialization.StringSerializer"
},
"valueSerializer": {
"title": "Value Serializer",
"type": "string",
"default": "org.apache.kafka.common.serialization.StringSerializer"
},
"otherProperties": {
"title": "Other Kafka properties",
"type": "array",
"items": {
"title": "Kafka property",
"type": "object",
"properties": {
"key": {
"title": "Key",
"type": "string"
},
"value": {
"title": "Value",
"type": "string"
}
}
}
}
},
"required": [
"bootstrapServers"
]
},
"form": [
"bootstrapServers",
"retries",
"batchSize",
"linger",
"bufferMemory",
"acks",
"keySerializer",
"valueSerializer",
"otherProperties"
]
}

83
extensions/extension-kafka/src/test/java/org/thingsboard/server/extensions/kafka/KafkaDemoClient.java

@ -0,0 +1,83 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.kafka;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
public class KafkaDemoClient {
private static final int ZK_PORT = 2222;
private static final String HOSTNAME = "localhost";
private static final String ZOOKEEPER_CONNECT = HOSTNAME + ":" + ZK_PORT;
private static final int KAFKA_PORT = 9092;
private static final int BROKER_ID = 1;
public static void main(String[] args) {
try {
startZkLocal();
startKafkaLocal();
} catch (Exception e) {
System.out.println("Error running local Kafka broker");
e.printStackTrace(System.out);
}
}
private static void startZkLocal() throws Exception {
final File zkTmpDir = File.createTempFile("zookeeper", "test");
if (zkTmpDir.delete() && zkTmpDir.mkdir()) {
Properties zkProperties = new Properties();
zkProperties.setProperty("dataDir", zkTmpDir.getAbsolutePath());
zkProperties.setProperty("clientPort", String.valueOf(ZK_PORT));
ServerConfig configuration = new ServerConfig();
QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
quorumConfiguration.parseProperties(zkProperties);
configuration.readFrom(quorumConfiguration);
new Thread() {
public void run() {
try {
new ZooKeeperServerMain().runFromConfig(configuration);
} catch (IOException e) {
System.out.println("Start of Local ZooKeeper Failed");
e.printStackTrace(System.err);
}
}
}.start();
} else {
System.out.println("Failed to delete or create data dir for Zookeeper");
}
}
private static void startKafkaLocal() {
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("host.name", HOSTNAME);
kafkaProperties.setProperty("port", String.valueOf(KAFKA_PORT));
kafkaProperties.setProperty("broker.id", String.valueOf(BROKER_ID));
kafkaProperties.setProperty("zookeeper.connect", ZOOKEEPER_CONNECT);
KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
KafkaServerStartable kafka = new KafkaServerStartable(kafkaConfig);
kafka.startup();
}
}

139
extensions/extension-rabbitmq/pom.xml

@ -0,0 +1,139 @@
<!--
Copyright © 2016 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard.server</groupId>
<version>0.0.1-SNAPSHOT</version>
<artifactId>extensions</artifactId>
</parent>
<groupId>org.thingsboard.server.extensions</groupId>
<artifactId>extension-rabbitmq</artifactId>
<packaging>jar</packaging>
<name>Thingsboard Server RabbitMQ Extension</name>
<url>http://thingsboard.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/../..</main.dir>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard.server</groupId>
<artifactId>extensions-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.thingsboard.server</groupId>
<artifactId>extensions-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity-tools</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/assembly/extension.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

34
extensions/extension-rabbitmq/src/assembly/extension.xml

@ -0,0 +1,34 @@
<!--
Copyright © 2016 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
<id>extension</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

31
extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqActionMsg.java

@ -0,0 +1,31 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rabbitmq.action;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg;
/**
* @author Andrew Shvayka
*/
public class RabbitMqActionMsg extends AbstractRuleToPluginMsg<RabbitMqActionPayload> {
public RabbitMqActionMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, RabbitMqActionPayload payload) {
super(tenantId, customerId, deviceId, payload);
}
}

39
extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqActionPayload.java

@ -0,0 +1,39 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rabbitmq.action;
import lombok.Builder;
import lombok.Data;
import org.thingsboard.server.common.msg.session.MsgType;
import java.io.Serializable;
/**
* @author Andrew Shvayka
*/
@Data
@Builder
public class RabbitMqActionPayload implements Serializable {
private final String exchange;
private final String queueName;
private final String messageProperties;
private final String payload;
private final boolean sync;
private final Integer requestId;
private final MsgType msgType;
}

49
extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqPluginAction.java

@ -0,0 +1,49 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rabbitmq.action;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
import org.thingsboard.server.extensions.api.component.Action;
import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
import org.thingsboard.server.extensions.api.rules.RuleContext;
import org.thingsboard.server.extensions.core.action.template.AbstractTemplatePluginAction;
import java.util.Optional;
/**
* @author Andrew Shvayka
*/
@Action(name = "RabbitMQ Plugin Action",
descriptor = "RabbitMqActionDescriptor.json", configuration = RabbitMqPluginActionConfiguration.class)
public class RabbitMqPluginAction extends AbstractTemplatePluginAction<RabbitMqPluginActionConfiguration> {
@Override
protected Optional<RuleToPluginMsg<?>> buildRuleToPluginMsg(RuleContext ctx, ToDeviceActorMsg msg, FromDeviceRequestMsg payload) {
RabbitMqActionPayload.RabbitMqActionPayloadBuilder builder = RabbitMqActionPayload.builder();
builder.sync(configuration.isSync());
builder.exchange(configuration.getExchange());
builder.queueName(configuration.getQueueName());
builder.messageProperties(configuration.getMessageProperties()[0]);
builder.msgType(payload.getMsgType());
builder.requestId(payload.getRequestId());
builder.payload(getMsgBody(ctx, msg));
return Optional.of(new RabbitMqActionMsg(msg.getTenantId(),
msg.getCustomerId(),
msg.getDeviceId(),
builder.build()));
}
}

32
extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqPluginActionConfiguration.java

@ -0,0 +1,32 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rabbitmq.action;
import lombok.Data;
import org.thingsboard.server.extensions.core.action.template.TemplateActionConfiguration;
/**
* @author Andrew Shvayka
*/
@Data
public class RabbitMqPluginActionConfiguration implements TemplateActionConfiguration{
private boolean sync;
private String exchange;
private String queueName;
private String[] messageProperties;
private String template;
}

86
extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqMsgHandler.java

@ -0,0 +1,86 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rabbitmq.plugin;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import lombok.RequiredArgsConstructor;
import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
import org.thingsboard.server.extensions.api.rules.RuleException;
import org.thingsboard.server.extensions.rabbitmq.action.RabbitMqActionMsg;
import org.thingsboard.server.extensions.rabbitmq.action.RabbitMqActionPayload;
import java.io.IOException;
import java.nio.charset.Charset;
/**
* @author Andrew Shvayka
*/
@RequiredArgsConstructor
public class RabbitMqMsgHandler implements RuleMsgHandler {
private static final Charset UTF8 = Charset.forName("UTF-8");
private final Channel channel;
@Override
public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg<?> msg) throws RuleException {
if (!(msg instanceof RabbitMqActionMsg)) {
throw new RuleException("Unsupported message type " + msg.getClass().getName() + "!");
}
RabbitMqActionPayload payload = ((RabbitMqActionMsg) msg).getPayload();
AMQP.BasicProperties properties = convert(payload.getMessageProperties());
try {
channel.basicPublish(
payload.getExchange() != null ? payload.getExchange() : "",
payload.getQueueName(),
properties,
payload.getPayload().getBytes(UTF8));
if (payload.isSync()) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId())));
}
} catch (IOException e) {
throw new RuleException(e.getMessage(), e);
}
}
private static AMQP.BasicProperties convert(String name) throws RuleException {
switch (name) {
case "BASIC":
return MessageProperties.BASIC;
case "TEXT_PLAIN":
return MessageProperties.TEXT_PLAIN;
case "MINIMAL_BASIC":
return MessageProperties.MINIMAL_BASIC;
case "MINIMAL_PERSISTENT_BASIC":
return MessageProperties.MINIMAL_PERSISTENT_BASIC;
case "PERSISTENT_BASIC":
return MessageProperties.PERSISTENT_BASIC;
case "PERSISTENT_TEXT_PLAIN":
return MessageProperties.PERSISTENT_TEXT_PLAIN;
default:
throw new RuleException("Message Properties: '" + name + "' is undefined!");
}
}
}

109
extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqPlugin.java

@ -0,0 +1,109 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rabbitmq.plugin;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.thingsboard.server.extensions.api.component.Plugin;
import org.thingsboard.server.extensions.api.plugins.AbstractPlugin;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
import org.thingsboard.server.extensions.rabbitmq.action.RabbitMqPluginAction;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* @author Andrew Shvayka
*/
@Plugin(name = "RabbitMQ Plugin", actions = {RabbitMqPluginAction.class},
descriptor = "RabbitMqPluginDescriptor.json", configuration = RabbitMqPluginConfiguration.class)
@Slf4j
public class RabbitMqPlugin extends AbstractPlugin<RabbitMqPluginConfiguration> {
private ConnectionFactory factory;
private Connection connection;
private RabbitMqMsgHandler handler;
@Override
public void init(RabbitMqPluginConfiguration configuration) {
factory = new ConnectionFactory();
factory.setHost(configuration.getHost());
factory.setPort(configuration.getPort());
set(configuration.getVirtualHost(), factory::setVirtualHost);
set(configuration.getUserName(), factory::setUsername);
set(configuration.getPassword(), factory::setPassword);
set(configuration.getAutomaticRecoveryEnabled(), factory::setAutomaticRecoveryEnabled);
set(configuration.getConnectionTimeout(), factory::setConnectionTimeout);
set(configuration.getHandshakeTimeout(), factory::setHandshakeTimeout);
set(configuration.getClientProperties(), props -> {
factory.setClientProperties(props.stream().collect(Collectors.toMap(
RabbitMqPluginConfiguration.RabbitMqPluginProperties::getKey,
RabbitMqPluginConfiguration.RabbitMqPluginProperties::getValue)));
});
init();
}
private <T> void set(T source, Consumer<T> setter) {
if (source != null && !StringUtils.isEmpty(source.toString())) {
setter.accept(source);
}
}
private void init() {
try {
this.connection = factory.newConnection();
this.handler = new RabbitMqMsgHandler(connection.createChannel());
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
}
}
private void destroy() {
try {
this.handler = null;
this.connection.close();
} catch (Exception e) {
log.info("Failed to close connection during destroy()", e);
}
}
@Override
protected RuleMsgHandler getRuleMsgHandler() {
return handler;
}
@Override
public void resume(PluginContext ctx) {
init();
}
@Override
public void suspend(PluginContext ctx) {
destroy();
}
@Override
public void stop(PluginContext ctx) {
destroy();
}
}

47
extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqPluginConfiguration.java

@ -0,0 +1,47 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rabbitmq.plugin;
import lombok.Data;
import java.util.List;
/**
* @author Andrew Shvayka
*/
@Data
public class RabbitMqPluginConfiguration {
private String host;
private int port;
private String virtualHost;
private String userName;
private String password;
private Boolean automaticRecoveryEnabled;
private Integer connectionTimeout;
private Integer handshakeTimeout;
private List<RabbitMqPluginProperties> clientProperties;
@Data
public static class RabbitMqPluginProperties {
private String key;
private String value;
}
}

78
extensions/extension-rabbitmq/src/main/resources/RabbitMqActionDescriptor.json

@ -0,0 +1,78 @@
{
"schema": {
"title": "RabbitMQ Action Configuration",
"type": "object",
"properties": {
"sync": {
"title": "Requires delivery confirmation",
"type": "boolean"
},
"exchange": {
"title": "Exchange",
"type": "string",
"default": ""
},
"queueName": {
"title": "Queue Name",
"type": "string"
},
"messageProperties": {
"title": "Message properties",
"type": "array",
"minItems" : 1,
"items": [
{
"value": "BASIC",
"label": "BASIC"
},
{
"value": "MINIMAL_BASIC",
"label": "MINIMAL_BASIC"
},
{
"value": "MINIMAL_PERSISTENT_BASIC",
"label": "MINIMAL_PERSISTENT_BASIC"
},
{
"value": "PERSISTENT_BASIC",
"label": "PERSISTENT_BASIC"
},
{
"value": "PERSISTENT_TEXT_PLAIN",
"label": "PERSISTENT_TEXT_PLAIN"
},
{
"value": "TEXT_PLAIN",
"label": "TEXT_PLAIN"
}
],
"uniqueItems": true
},
"template": {
"title": "Body Template",
"type": "string"
}
},
"required": [
"sync",
"queueName",
"messageProperties",
"template"
]
},
"form": [
"sync",
"exchange",
"queueName",
{
"key": "messageProperties",
"type": "rc-select",
"multiple": false
},
{
"key": "template",
"type": "textarea",
"rows": 5
}
]
}

79
extensions/extension-rabbitmq/src/main/resources/RabbitMqPluginDescriptor.json

@ -0,0 +1,79 @@
{
"schema": {
"title": "RabbitMQ Plugin Configuration",
"type": "object",
"properties": {
"host": {
"title": "Host",
"type": "string"
},
"port": {
"title": "Port",
"type": "integer",
"default": 5672,
"minimum": 0,
"maximum": 65536
},
"virtualHost": {
"title": "Virtual Host",
"type": "string"
},
"userName": {
"title": "Username",
"type": "string"
},
"password": {
"title": "Password",
"type": "string"
},
"automaticRecoveryEnabled": {
"title": "Automatic Recovery Enabled",
"type": "boolean"
},
"connectionTimeout": {
"title": "Connection Timeout",
"type": "integer"
},
"handshakeTimeout": {
"title": "Handshake Timeout",
"type": "integer"
},
"clientProperties": {
"title": "Client properties",
"type": "array",
"items": {
"title": "Client property",
"type": "object",
"properties": {
"key": {
"title": "Key",
"type": "string"
},
"value": {
"title": "Value",
"type": "string"
}
}
}
}
},
"required": [
"host",
"port"
]
},
"form": [
"host",
"port",
"virtualHost",
"userName",
{
"key": "password",
"type": "password"
},
"automaticRecoveryEnabled",
"connectionTimeout",
"handshakeTimeout",
"clientProperties"
]
}

56
extensions/extension-rabbitmq/src/test/java/org/thingsboard/server/extensions/rabbitmq/DemoClient.java

@ -0,0 +1,56 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author Andrew Shvayka
*/
public class DemoClient {
private static final String HOST = "localhost";
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String QUEUE_NAME = "queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages.");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

98
extensions/extension-rest-api-call/pom.xml

@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2016 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard.server</groupId>
<version>0.0.1-SNAPSHOT</version>
<artifactId>extensions</artifactId>
</parent>
<groupId>org.thingsboard.server.extensions</groupId>
<artifactId>extension-rest-api-call</artifactId>
<packaging>jar</packaging>
<name>Thingsboard Server REST API Call Extension</name>
<url>http://thingsboard.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/../..</main.dir>
</properties>
<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.thingsboard.server</groupId>
<artifactId>extensions-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.thingsboard.server</groupId>
<artifactId>extensions-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity-tools</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/assembly/extension.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

34
extensions/extension-rest-api-call/src/assembly/extension.xml

@ -0,0 +1,34 @@
<!--
Copyright © 2016 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
<id>extension</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

28
extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallActionMsg.java

@ -0,0 +1,28 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rest.action;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg;
public class RestApiCallActionMsg extends AbstractRuleToPluginMsg<RestApiCallActionPayload> {
public RestApiCallActionMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, RestApiCallActionPayload payload) {
super(tenantId, customerId, deviceId, payload);
}
}

37
extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallActionPayload.java

@ -0,0 +1,37 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rest.action;
import lombok.Builder;
import lombok.Data;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.thingsboard.server.common.msg.session.MsgType;
import java.io.Serializable;
@Data
@Builder
public class RestApiCallActionPayload implements Serializable {
private final String actionPath;
private final String msgBody;
private final HttpMethod httpMethod;
private final HttpStatus expectedResultCode;
private final boolean sync;
private final Integer requestId;
private final MsgType msgType;
}

60
extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallPluginAction.java

@ -0,0 +1,60 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rest.action;
import lombok.extern.slf4j.Slf4j;
import org.apache.velocity.Template;
import org.apache.velocity.VelocityContext;
import org.apache.velocity.runtime.parser.ParseException;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.extensions.api.component.Action;
import org.thingsboard.server.extensions.api.plugins.PluginAction;
import org.thingsboard.server.extensions.api.plugins.msg.PluginToRuleMsg;
import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
import org.thingsboard.server.extensions.api.rules.RuleContext;
import org.thingsboard.server.extensions.api.rules.RuleProcessingMetaData;
import org.thingsboard.server.extensions.core.action.template.AbstractTemplatePluginAction;
import org.thingsboard.server.extensions.core.utils.VelocityUtils;
import java.util.Optional;
@Action(name = "REST API Call Plugin Action",
descriptor = "RestApiCallActionDescriptor.json", configuration = RestApiCallPluginActionConfiguration.class)
@Slf4j
public class RestApiCallPluginAction extends AbstractTemplatePluginAction<RestApiCallPluginActionConfiguration> {
@Override
protected Optional<RuleToPluginMsg<?>> buildRuleToPluginMsg(RuleContext ctx, ToDeviceActorMsg msg, FromDeviceRequestMsg payload) {
RestApiCallActionPayload.RestApiCallActionPayloadBuilder builder = RestApiCallActionPayload.builder();
builder.msgType(payload.getMsgType());
builder.requestId(payload.getRequestId());
builder.sync(configuration.isSync());
builder.actionPath(configuration.getActionPath());
builder.httpMethod(HttpMethod.valueOf(configuration.getRequestMethod()[0]));
builder.expectedResultCode(HttpStatus.valueOf(configuration.getExpectedResultCode()));
builder.msgBody(getMsgBody(ctx, msg));
return Optional.of(new RestApiCallActionMsg(msg.getTenantId(),
msg.getCustomerId(),
msg.getDeviceId(),
builder.build()));
}
}

28
extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallPluginActionConfiguration.java

@ -0,0 +1,28 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rest.action;
import lombok.Data;
import org.thingsboard.server.extensions.core.action.template.TemplateActionConfiguration;
@Data
public class RestApiCallPluginActionConfiguration implements TemplateActionConfiguration {
private boolean sync;
private String template;
private String actionPath;
private int expectedResultCode;
private String[] requestMethod;
}

67
extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallMsgHandler.java

@ -0,0 +1,67 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rest.plugin;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
import org.thingsboard.server.extensions.api.rules.RuleException;
import org.thingsboard.server.extensions.rest.action.RestApiCallActionMsg;
import org.thingsboard.server.extensions.rest.action.RestApiCallActionPayload;
@RequiredArgsConstructor
public class RestApiCallMsgHandler implements RuleMsgHandler {
private final String baseUrl;
private final HttpHeaders headers;
@Override
public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg<?> msg) throws RuleException {
if (!(msg instanceof RestApiCallActionMsg)) {
throw new RuleException("Unsupported message type " + msg.getClass().getName() + "!");
}
RestApiCallActionPayload payload = ((RestApiCallActionMsg)msg).getPayload();
try {
ResponseEntity<String> exchangeResponse = new RestTemplate().exchange(
baseUrl + payload.getActionPath(),
payload.getHttpMethod(),
new HttpEntity<>(payload.getMsgBody(), headers),
String.class);
if (exchangeResponse.getStatusCode().equals(payload.getExpectedResultCode()) && payload.isSync()) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId())));
} else if(!exchangeResponse.getStatusCode().equals(payload.getExpectedResultCode())) {
throw new RuntimeException("Response Status Code '"
+ exchangeResponse.getStatusCode()
+ "' doesn't equals to Expected Status Code '"
+ payload.getExpectedResultCode() + "'");
}
} catch (RestClientException e) {
throw new RuleException(e.getMessage(), e);
}
}
}

84
extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallPlugin.java

@ -0,0 +1,84 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rest.plugin;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpHeaders;
import org.thingsboard.server.extensions.api.component.Plugin;
import org.thingsboard.server.extensions.api.plugins.AbstractPlugin;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
import org.thingsboard.server.extensions.rest.action.RestApiCallPluginAction;
import java.util.Base64;
@Plugin(name = "REST API Call Plugin", actions = {RestApiCallPluginAction.class},
descriptor = "RestApiCallPluginDescriptor.json", configuration = RestApiCallPluginConfiguration.class)
@Slf4j
public class RestApiCallPlugin extends AbstractPlugin<RestApiCallPluginConfiguration> {
private static final String BASIC_AUTH_METHOD = "BASIC_AUTH";
private static final String AUTHORIZATION_HEADER_NAME = "Authorization";
private static final String AUTHORIZATION_HEADER_FORMAT = "Basic %s";
private static final String CREDENTIALS_TEMPLATE = "%s:%s";
private static final String BASE_URL_TEMPLATE = "http://%s:%d%s";
private RestApiCallMsgHandler handler;
private String baseUrl;
private HttpHeaders headers = new HttpHeaders();
@Override
public void init(RestApiCallPluginConfiguration configuration) {
this.baseUrl = String.format(
BASE_URL_TEMPLATE,
configuration.getHost(),
configuration.getPort(),
configuration.getBasePath());
if (configuration.getAuthMethod()[0].equals(BASIC_AUTH_METHOD)) {
String userName = configuration.getUserName();
String password = configuration.getPassword();
String credentials = String.format(CREDENTIALS_TEMPLATE, userName, password);
byte[] token = Base64.getEncoder().encode(credentials.getBytes());
this.headers.add(AUTHORIZATION_HEADER_NAME, String.format(AUTHORIZATION_HEADER_FORMAT, new String(token)));
}
init();
}
private void init() {
this.handler = new RestApiCallMsgHandler(baseUrl, headers);
}
@Override
protected RuleMsgHandler getRuleMsgHandler() {
return handler;
}
@Override
public void resume(PluginContext ctx) {
init();
}
@Override
public void suspend(PluginContext ctx) {
log.debug("Suspend method was called, but no impl provided!");
}
@Override
public void stop(PluginContext ctx) {
log.debug("Stop method was called, but no impl provided!");
}
}

30
extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallPluginConfiguration.java

@ -0,0 +1,30 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rest.plugin;
import lombok.Data;
@Data
public class RestApiCallPluginConfiguration {
private String host;
private int port;
private String basePath;
private String[] authMethod;
private String userName;
private String password;
}

63
extensions/extension-rest-api-call/src/main/resources/RestApiCallActionDescriptor.json

@ -0,0 +1,63 @@
{
"schema": {
"title": "REST API Call Action Configuration",
"type": "object",
"properties": {
"sync": {
"title": "Requires delivery confirmation",
"type": "boolean"
},
"template": {
"title": "Body Template",
"type": "string"
},
"actionPath": {
"title": "Action Path",
"type": "string",
"default": "/"
},
"requestMethod": {
"title": "Request method",
"type": "array",
"minItems" : 1,
"items": [
{
"value": "POST",
"label": "POST"
},
{
"value": "PUT",
"label": "PUT"
}
],
"uniqueItems": true
},
"expectedResultCode": {
"title": "Expected Result Code",
"type": "integer"
}
},
"required": [
"sync",
"template",
"actionPath",
"expectedResultCode",
"requestMethod"
]
},
"form": [
"sync",
{
"key": "template",
"type": "textarea",
"rows": 5
},
"actionPath",
{
"key": "requestMethod",
"type": "rc-select",
"multiple": false
},
"expectedResultCode"
]
}

69
extensions/extension-rest-api-call/src/main/resources/RestApiCallPluginDescriptor.json

@ -0,0 +1,69 @@
{
"schema": {
"title": "REST API Call Plugin Configuration",
"type": "object",
"properties": {
"host": {
"title": "Host",
"type": "string"
},
"port": {
"title": "Port",
"type": "integer",
"default": 8080,
"minimum": 0,
"maximum": 65536
},
"basePath": {
"title": "Base Path",
"type": "string",
"default": "/"
},
"authMethod": {
"title": "Authentication method",
"type": "array",
"minItems" : 1,
"items": [
{
"value": "NO_AUTH",
"label": "No authentication"
},
{
"value": "BASIC_AUTH",
"label": "Basic authentication"
}
],
"uniqueItems": true
},
"userName": {
"title": "Username",
"type": "string"
},
"password": {
"title": "Password",
"type": "string"
}
},
"required": [
"host",
"port",
"basePath",
"authMethod"
]
},
"form": [
"host",
"port",
"basePath",
{
"key": "authMethod",
"type": "rc-select",
"multiple": false
},
"userName",
{
"key": "password",
"type": "password"
}
]
}

70
extensions/extension-rest-api-call/src/test/java/org/thingsboard/server/extensions/kafka/RestApiCallDemoClient.java

@ -0,0 +1,70 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.rest;
import com.sun.net.httpserver.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.stream.Collectors;
public class RestApiCallDemoClient {
private static final String DEMO_REST_BASIC_AUTH = "/demo-rest-basic-auth";
private static final String DEMO_REST_NO_AUTH = "/demo-rest-no-auth";
private static final String USERNAME = "demo";
private static final String PASSWORD = "demo";
private static final int HTTP_SERVER_PORT = 8888;
public static void main(String[] args) throws IOException {
HttpServer server = HttpServer.create(new InetSocketAddress(HTTP_SERVER_PORT), 0);
HttpContext secureContext = server.createContext(DEMO_REST_BASIC_AUTH, new RestDemoHandler());
secureContext.setAuthenticator(new BasicAuthenticator("demo-auth") {
@Override
public boolean checkCredentials(String user, String pwd) {
return user.equals(USERNAME) && pwd.equals(PASSWORD);
}
});
server.createContext(DEMO_REST_NO_AUTH, new RestDemoHandler());
server.setExecutor(null);
System.out.println("[*] Waiting for messages.");
server.start();
}
private static class RestDemoHandler implements HttpHandler {
@Override
public void handle(HttpExchange exchange) throws IOException {
String requestBody;
try (BufferedReader br = new BufferedReader(new InputStreamReader(exchange.getRequestBody(), "utf-8"))) {
requestBody = br.lines().collect(Collectors.joining(System.lineSeparator()));
}
System.out.println("[x] Received body: \n" + requestBody);
String response = "Hello from demo client!";
exchange.sendResponseHeaders(200, response.length());
System.out.println("[x] Sending response: \n" + response);
OutputStream os = exchange.getResponseBody();
os.write(response.getBytes());
os.close();
}
}
}

43
extensions/pom.xml

@ -0,0 +1,43 @@
<!--
Copyright © 2016 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>0.0.1-SNAPSHOT</version>
<artifactId>server</artifactId>
</parent>
<groupId>org.thingsboard.server</groupId>
<artifactId>extensions</artifactId>
<packaging>pom</packaging>
<name>Thingsboard Extensions</name>
<url>http://thingsboard.org</url>
<properties>
<main.dir>${basedir}/..</main.dir>
</properties>
<modules>
<module>extension-rabbitmq</module>
<module>extension-rest-api-call</module>
<module>extension-kafka</module>
</modules>
</project>
Loading…
Cancel
Save