diff --git a/dao/src/main/java/org/thingsboard/server/dao/cache/PreviousDeviceCredentialsIdKeyGenerator.java b/dao/src/main/java/org/thingsboard/server/dao/cache/PreviousDeviceCredentialsIdKeyGenerator.java new file mode 100644 index 0000000000..3893662280 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/PreviousDeviceCredentialsIdKeyGenerator.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.cache; + +import org.springframework.cache.interceptor.KeyGenerator; +import org.thingsboard.server.common.data.security.DeviceCredentials; +import org.thingsboard.server.dao.device.DeviceCredentialsService; + +import java.lang.reflect.Method; + +public class PreviousDeviceCredentialsIdKeyGenerator implements KeyGenerator { + + @Override + public Object generate(Object o, Method method, Object... objects) { + DeviceCredentialsService deviceCredentialsService = (DeviceCredentialsService) o; + DeviceCredentials deviceCredentials = (DeviceCredentials) objects[0]; + if (deviceCredentials.getDeviceId() != null) { + DeviceCredentials oldDeviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(deviceCredentials.getDeviceId()); + if (oldDeviceCredentials != null) { + return oldDeviceCredentials.getCredentialsId(); + } + } + return null; + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/cache/ServiceCacheConfiguration.java b/dao/src/main/java/org/thingsboard/server/dao/cache/ServiceCacheConfiguration.java new file mode 100644 index 0000000000..e45084e4d4 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/ServiceCacheConfiguration.java @@ -0,0 +1,88 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.cache; + +import com.hazelcast.config.Config; +import com.hazelcast.config.DiscoveryStrategyConfig; +import com.hazelcast.config.MapConfig; +import com.hazelcast.config.MaxSizeConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.instance.GroupProperty; +import com.hazelcast.spring.cache.HazelcastCacheManager; +import com.hazelcast.zookeeper.ZookeeperDiscoveryProperties; +import com.hazelcast.zookeeper.ZookeeperDiscoveryStrategyFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cache.CacheManager; +import org.springframework.cache.annotation.EnableCaching; +import org.springframework.cache.interceptor.KeyGenerator; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.thingsboard.server.common.data.CacheConstants; + +@Configuration +@EnableCaching +@ConditionalOnProperty(prefix = "cache", value = "enabled", havingValue = "true") +public class ServiceCacheConfiguration { + + private static final String HAZELCAST_CLUSTER_NAME = "hazelcast"; + + @Value("${cache.device_credentials.max_size}") + private Integer deviceCredentialsCacheMaxSize; + @Value("${cache.device_credentials.time_to_live}") + private Integer deviceCredentialsCacheTTL; + + @Value("${zk.enabled}") + private boolean zkEnabled; + @Value("${zk.url}") + private String zkUrl; + @Value("${zk.zk_dir}") + private String zkDir; + + @Bean + public HazelcastInstance hazelcastInstance() { + Config config = new Config(); + + if (zkEnabled) { + config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false); + + config.setProperty(GroupProperty.DISCOVERY_SPI_ENABLED.getName(), Boolean.TRUE.toString()); + DiscoveryStrategyConfig discoveryStrategyConfig = new DiscoveryStrategyConfig(new ZookeeperDiscoveryStrategyFactory()); + discoveryStrategyConfig.addProperty(ZookeeperDiscoveryProperties.ZOOKEEPER_URL.key(), zkUrl); + discoveryStrategyConfig.addProperty(ZookeeperDiscoveryProperties.ZOOKEEPER_PATH.key(), zkDir); + discoveryStrategyConfig.addProperty(ZookeeperDiscoveryProperties.GROUP.key(), HAZELCAST_CLUSTER_NAME); + config.getNetworkConfig().getJoin().getDiscoveryConfig().addDiscoveryStrategyConfig(discoveryStrategyConfig); + } + + MapConfig deviceCredentialsCacheConfig = new MapConfig(CacheConstants.DEVICE_CREDENTIALS_CACHE); + deviceCredentialsCacheConfig.setTimeToLiveSeconds(deviceCredentialsCacheTTL); + deviceCredentialsCacheConfig.setMaxSizeConfig(new MaxSizeConfig(deviceCredentialsCacheMaxSize, MaxSizeConfig.MaxSizePolicy.PER_NODE)); + config.addMapConfig(deviceCredentialsCacheConfig); + + return Hazelcast.newHazelcastInstance(config); + } + + @Bean + public KeyGenerator previousDeviceCredentialsId() { + return new PreviousDeviceCredentialsIdKeyGenerator(); + } + + @Bean + public CacheManager cacheManager() { + return new HazelcastCacheManager(hazelcastInstance()); + } +} diff --git a/docker/db-schema.env b/docker/db-schema.env new file mode 100644 index 0000000000..c8d2bd83d8 --- /dev/null +++ b/docker/db-schema.env @@ -0,0 +1,5 @@ +#Db schema configuration + +SKIP_SCHEMA_CREATION=false +SKIP_SYSTEM_DATA=false +SKIP_DEMO_DATA=false \ No newline at end of file diff --git a/docker/db-schema/Dockerfile b/docker/db-schema/Dockerfile new file mode 100644 index 0000000000..12e7dc74ef --- /dev/null +++ b/docker/db-schema/Dockerfile @@ -0,0 +1,26 @@ +# +# Copyright © 2016 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM cassandra:3.9 + +ADD install_schema.sh /root/install_schema.sh + +RUN apt-get update \ + && apt-get install -y nmap + +RUN chmod +x /root/install_schema.sh + +WORKDIR /root diff --git a/docker/db-schema/install_schema.sh b/docker/db-schema/install_schema.sh new file mode 100644 index 0000000000..b642f8c46e --- /dev/null +++ b/docker/db-schema/install_schema.sh @@ -0,0 +1,53 @@ +#!/bin/bash +# +# Copyright © 2016 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +until nmap db -p 9042 | grep "9042/tcp open" +do + echo "Wait for Cassandra..." + sleep 10 +done + +if [ "$SKIP_SCHEMA_CREATION" == "false" ]; then + echo "Creating 'Thingsboard' keyspace..." + cqlsh db -f /root/schema.cql + if [ "$?" -eq 0 ]; then + echo "'Thingsboard' keyspace was successfully created!" + else + echo "There were issues while creating 'Thingsboard' keyspace!" + fi +fi + +if [ "$SKIP_SYSTEM_DATA" == "false" ]; then + echo "Adding system data..." + cqlsh db -f /root/system-data.cql + if [ "$?" -eq 0 ]; then + echo "System data was successfully added!" + else + echo "There were issues while adding System data!" + fi +fi + +if [ "$SKIP_DEMO_DATA" == "false" ]; then + echo "Adding demo data..." + cqlsh db -f /root/demo-data.cql + if [ "$?" -eq 0 ]; then + echo "Demo data was successfully added!" + else + echo "There were issues while adding Demo data!" + fi +fi diff --git a/docker/deploy.sh b/docker/deploy.sh new file mode 100755 index 0000000000..14062268aa --- /dev/null +++ b/docker/deploy.sh @@ -0,0 +1,31 @@ +#!/bin/bash +# +# Copyright © 2016 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +command='docker-compose -f docker-compose.yml -f docker-compose.random.yml' + +echo "stopping images.." +$command stop + +echo "removing stopped images.." +$command rm -f + +echo "building images.." +$command build + +echo "starting images..." +$command up -d diff --git a/docker/deploy_cassandra_zookeeper.sh b/docker/deploy_cassandra_zookeeper.sh new file mode 100755 index 0000000000..6c4cc50592 --- /dev/null +++ b/docker/deploy_cassandra_zookeeper.sh @@ -0,0 +1,31 @@ +#!/bin/bash +# +# Copyright © 2016 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +command='docker-compose -f docker-compose.yml -f docker-compose.static.yml' + +echo "stopping images.." +$command stop + +echo "removing stopped images.." +$command rm -f + +echo "building images.." +$command build + +echo "starting cassandra, zookeeper, db-schema images..." +$command up -d cassandra zookeeper db-schema diff --git a/docker/docker-compose.random.yml b/docker/docker-compose.random.yml new file mode 100644 index 0000000000..9b5190142a --- /dev/null +++ b/docker/docker-compose.random.yml @@ -0,0 +1,26 @@ +# +# Copyright © 2016 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '2' + +services: + cassandra: + ports: + - "9042" + - "9160" + zookeeper: + ports: + - "2181" diff --git a/docker/docker-compose.static.yml b/docker/docker-compose.static.yml new file mode 100644 index 0000000000..bdaf4eb99c --- /dev/null +++ b/docker/docker-compose.static.yml @@ -0,0 +1,26 @@ +# +# Copyright © 2016 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '2' + +services: + cassandra: + ports: + - "9042:9042" + - "9160:9160" + zookeeper: + ports: + - "2181:2181" diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000000..3dcfb6249f --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,52 @@ +# +# Copyright © 2016 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '2' + +services: + thingsboard: + build: thingsboard + ports: + - "8080:8080" + - "1883:1883" + - "5683:5683" + links: + - cassandra:db + - zookeeper:zk + - db-schema:db-schema + volumes: + - "../application/target/thingsboard.deb:/root/thingsboard.deb" + env_file: + - thingsboard.env + entrypoint: ./run_web_app.sh + db-schema: + build: db-schema + links: + - cassandra:db + env_file: + - db-schema.env + volumes: + - "../dao/src/main/resources/schema.cql:/root/schema.cql" + - "../dao/src/main/resources/demo-data.cql:/root/demo-data.cql" + - "../dao/src/main/resources/system-data.cql:/root/system-data.cql" + entrypoint: ./install_schema.sh + cassandra: + image: "cassandra:3.9" + volumes: + - "${CASSANDRA_DATA_DIR}:/var/lib/cassandra" + zookeeper: + image: "zookeeper:3.4.9" + restart: always diff --git a/docker/thingsboard.env b/docker/thingsboard.env new file mode 100644 index 0000000000..2325790cd6 --- /dev/null +++ b/docker/thingsboard.env @@ -0,0 +1,8 @@ +#Thingsboard server configuration + +CASSANDRA_URL=db:9042 +ZOOKEEPER_URL=zk:2181 +MQTT_BIND_ADDRESS=0.0.0.0 +MQTT_BIND_PORT=1883 +COAP_BIND_ADDRESS=0.0.0.0 +COAP_BIND_PORT=5683 \ No newline at end of file diff --git a/docker/thingsboard/Dockerfile b/docker/thingsboard/Dockerfile new file mode 100644 index 0000000000..ee6acd4305 --- /dev/null +++ b/docker/thingsboard/Dockerfile @@ -0,0 +1,23 @@ +# +# Copyright © 2016 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM java:8-jre + +ADD run_web_app.sh /root/run_web_app.sh + +RUN chmod +x /root/run_web_app.sh + +WORKDIR /root diff --git a/docker/thingsboard/run_web_app.sh b/docker/thingsboard/run_web_app.sh new file mode 100755 index 0000000000..f57cac0820 --- /dev/null +++ b/docker/thingsboard/run_web_app.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# +# Copyright © 2016 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +dpkg -i /root/thingsboard.deb + +reachable=0 +while [ $reachable -eq 0 ]; +do + echo "db-schema container is still in progress. waiting until it completed..." + sleep 3 + ping -q -c 1 db-schema > /dev/null 2>&1 + if [ "$?" -ne 0 ]; + then + echo "db-schema container completed!" + reachable=1 + fi +done + +echo "Starting 'Thingsboard' service..." +thingsboard start + diff --git a/extensions/extension-kafka/pom.xml b/extensions/extension-kafka/pom.xml new file mode 100644 index 0000000000..23a5e4bbbc --- /dev/null +++ b/extensions/extension-kafka/pom.xml @@ -0,0 +1,97 @@ + + + + 4.0.0 + + org.thingsboard.server + 0.0.1-SNAPSHOT + extensions + + org.thingsboard.server.extensions + extension-kafka + jar + + Thingsboard Server Kafka Extension + http://thingsboard.org + + + UTF-8 + ${basedir}/../.. + + + + + ch.qos.logback + logback-core + provided + + + ch.qos.logback + logback-classic + provided + + + org.thingsboard.server + extensions-api + provided + + + org.thingsboard.server + extensions-core + provided + + + org.apache.velocity + velocity + provided + + + org.apache.velocity + velocity-tools + provided + + + org.apache.kafka + kafka_2.10 + + + + + + maven-assembly-plugin + + + src/assembly/extension.xml + + + + + make-assembly + package + + single + + + + + + + \ No newline at end of file diff --git a/extensions/extension-kafka/src/assembly/extension.xml b/extensions/extension-kafka/src/assembly/extension.xml new file mode 100644 index 0000000000..408fc7a619 --- /dev/null +++ b/extensions/extension-kafka/src/assembly/extension.xml @@ -0,0 +1,39 @@ + + + extension + + jar + + false + + + / + true + true + runtime + + org.apache.zookeeper:zookeeper + org.scala-lang:scala-library + io.netty:netty + + + + \ No newline at end of file diff --git a/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaActionMsg.java b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaActionMsg.java new file mode 100644 index 0000000000..c06781474b --- /dev/null +++ b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaActionMsg.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.kafka.action; + +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg; + +public class KafkaActionMsg extends AbstractRuleToPluginMsg { + + public KafkaActionMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, KafkaActionPayload payload) { + super(tenantId, customerId, deviceId, payload); + } +} diff --git a/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaActionPayload.java b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaActionPayload.java new file mode 100644 index 0000000000..bd723cad50 --- /dev/null +++ b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaActionPayload.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.kafka.action; + +import lombok.Builder; +import lombok.Data; +import org.thingsboard.server.common.msg.session.MsgType; + +import java.io.Serializable; + +@Data +@Builder +public class KafkaActionPayload implements Serializable { + + private final String topic; + private final String msgBody; + private final boolean sync; + + private final Integer requestId; + private final MsgType msgType; +} diff --git a/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaPluginAction.java b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaPluginAction.java new file mode 100644 index 0000000000..3d05b43aa7 --- /dev/null +++ b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaPluginAction.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.kafka.action; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; +import org.thingsboard.server.extensions.api.component.Action; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; +import org.thingsboard.server.extensions.api.rules.RuleContext; +import org.thingsboard.server.extensions.core.action.template.AbstractTemplatePluginAction; + +import java.util.Optional; + +@Action(name = "Kafka Plugin Action", descriptor = "KafkaActionDescriptor.json", configuration = KafkaPluginActionConfiguration.class) +@Slf4j +public class KafkaPluginAction extends AbstractTemplatePluginAction { + + @Override + protected Optional> buildRuleToPluginMsg(RuleContext ctx, ToDeviceActorMsg msg, FromDeviceRequestMsg payload) { + KafkaActionPayload.KafkaActionPayloadBuilder builder = KafkaActionPayload.builder(); + builder.msgType(payload.getMsgType()); + builder.requestId(payload.getRequestId()); + builder.sync(configuration.isSync()); + builder.topic(configuration.getTopic()); + builder.msgBody(getMsgBody(ctx, msg)); + return Optional.of(new KafkaActionMsg(msg.getTenantId(), + msg.getCustomerId(), + msg.getDeviceId(), + builder.build())); + } +} diff --git a/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaPluginActionConfiguration.java b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaPluginActionConfiguration.java new file mode 100644 index 0000000000..f748ba2662 --- /dev/null +++ b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/action/KafkaPluginActionConfiguration.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.kafka.action; + +import lombok.Data; +import org.thingsboard.server.extensions.core.action.template.TemplateActionConfiguration; + +@Data +public class KafkaPluginActionConfiguration implements TemplateActionConfiguration { + private boolean sync; + private String topic; + private String template; +} diff --git a/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaMsgHandler.java b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaMsgHandler.java new file mode 100644 index 0000000000..3dbb825471 --- /dev/null +++ b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaMsgHandler.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.kafka.plugin; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.thingsboard.server.common.data.id.RuleId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse; +import org.thingsboard.server.extensions.api.plugins.PluginContext; +import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler; +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; +import org.thingsboard.server.extensions.api.rules.RuleException; +import org.thingsboard.server.extensions.kafka.action.KafkaActionMsg; +import org.thingsboard.server.extensions.kafka.action.KafkaActionPayload; + +@RequiredArgsConstructor +public class KafkaMsgHandler implements RuleMsgHandler { + + private final Producer producer; + + @Override + public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg msg) throws RuleException { + if (!(msg instanceof KafkaActionMsg)) { + throw new RuleException("Unsupported message type " + msg.getClass().getName() + "!"); + } + KafkaActionPayload payload = ((KafkaActionMsg) msg).getPayload(); + + try { + producer.send(new ProducerRecord<>(payload.getTopic(), payload.getMsgBody()), + (metadata, e) -> { + if (payload.isSync()) { + if (metadata != null) { + ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, + BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId()))); + } else { + ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, + BasicStatusCodeResponse.onError(payload.getMsgType(), payload.getRequestId(), e))); + } + } + }); + } catch (Exception e) { + throw new RuleException(e.getMessage(), e); + } + } +} diff --git a/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java new file mode 100644 index 0000000000..1642fb5f3f --- /dev/null +++ b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java @@ -0,0 +1,93 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.kafka.plugin; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.thingsboard.server.extensions.api.component.Plugin; +import org.thingsboard.server.extensions.api.plugins.AbstractPlugin; +import org.thingsboard.server.extensions.api.plugins.PluginContext; +import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler; +import org.thingsboard.server.extensions.kafka.action.KafkaPluginAction; + +import java.util.Properties; + +@Plugin(name = "Kafka Plugin", actions = {KafkaPluginAction.class}, + descriptor = "KafkaPluginDescriptor.json", configuration = KafkaPluginConfiguration.class) +@Slf4j +public class KafkaPlugin extends AbstractPlugin { + + private KafkaMsgHandler handler; + private Producer producer; + private final Properties properties = new Properties(); + + @Override + public void init(KafkaPluginConfiguration configuration) { + properties.put("bootstrap.servers", configuration.getBootstrapServers()); + properties.put("value.serializer", configuration.getValueSerializer()); + properties.put("key.serializer", configuration.getKeySerializer()); + properties.put("acks", String.valueOf(configuration.getAcks())); + properties.put("retries", configuration.getRetries()); + properties.put("batch.size", configuration.getBatchSize()); + properties.put("linger.ms", configuration.getLinger()); + properties.put("buffer.memory", configuration.getBufferMemory()); + if (configuration.getOtherProperties() != null) { + configuration.getOtherProperties() + .stream().forEach(p -> properties.put(p.getKey(), p.getValue())); + } + init(); + } + + private void init() { + try { + this.producer = new KafkaProducer<>(properties); + this.handler = new KafkaMsgHandler(producer); + } catch (Exception e) { + log.error("Failed to start kafka producer", e); + throw new RuntimeException(e); + } + } + + private void destroy() { + try { + this.handler = null; + this.producer.close(); + } catch (Exception e) { + log.error("Failed to close producer during destroy()", e); + } + } + + @Override + protected RuleMsgHandler getRuleMsgHandler() { + return handler; + } + + @Override + public void resume(PluginContext ctx) { + init(); + } + + @Override + public void suspend(PluginContext ctx) { + destroy(); + } + + @Override + public void stop(PluginContext ctx) { + destroy(); + } +} diff --git a/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPluginConfiguration.java b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPluginConfiguration.java new file mode 100644 index 0000000000..e97aa6ae80 --- /dev/null +++ b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPluginConfiguration.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.kafka.plugin; + +import lombok.Data; +import org.thingsboard.server.extensions.core.plugin.KeyValuePluginProperties; + +import java.util.List; + +@Data +public class KafkaPluginConfiguration { + private String bootstrapServers; + private int retries; + private int batchSize; + private int linger; + private int bufferMemory; + private int acks; + private String keySerializer; + private String valueSerializer; + private List otherProperties; +} \ No newline at end of file diff --git a/extensions/extension-kafka/src/main/resources/KafkaActionDescriptor.json b/extensions/extension-kafka/src/main/resources/KafkaActionDescriptor.json new file mode 100644 index 0000000000..4cfeeabcad --- /dev/null +++ b/extensions/extension-kafka/src/main/resources/KafkaActionDescriptor.json @@ -0,0 +1,34 @@ +{ + "schema": { + "title": "Kafka Action Configuration", + "type": "object", + "properties": { + "sync": { + "title": "Requires delivery confirmation", + "type": "boolean" + }, + "topic": { + "title": "Topic Name", + "type": "string" + }, + "template": { + "title": "Body Template", + "type": "string" + } + }, + "required": [ + "sync", + "topic", + "template" + ] + }, + "form": [ + "sync", + "topic", + { + "key": "template", + "type": "textarea", + "rows": 5 + } + ] +} \ No newline at end of file diff --git a/extensions/extension-kafka/src/main/resources/KafkaPluginDescriptor.json b/extensions/extension-kafka/src/main/resources/KafkaPluginDescriptor.json new file mode 100644 index 0000000000..81a0a729e6 --- /dev/null +++ b/extensions/extension-kafka/src/main/resources/KafkaPluginDescriptor.json @@ -0,0 +1,80 @@ +{ + "schema": { + "title": "Kafka Plugin Configuration", + "type": "object", + "properties": { + "bootstrapServers": { + "title": "Bootstrap Servers", + "type": "string", + "default": "localhost:9092" + }, + "retries": { + "title": "Automatically Retry Times If Fails", + "type": "integer", + "default": 0 + }, + "batchSize": { + "title": "Producer Batch Size On Client", + "type": "integer", + "default": 16384 + }, + "linger": { + "title": "Time To Buffer Locally Before Sending To Kafka Broker (in ms)", + "type": "integer", + "default": 0 + }, + "bufferMemory": { + "title": "Buffer Max Size On Client", + "type": "integer", + "default": 33554432 + }, + "acks": { + "title": "Minimum Number Of Replicas That Must Acknowledge A Write (-1 for 'all')", + "type": "integer", + "default": -1 + }, + "keySerializer": { + "title": "Key Serializer", + "type": "string", + "default": "org.apache.kafka.common.serialization.StringSerializer" + }, + "valueSerializer": { + "title": "Value Serializer", + "type": "string", + "default": "org.apache.kafka.common.serialization.StringSerializer" + }, + "otherProperties": { + "title": "Other Kafka properties", + "type": "array", + "items": { + "title": "Kafka property", + "type": "object", + "properties": { + "key": { + "title": "Key", + "type": "string" + }, + "value": { + "title": "Value", + "type": "string" + } + } + } + } + }, + "required": [ + "bootstrapServers" + ] + }, + "form": [ + "bootstrapServers", + "retries", + "batchSize", + "linger", + "bufferMemory", + "acks", + "keySerializer", + "valueSerializer", + "otherProperties" + ] +} \ No newline at end of file diff --git a/extensions/extension-kafka/src/test/java/org/thingsboard/server/extensions/kafka/KafkaDemoClient.java b/extensions/extension-kafka/src/test/java/org/thingsboard/server/extensions/kafka/KafkaDemoClient.java new file mode 100644 index 0000000000..ff8918bf36 --- /dev/null +++ b/extensions/extension-kafka/src/test/java/org/thingsboard/server/extensions/kafka/KafkaDemoClient.java @@ -0,0 +1,83 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.kafka; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +public class KafkaDemoClient { + + private static final int ZK_PORT = 2222; + private static final String HOSTNAME = "localhost"; + private static final String ZOOKEEPER_CONNECT = HOSTNAME + ":" + ZK_PORT; + private static final int KAFKA_PORT = 9092; + private static final int BROKER_ID = 1; + + public static void main(String[] args) { + try { + startZkLocal(); + startKafkaLocal(); + } catch (Exception e) { + System.out.println("Error running local Kafka broker"); + e.printStackTrace(System.out); + } + } + + private static void startZkLocal() throws Exception { + final File zkTmpDir = File.createTempFile("zookeeper", "test"); + if (zkTmpDir.delete() && zkTmpDir.mkdir()) { + Properties zkProperties = new Properties(); + zkProperties.setProperty("dataDir", zkTmpDir.getAbsolutePath()); + zkProperties.setProperty("clientPort", String.valueOf(ZK_PORT)); + + ServerConfig configuration = new ServerConfig(); + QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig(); + quorumConfiguration.parseProperties(zkProperties); + configuration.readFrom(quorumConfiguration); + + new Thread() { + public void run() { + try { + new ZooKeeperServerMain().runFromConfig(configuration); + } catch (IOException e) { + System.out.println("Start of Local ZooKeeper Failed"); + e.printStackTrace(System.err); + } + } + }.start(); + } else { + System.out.println("Failed to delete or create data dir for Zookeeper"); + } + } + + private static void startKafkaLocal() { + Properties kafkaProperties = new Properties(); + kafkaProperties.setProperty("host.name", HOSTNAME); + kafkaProperties.setProperty("port", String.valueOf(KAFKA_PORT)); + kafkaProperties.setProperty("broker.id", String.valueOf(BROKER_ID)); + kafkaProperties.setProperty("zookeeper.connect", ZOOKEEPER_CONNECT); + KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); + KafkaServerStartable kafka = new KafkaServerStartable(kafkaConfig); + kafka.startup(); + } +} \ No newline at end of file diff --git a/extensions/extension-rabbitmq/pom.xml b/extensions/extension-rabbitmq/pom.xml new file mode 100644 index 0000000000..f2456c5c5e --- /dev/null +++ b/extensions/extension-rabbitmq/pom.xml @@ -0,0 +1,139 @@ + + + 4.0.0 + + org.thingsboard.server + 0.0.1-SNAPSHOT + extensions + + org.thingsboard.server.extensions + extension-rabbitmq + jar + + Thingsboard Server RabbitMQ Extension + http://thingsboard.org + + + UTF-8 + ${basedir}/../.. + + + + + com.rabbitmq + amqp-client + + + org.thingsboard.server + extensions-api + provided + + + org.thingsboard.server + extensions-core + provided + + + org.apache.velocity + velocity + provided + + + org.apache.velocity + velocity-tools + provided + + + org.springframework.boot + spring-boot-starter-web + provided + + + org.springframework + spring-context-support + provided + + + org.slf4j + slf4j-api + provided + + + org.slf4j + log4j-over-slf4j + provided + + + ch.qos.logback + logback-core + provided + + + ch.qos.logback + logback-classic + provided + + + junit + junit + test + + + org.mockito + mockito-all + test + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + org.xolstice.maven.plugins + protobuf-maven-plugin + + + org.codehaus.mojo + build-helper-maven-plugin + + + maven-assembly-plugin + + + src/assembly/extension.xml + + + + + make-assembly + package + + single + + + + + + + + diff --git a/extensions/extension-rabbitmq/src/assembly/extension.xml b/extensions/extension-rabbitmq/src/assembly/extension.xml new file mode 100644 index 0000000000..533a4df642 --- /dev/null +++ b/extensions/extension-rabbitmq/src/assembly/extension.xml @@ -0,0 +1,34 @@ + + + extension + + jar + + false + + + / + true + true + runtime + + + diff --git a/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqActionMsg.java b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqActionMsg.java new file mode 100644 index 0000000000..e4cecb6c1e --- /dev/null +++ b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqActionMsg.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rabbitmq.action; + +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg; + +/** + * @author Andrew Shvayka + */ +public class RabbitMqActionMsg extends AbstractRuleToPluginMsg { + + public RabbitMqActionMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, RabbitMqActionPayload payload) { + super(tenantId, customerId, deviceId, payload); + } +} diff --git a/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqActionPayload.java b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqActionPayload.java new file mode 100644 index 0000000000..c7d4e0294f --- /dev/null +++ b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqActionPayload.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rabbitmq.action; + +import lombok.Builder; +import lombok.Data; +import org.thingsboard.server.common.msg.session.MsgType; + +import java.io.Serializable; + +/** + * @author Andrew Shvayka + */ +@Data +@Builder +public class RabbitMqActionPayload implements Serializable { + + private final String exchange; + private final String queueName; + private final String messageProperties; + private final String payload; + + private final boolean sync; + private final Integer requestId; + private final MsgType msgType; +} diff --git a/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqPluginAction.java b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqPluginAction.java new file mode 100644 index 0000000000..3a78335b4f --- /dev/null +++ b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqPluginAction.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rabbitmq.action; + +import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; +import org.thingsboard.server.extensions.api.component.Action; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; +import org.thingsboard.server.extensions.api.rules.RuleContext; +import org.thingsboard.server.extensions.core.action.template.AbstractTemplatePluginAction; + +import java.util.Optional; + +/** + * @author Andrew Shvayka + */ +@Action(name = "RabbitMQ Plugin Action", + descriptor = "RabbitMqActionDescriptor.json", configuration = RabbitMqPluginActionConfiguration.class) +public class RabbitMqPluginAction extends AbstractTemplatePluginAction { + + @Override + protected Optional> buildRuleToPluginMsg(RuleContext ctx, ToDeviceActorMsg msg, FromDeviceRequestMsg payload) { + RabbitMqActionPayload.RabbitMqActionPayloadBuilder builder = RabbitMqActionPayload.builder(); + builder.sync(configuration.isSync()); + builder.exchange(configuration.getExchange()); + builder.queueName(configuration.getQueueName()); + builder.messageProperties(configuration.getMessageProperties()[0]); + builder.msgType(payload.getMsgType()); + builder.requestId(payload.getRequestId()); + builder.payload(getMsgBody(ctx, msg)); + return Optional.of(new RabbitMqActionMsg(msg.getTenantId(), + msg.getCustomerId(), + msg.getDeviceId(), + builder.build())); + } +} diff --git a/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqPluginActionConfiguration.java b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqPluginActionConfiguration.java new file mode 100644 index 0000000000..ebae93be58 --- /dev/null +++ b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/action/RabbitMqPluginActionConfiguration.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rabbitmq.action; + +import lombok.Data; +import org.thingsboard.server.extensions.core.action.template.TemplateActionConfiguration; + +/** + * @author Andrew Shvayka + */ +@Data +public class RabbitMqPluginActionConfiguration implements TemplateActionConfiguration{ + + private boolean sync; + private String exchange; + private String queueName; + private String[] messageProperties; + private String template; +} diff --git a/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqMsgHandler.java b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqMsgHandler.java new file mode 100644 index 0000000000..90cb9fd665 --- /dev/null +++ b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqMsgHandler.java @@ -0,0 +1,86 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rabbitmq.plugin; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.MessageProperties; +import lombok.RequiredArgsConstructor; +import org.thingsboard.server.common.data.id.RuleId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse; +import org.thingsboard.server.extensions.api.plugins.PluginContext; +import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler; +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; +import org.thingsboard.server.extensions.api.rules.RuleException; +import org.thingsboard.server.extensions.rabbitmq.action.RabbitMqActionMsg; +import org.thingsboard.server.extensions.rabbitmq.action.RabbitMqActionPayload; + +import java.io.IOException; +import java.nio.charset.Charset; + +/** + * @author Andrew Shvayka + */ +@RequiredArgsConstructor +public class RabbitMqMsgHandler implements RuleMsgHandler { + private static final Charset UTF8 = Charset.forName("UTF-8"); + + private final Channel channel; + + @Override + public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg msg) throws RuleException { + if (!(msg instanceof RabbitMqActionMsg)) { + throw new RuleException("Unsupported message type " + msg.getClass().getName() + "!"); + } + RabbitMqActionPayload payload = ((RabbitMqActionMsg) msg).getPayload(); + AMQP.BasicProperties properties = convert(payload.getMessageProperties()); + try { + channel.basicPublish( + payload.getExchange() != null ? payload.getExchange() : "", + payload.getQueueName(), + properties, + payload.getPayload().getBytes(UTF8)); + if (payload.isSync()) { + ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, + BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId()))); + } + } catch (IOException e) { + throw new RuleException(e.getMessage(), e); + } + } + + private static AMQP.BasicProperties convert(String name) throws RuleException { + switch (name) { + case "BASIC": + return MessageProperties.BASIC; + case "TEXT_PLAIN": + return MessageProperties.TEXT_PLAIN; + case "MINIMAL_BASIC": + return MessageProperties.MINIMAL_BASIC; + case "MINIMAL_PERSISTENT_BASIC": + return MessageProperties.MINIMAL_PERSISTENT_BASIC; + case "PERSISTENT_BASIC": + return MessageProperties.PERSISTENT_BASIC; + case "PERSISTENT_TEXT_PLAIN": + return MessageProperties.PERSISTENT_TEXT_PLAIN; + default: + throw new RuleException("Message Properties: '" + name + "' is undefined!"); + } + } + +} diff --git a/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqPlugin.java b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqPlugin.java new file mode 100644 index 0000000000..f4ae9dde47 --- /dev/null +++ b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqPlugin.java @@ -0,0 +1,109 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rabbitmq.plugin; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.thingsboard.server.extensions.api.component.Plugin; +import org.thingsboard.server.extensions.api.plugins.AbstractPlugin; +import org.thingsboard.server.extensions.api.plugins.PluginContext; +import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler; +import org.thingsboard.server.extensions.rabbitmq.action.RabbitMqPluginAction; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * @author Andrew Shvayka + */ +@Plugin(name = "RabbitMQ Plugin", actions = {RabbitMqPluginAction.class}, +descriptor = "RabbitMqPluginDescriptor.json", configuration = RabbitMqPluginConfiguration.class) +@Slf4j +public class RabbitMqPlugin extends AbstractPlugin { + + private ConnectionFactory factory; + private Connection connection; + private RabbitMqMsgHandler handler; + + @Override + public void init(RabbitMqPluginConfiguration configuration) { + factory = new ConnectionFactory(); + factory.setHost(configuration.getHost()); + factory.setPort(configuration.getPort()); + set(configuration.getVirtualHost(), factory::setVirtualHost); + set(configuration.getUserName(), factory::setUsername); + set(configuration.getPassword(), factory::setPassword); + set(configuration.getAutomaticRecoveryEnabled(), factory::setAutomaticRecoveryEnabled); + set(configuration.getConnectionTimeout(), factory::setConnectionTimeout); + set(configuration.getHandshakeTimeout(), factory::setHandshakeTimeout); + set(configuration.getClientProperties(), props -> { + factory.setClientProperties(props.stream().collect(Collectors.toMap( + RabbitMqPluginConfiguration.RabbitMqPluginProperties::getKey, + RabbitMqPluginConfiguration.RabbitMqPluginProperties::getValue))); + }); + + init(); + } + + private void set(T source, Consumer setter) { + if (source != null && !StringUtils.isEmpty(source.toString())) { + setter.accept(source); + } + } + + private void init() { + try { + this.connection = factory.newConnection(); + this.handler = new RabbitMqMsgHandler(connection.createChannel()); + } catch (IOException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + private void destroy() { + try { + this.handler = null; + this.connection.close(); + } catch (Exception e) { + log.info("Failed to close connection during destroy()", e); + } + } + + @Override + protected RuleMsgHandler getRuleMsgHandler() { + return handler; + } + + @Override + public void resume(PluginContext ctx) { + init(); + } + + @Override + public void suspend(PluginContext ctx) { + destroy(); + } + + @Override + public void stop(PluginContext ctx) { + destroy(); + } + +} diff --git a/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqPluginConfiguration.java b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqPluginConfiguration.java new file mode 100644 index 0000000000..0b6ac227b0 --- /dev/null +++ b/extensions/extension-rabbitmq/src/main/java/org/thingsboard/server/extensions/rabbitmq/plugin/RabbitMqPluginConfiguration.java @@ -0,0 +1,47 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rabbitmq.plugin; + +import lombok.Data; + +import java.util.List; + +/** + * @author Andrew Shvayka + */ +@Data +public class RabbitMqPluginConfiguration { + private String host; + private int port; + private String virtualHost; + + private String userName; + private String password; + + private Boolean automaticRecoveryEnabled; + + private Integer connectionTimeout; + private Integer handshakeTimeout; + + private List clientProperties; + + @Data + public static class RabbitMqPluginProperties { + private String key; + private String value; + } + +} diff --git a/extensions/extension-rabbitmq/src/main/resources/RabbitMqActionDescriptor.json b/extensions/extension-rabbitmq/src/main/resources/RabbitMqActionDescriptor.json new file mode 100644 index 0000000000..3a2e4e1b15 --- /dev/null +++ b/extensions/extension-rabbitmq/src/main/resources/RabbitMqActionDescriptor.json @@ -0,0 +1,78 @@ +{ + "schema": { + "title": "RabbitMQ Action Configuration", + "type": "object", + "properties": { + "sync": { + "title": "Requires delivery confirmation", + "type": "boolean" + }, + "exchange": { + "title": "Exchange", + "type": "string", + "default": "" + }, + "queueName": { + "title": "Queue Name", + "type": "string" + }, + "messageProperties": { + "title": "Message properties", + "type": "array", + "minItems" : 1, + "items": [ + { + "value": "BASIC", + "label": "BASIC" + }, + { + "value": "MINIMAL_BASIC", + "label": "MINIMAL_BASIC" + }, + { + "value": "MINIMAL_PERSISTENT_BASIC", + "label": "MINIMAL_PERSISTENT_BASIC" + }, + { + "value": "PERSISTENT_BASIC", + "label": "PERSISTENT_BASIC" + }, + { + "value": "PERSISTENT_TEXT_PLAIN", + "label": "PERSISTENT_TEXT_PLAIN" + }, + { + "value": "TEXT_PLAIN", + "label": "TEXT_PLAIN" + } + ], + "uniqueItems": true + }, + "template": { + "title": "Body Template", + "type": "string" + } + }, + "required": [ + "sync", + "queueName", + "messageProperties", + "template" + ] + }, + "form": [ + "sync", + "exchange", + "queueName", + { + "key": "messageProperties", + "type": "rc-select", + "multiple": false + }, + { + "key": "template", + "type": "textarea", + "rows": 5 + } + ] +} \ No newline at end of file diff --git a/extensions/extension-rabbitmq/src/main/resources/RabbitMqPluginDescriptor.json b/extensions/extension-rabbitmq/src/main/resources/RabbitMqPluginDescriptor.json new file mode 100644 index 0000000000..e4bc5debc4 --- /dev/null +++ b/extensions/extension-rabbitmq/src/main/resources/RabbitMqPluginDescriptor.json @@ -0,0 +1,79 @@ +{ + "schema": { + "title": "RabbitMQ Plugin Configuration", + "type": "object", + "properties": { + "host": { + "title": "Host", + "type": "string" + }, + "port": { + "title": "Port", + "type": "integer", + "default": 5672, + "minimum": 0, + "maximum": 65536 + }, + "virtualHost": { + "title": "Virtual Host", + "type": "string" + }, + "userName": { + "title": "Username", + "type": "string" + }, + "password": { + "title": "Password", + "type": "string" + }, + "automaticRecoveryEnabled": { + "title": "Automatic Recovery Enabled", + "type": "boolean" + }, + "connectionTimeout": { + "title": "Connection Timeout", + "type": "integer" + }, + "handshakeTimeout": { + "title": "Handshake Timeout", + "type": "integer" + }, + "clientProperties": { + "title": "Client properties", + "type": "array", + "items": { + "title": "Client property", + "type": "object", + "properties": { + "key": { + "title": "Key", + "type": "string" + }, + "value": { + "title": "Value", + "type": "string" + } + } + } + } + }, + "required": [ + "host", + "port" + ] + }, + "form": [ + "host", + "port", + "virtualHost", + "userName", + { + "key": "password", + "type": "password" + }, + "automaticRecoveryEnabled", + "connectionTimeout", + "handshakeTimeout", + "clientProperties" + ] +} \ No newline at end of file diff --git a/extensions/extension-rabbitmq/src/test/java/org/thingsboard/server/extensions/rabbitmq/DemoClient.java b/extensions/extension-rabbitmq/src/test/java/org/thingsboard/server/extensions/rabbitmq/DemoClient.java new file mode 100644 index 0000000000..c8b04d4970 --- /dev/null +++ b/extensions/extension-rabbitmq/src/test/java/org/thingsboard/server/extensions/rabbitmq/DemoClient.java @@ -0,0 +1,56 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rabbitmq; + +import com.rabbitmq.client.*; + +import java.io.IOException; + +/** + * @author Andrew Shvayka + */ +public class DemoClient { + + private static final String HOST = "localhost"; + private static final String USERNAME = "guest"; + private static final String PASSWORD = "guest"; + private static final String QUEUE_NAME = "queue"; + + + public static void main(String[] argv) throws Exception { + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(HOST); + factory.setUsername(USERNAME); + factory.setPassword(PASSWORD); + + Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + System.out.println(" [*] Waiting for messages."); + Consumer consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) + throws IOException { + String message = new String(body, "UTF-8"); + System.out.println(" [x] Received '" + message + "'"); + } + }; + channel.basicConsume(QUEUE_NAME, true, consumer); + + } +} diff --git a/extensions/extension-rest-api-call/pom.xml b/extensions/extension-rest-api-call/pom.xml new file mode 100644 index 0000000000..175d7362ea --- /dev/null +++ b/extensions/extension-rest-api-call/pom.xml @@ -0,0 +1,98 @@ + + + + 4.0.0 + + org.thingsboard.server + 0.0.1-SNAPSHOT + extensions + + org.thingsboard.server.extensions + extension-rest-api-call + jar + + Thingsboard Server REST API Call Extension + http://thingsboard.org + + + UTF-8 + ${basedir}/../.. + + + + + ch.qos.logback + logback-core + provided + + + ch.qos.logback + logback-classic + provided + + + org.thingsboard.server + extensions-api + provided + + + org.springframework + spring-web + provided + + + org.thingsboard.server + extensions-core + provided + + + org.apache.velocity + velocity + provided + + + org.apache.velocity + velocity-tools + provided + + + + + + maven-assembly-plugin + + + src/assembly/extension.xml + + + + + make-assembly + package + + single + + + + + + + diff --git a/extensions/extension-rest-api-call/src/assembly/extension.xml b/extensions/extension-rest-api-call/src/assembly/extension.xml new file mode 100644 index 0000000000..533a4df642 --- /dev/null +++ b/extensions/extension-rest-api-call/src/assembly/extension.xml @@ -0,0 +1,34 @@ + + + extension + + jar + + false + + + / + true + true + runtime + + + diff --git a/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallActionMsg.java b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallActionMsg.java new file mode 100644 index 0000000000..63e2127a01 --- /dev/null +++ b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallActionMsg.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rest.action; + +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg; + +public class RestApiCallActionMsg extends AbstractRuleToPluginMsg { + + public RestApiCallActionMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, RestApiCallActionPayload payload) { + super(tenantId, customerId, deviceId, payload); + } +} diff --git a/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallActionPayload.java b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallActionPayload.java new file mode 100644 index 0000000000..c21e74663f --- /dev/null +++ b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallActionPayload.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rest.action; + +import lombok.Builder; +import lombok.Data; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.thingsboard.server.common.msg.session.MsgType; + +import java.io.Serializable; + +@Data +@Builder +public class RestApiCallActionPayload implements Serializable { + private final String actionPath; + private final String msgBody; + private final HttpMethod httpMethod; + private final HttpStatus expectedResultCode; + private final boolean sync; + + private final Integer requestId; + private final MsgType msgType; +} diff --git a/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallPluginAction.java b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallPluginAction.java new file mode 100644 index 0000000000..9e9a093bb7 --- /dev/null +++ b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallPluginAction.java @@ -0,0 +1,60 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rest.action; + +import lombok.extern.slf4j.Slf4j; +import org.apache.velocity.Template; +import org.apache.velocity.VelocityContext; +import org.apache.velocity.runtime.parser.ParseException; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; +import org.thingsboard.server.common.msg.session.ToDeviceMsg; +import org.thingsboard.server.extensions.api.component.Action; +import org.thingsboard.server.extensions.api.plugins.PluginAction; +import org.thingsboard.server.extensions.api.plugins.msg.PluginToRuleMsg; +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; +import org.thingsboard.server.extensions.api.rules.RuleContext; +import org.thingsboard.server.extensions.api.rules.RuleProcessingMetaData; +import org.thingsboard.server.extensions.core.action.template.AbstractTemplatePluginAction; +import org.thingsboard.server.extensions.core.utils.VelocityUtils; + +import java.util.Optional; + +@Action(name = "REST API Call Plugin Action", + descriptor = "RestApiCallActionDescriptor.json", configuration = RestApiCallPluginActionConfiguration.class) +@Slf4j +public class RestApiCallPluginAction extends AbstractTemplatePluginAction { + + @Override + protected Optional> buildRuleToPluginMsg(RuleContext ctx, ToDeviceActorMsg msg, FromDeviceRequestMsg payload) { + RestApiCallActionPayload.RestApiCallActionPayloadBuilder builder = RestApiCallActionPayload.builder(); + builder.msgType(payload.getMsgType()); + builder.requestId(payload.getRequestId()); + builder.sync(configuration.isSync()); + builder.actionPath(configuration.getActionPath()); + builder.httpMethod(HttpMethod.valueOf(configuration.getRequestMethod()[0])); + builder.expectedResultCode(HttpStatus.valueOf(configuration.getExpectedResultCode())); + builder.msgBody(getMsgBody(ctx, msg)); + return Optional.of(new RestApiCallActionMsg(msg.getTenantId(), + msg.getCustomerId(), + msg.getDeviceId(), + builder.build())); + } + +} diff --git a/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallPluginActionConfiguration.java b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallPluginActionConfiguration.java new file mode 100644 index 0000000000..de8816e2f4 --- /dev/null +++ b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/action/RestApiCallPluginActionConfiguration.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rest.action; + +import lombok.Data; +import org.thingsboard.server.extensions.core.action.template.TemplateActionConfiguration; + +@Data +public class RestApiCallPluginActionConfiguration implements TemplateActionConfiguration { + private boolean sync; + private String template; + private String actionPath; + private int expectedResultCode; + private String[] requestMethod; +} diff --git a/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallMsgHandler.java b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallMsgHandler.java new file mode 100644 index 0000000000..eebf81bc9a --- /dev/null +++ b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallMsgHandler.java @@ -0,0 +1,67 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rest.plugin; + +import lombok.RequiredArgsConstructor; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestClientException; +import org.springframework.web.client.RestTemplate; +import org.thingsboard.server.common.data.id.RuleId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse; +import org.thingsboard.server.extensions.api.plugins.PluginContext; +import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler; +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; +import org.thingsboard.server.extensions.api.rules.RuleException; +import org.thingsboard.server.extensions.rest.action.RestApiCallActionMsg; +import org.thingsboard.server.extensions.rest.action.RestApiCallActionPayload; + +@RequiredArgsConstructor +public class RestApiCallMsgHandler implements RuleMsgHandler { + + private final String baseUrl; + private final HttpHeaders headers; + + @Override + public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg msg) throws RuleException { + if (!(msg instanceof RestApiCallActionMsg)) { + throw new RuleException("Unsupported message type " + msg.getClass().getName() + "!"); + } + RestApiCallActionPayload payload = ((RestApiCallActionMsg)msg).getPayload(); + try { + ResponseEntity exchangeResponse = new RestTemplate().exchange( + baseUrl + payload.getActionPath(), + payload.getHttpMethod(), + new HttpEntity<>(payload.getMsgBody(), headers), + String.class); + if (exchangeResponse.getStatusCode().equals(payload.getExpectedResultCode()) && payload.isSync()) { + ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, + BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId()))); + } else if(!exchangeResponse.getStatusCode().equals(payload.getExpectedResultCode())) { + throw new RuntimeException("Response Status Code '" + + exchangeResponse.getStatusCode() + + "' doesn't equals to Expected Status Code '" + + payload.getExpectedResultCode() + "'"); + } + + } catch (RestClientException e) { + throw new RuleException(e.getMessage(), e); + } + } +} diff --git a/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallPlugin.java b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallPlugin.java new file mode 100644 index 0000000000..8b3fece738 --- /dev/null +++ b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallPlugin.java @@ -0,0 +1,84 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rest.plugin; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpHeaders; +import org.thingsboard.server.extensions.api.component.Plugin; +import org.thingsboard.server.extensions.api.plugins.AbstractPlugin; +import org.thingsboard.server.extensions.api.plugins.PluginContext; +import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler; +import org.thingsboard.server.extensions.rest.action.RestApiCallPluginAction; + +import java.util.Base64; + +@Plugin(name = "REST API Call Plugin", actions = {RestApiCallPluginAction.class}, + descriptor = "RestApiCallPluginDescriptor.json", configuration = RestApiCallPluginConfiguration.class) +@Slf4j +public class RestApiCallPlugin extends AbstractPlugin { + + private static final String BASIC_AUTH_METHOD = "BASIC_AUTH"; + private static final String AUTHORIZATION_HEADER_NAME = "Authorization"; + private static final String AUTHORIZATION_HEADER_FORMAT = "Basic %s"; + private static final String CREDENTIALS_TEMPLATE = "%s:%s"; + private static final String BASE_URL_TEMPLATE = "http://%s:%d%s"; + private RestApiCallMsgHandler handler; + private String baseUrl; + private HttpHeaders headers = new HttpHeaders(); + + @Override + public void init(RestApiCallPluginConfiguration configuration) { + this.baseUrl = String.format( + BASE_URL_TEMPLATE, + configuration.getHost(), + configuration.getPort(), + configuration.getBasePath()); + + if (configuration.getAuthMethod()[0].equals(BASIC_AUTH_METHOD)) { + String userName = configuration.getUserName(); + String password = configuration.getPassword(); + String credentials = String.format(CREDENTIALS_TEMPLATE, userName, password); + byte[] token = Base64.getEncoder().encode(credentials.getBytes()); + this.headers.add(AUTHORIZATION_HEADER_NAME, String.format(AUTHORIZATION_HEADER_FORMAT, new String(token))); + } + + init(); + } + + private void init() { + this.handler = new RestApiCallMsgHandler(baseUrl, headers); + } + + @Override + protected RuleMsgHandler getRuleMsgHandler() { + return handler; + } + + @Override + public void resume(PluginContext ctx) { + init(); + } + + @Override + public void suspend(PluginContext ctx) { + log.debug("Suspend method was called, but no impl provided!"); + } + + @Override + public void stop(PluginContext ctx) { + log.debug("Stop method was called, but no impl provided!"); + } +} diff --git a/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallPluginConfiguration.java b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallPluginConfiguration.java new file mode 100644 index 0000000000..5cddca5cd3 --- /dev/null +++ b/extensions/extension-rest-api-call/src/main/java/org/thingsboard/server/extensions/rest/plugin/RestApiCallPluginConfiguration.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rest.plugin; + +import lombok.Data; + +@Data +public class RestApiCallPluginConfiguration { + private String host; + private int port; + private String basePath; + + private String[] authMethod; + + private String userName; + private String password; +} diff --git a/extensions/extension-rest-api-call/src/main/resources/RestApiCallActionDescriptor.json b/extensions/extension-rest-api-call/src/main/resources/RestApiCallActionDescriptor.json new file mode 100644 index 0000000000..c45d028d85 --- /dev/null +++ b/extensions/extension-rest-api-call/src/main/resources/RestApiCallActionDescriptor.json @@ -0,0 +1,63 @@ +{ + "schema": { + "title": "REST API Call Action Configuration", + "type": "object", + "properties": { + "sync": { + "title": "Requires delivery confirmation", + "type": "boolean" + }, + "template": { + "title": "Body Template", + "type": "string" + }, + "actionPath": { + "title": "Action Path", + "type": "string", + "default": "/" + }, + "requestMethod": { + "title": "Request method", + "type": "array", + "minItems" : 1, + "items": [ + { + "value": "POST", + "label": "POST" + }, + { + "value": "PUT", + "label": "PUT" + } + ], + "uniqueItems": true + }, + "expectedResultCode": { + "title": "Expected Result Code", + "type": "integer" + } + }, + "required": [ + "sync", + "template", + "actionPath", + "expectedResultCode", + "requestMethod" + ] + }, + "form": [ + "sync", + { + "key": "template", + "type": "textarea", + "rows": 5 + }, + "actionPath", + { + "key": "requestMethod", + "type": "rc-select", + "multiple": false + }, + "expectedResultCode" + ] +} \ No newline at end of file diff --git a/extensions/extension-rest-api-call/src/main/resources/RestApiCallPluginDescriptor.json b/extensions/extension-rest-api-call/src/main/resources/RestApiCallPluginDescriptor.json new file mode 100644 index 0000000000..2c319371cc --- /dev/null +++ b/extensions/extension-rest-api-call/src/main/resources/RestApiCallPluginDescriptor.json @@ -0,0 +1,69 @@ +{ + "schema": { + "title": "REST API Call Plugin Configuration", + "type": "object", + "properties": { + "host": { + "title": "Host", + "type": "string" + }, + "port": { + "title": "Port", + "type": "integer", + "default": 8080, + "minimum": 0, + "maximum": 65536 + }, + "basePath": { + "title": "Base Path", + "type": "string", + "default": "/" + }, + "authMethod": { + "title": "Authentication method", + "type": "array", + "minItems" : 1, + "items": [ + { + "value": "NO_AUTH", + "label": "No authentication" + }, + { + "value": "BASIC_AUTH", + "label": "Basic authentication" + } + ], + "uniqueItems": true + }, + "userName": { + "title": "Username", + "type": "string" + }, + "password": { + "title": "Password", + "type": "string" + } + }, + "required": [ + "host", + "port", + "basePath", + "authMethod" + ] + }, + "form": [ + "host", + "port", + "basePath", + { + "key": "authMethod", + "type": "rc-select", + "multiple": false + }, + "userName", + { + "key": "password", + "type": "password" + } + ] +} \ No newline at end of file diff --git a/extensions/extension-rest-api-call/src/test/java/org/thingsboard/server/extensions/kafka/RestApiCallDemoClient.java b/extensions/extension-rest-api-call/src/test/java/org/thingsboard/server/extensions/kafka/RestApiCallDemoClient.java new file mode 100644 index 0000000000..ce1a6fd7a7 --- /dev/null +++ b/extensions/extension-rest-api-call/src/test/java/org/thingsboard/server/extensions/kafka/RestApiCallDemoClient.java @@ -0,0 +1,70 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.rest; + +import com.sun.net.httpserver.*; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.stream.Collectors; + +public class RestApiCallDemoClient { + + private static final String DEMO_REST_BASIC_AUTH = "/demo-rest-basic-auth"; + private static final String DEMO_REST_NO_AUTH = "/demo-rest-no-auth"; + private static final String USERNAME = "demo"; + private static final String PASSWORD = "demo"; + private static final int HTTP_SERVER_PORT = 8888; + + public static void main(String[] args) throws IOException { + HttpServer server = HttpServer.create(new InetSocketAddress(HTTP_SERVER_PORT), 0); + + HttpContext secureContext = server.createContext(DEMO_REST_BASIC_AUTH, new RestDemoHandler()); + secureContext.setAuthenticator(new BasicAuthenticator("demo-auth") { + @Override + public boolean checkCredentials(String user, String pwd) { + return user.equals(USERNAME) && pwd.equals(PASSWORD); + } + }); + + server.createContext(DEMO_REST_NO_AUTH, new RestDemoHandler()); + server.setExecutor(null); + System.out.println("[*] Waiting for messages."); + server.start(); + } + + private static class RestDemoHandler implements HttpHandler { + @Override + public void handle(HttpExchange exchange) throws IOException { + String requestBody; + try (BufferedReader br = new BufferedReader(new InputStreamReader(exchange.getRequestBody(), "utf-8"))) { + requestBody = br.lines().collect(Collectors.joining(System.lineSeparator())); + } + System.out.println("[x] Received body: \n" + requestBody); + + String response = "Hello from demo client!"; + exchange.sendResponseHeaders(200, response.length()); + System.out.println("[x] Sending response: \n" + response); + + OutputStream os = exchange.getResponseBody(); + os.write(response.getBytes()); + os.close(); + } + } +} \ No newline at end of file diff --git a/extensions/pom.xml b/extensions/pom.xml new file mode 100644 index 0000000000..8609ea7c23 --- /dev/null +++ b/extensions/pom.xml @@ -0,0 +1,43 @@ + + + 4.0.0 + + org.thingsboard + 0.0.1-SNAPSHOT + server + + org.thingsboard.server + extensions + pom + + Thingsboard Extensions + http://thingsboard.org + + + ${basedir}/.. + + + + extension-rabbitmq + extension-rest-api-call + extension-kafka + + +