From 3783ba67a4d613cca909b5a49c563008a94db8b3 Mon Sep 17 00:00:00 2001 From: steve Date: Tue, 7 Nov 2017 11:23:50 +1300 Subject: [PATCH 01/16] Fix for Defect #394 --- .../server/actors/device/DeviceActorMessageProcessor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 00af0461d1..14ff096747 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -164,7 +164,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } else { logger.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId); } - Set sentOneWayIds = new HashSet<>(); + Set sentOneWayIds = new HashSet<>(); if (type == SessionType.ASYNC) { rpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds)); } else { @@ -174,12 +174,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso sentOneWayIds.forEach(rpcPendingMap::remove); } - private Consumer> processPendingRpc(ActorContext context, SessionId sessionId, Optional server, Set sentOneWayIds) { + private Consumer> processPendingRpc(ActorContext context, SessionId sessionId, Optional server, Set sentOneWayIds) { return entry -> { ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg(); ToDeviceRpcRequestBody body = request.getBody(); if (request.isOneway()) { - sentOneWayIds.add(request.getId()); + sentOneWayIds.add(entry.getKey()); ToPluginRpcResponseDeviceMsg responsePluginMsg = toPluginRpcResponseMsg(entry.getValue().getMsg(), (String) null); context.parent().tell(responsePluginMsg, ActorRef.noSender()); } From 12407281c7e30828d6baeadbe5586bda715373ad Mon Sep 17 00:00:00 2001 From: Unknown Date: Fri, 10 Nov 2017 22:34:17 -0800 Subject: [PATCH 02/16] Fix intervals --- application/src/main/resources/thingsboard.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index e1bad01cce..83630ce56a 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -66,8 +66,8 @@ plugins: # JWT Token parameters security.jwt: - tokenExpirationTime: "${JWT_TOKEN_EXPIRATION_TIME:9000000}" # Number of seconds (15 mins) - refreshTokenExpTime: "${JWT_REFRESH_TOKEN_EXPIRATION_TIME:36000000}" # Seconds (1 hour) + tokenExpirationTime: "${JWT_TOKEN_EXPIRATION_TIME:900}" # Number of seconds (15 mins) + refreshTokenExpTime: "${JWT_REFRESH_TOKEN_EXPIRATION_TIME:3600}" # Seconds (1 hour) tokenIssuer: "${JWT_TOKEN_ISSUER:thingsboard.io}" tokenSigningKey: "${JWT_TOKEN_SIGNING_KEY:thingsboardDefaultSigningKey}" From 9e1dbd4fb4f2fbf313a5e9b4337ad5cbd138e939 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 15 Nov 2017 13:04:11 +0200 Subject: [PATCH 03/16] Added check for updated TS --- ui/src/app/api/subscription.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ui/src/app/api/subscription.js b/ui/src/app/api/subscription.js index 4786bf9835..022a31cbed 100644 --- a/ui/src/app/api/subscription.js +++ b/ui/src/app/api/subscription.js @@ -654,8 +654,9 @@ export default class Subscription { if (!sourceData.data.length) { update = false; } else if (prevData && prevData[0] && prevData[0].length > 1 && sourceData.data.length > 0) { + var prevTs = prevData[0][0]; var prevValue = prevData[0][1]; - if (prevValue === sourceData.data[0][1]) { + if (prevTs === sourceData.data[0][0] && prevValue === sourceData.data[0][1]) { update = false; } } From 8eb6172dee17d3e89a17b0ad92cf3b47afb475b6 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Thu, 16 Nov 2017 11:53:24 +0200 Subject: [PATCH 04/16] Ignore cache control headers for static resources. --- .../server/config/ThingsboardSecurityConfiguration.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/config/ThingsboardSecurityConfiguration.java b/application/src/main/java/org/thingsboard/server/config/ThingsboardSecurityConfiguration.java index 639579863a..6755408b0c 100644 --- a/application/src/main/java/org/thingsboard/server/config/ThingsboardSecurityConfiguration.java +++ b/application/src/main/java/org/thingsboard/server/config/ThingsboardSecurityConfiguration.java @@ -28,6 +28,7 @@ import org.springframework.security.authentication.AuthenticationManager; import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder; import org.springframework.security.config.annotation.method.configuration.EnableGlobalMethodSecurity; import org.springframework.security.config.annotation.web.builders.HttpSecurity; +import org.springframework.security.config.annotation.web.builders.WebSecurity; import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity; import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter; import org.springframework.security.config.http.SessionCreationPolicy; @@ -147,6 +148,11 @@ public class ThingsboardSecurityConfiguration extends WebSecurityConfigurerAdapt return new BCryptPasswordEncoder(); } + @Override + public void configure(WebSecurity web) throws Exception { + web.ignoring().antMatchers("/static/**"); + } + @Override protected void configure(HttpSecurity http) throws Exception { http.headers().cacheControl().and().frameOptions().disable() From 19ad5737d842076ce8f02f617012004735d4dad3 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Thu, 16 Nov 2017 18:58:10 +0200 Subject: [PATCH 05/16] Do logout if userId of stored JWT token no longer valid. --- ui/src/app/api/user.service.js | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/ui/src/app/api/user.service.js b/ui/src/app/api/user.service.js index 16686576d3..d09387bda8 100644 --- a/ui/src/app/api/user.service.js +++ b/ui/src/app/api/user.service.js @@ -302,7 +302,7 @@ function UserService($http, $q, $rootScope, adminService, dashboardService, logi $rootScope.forceFullscreen = true; fetchAllowedDashboardIds(); } else if (currentUser.userId) { - getUser(currentUser.userId).then( + getUser(currentUser.userId, true).then( function success(user) { currentUserDetails = user; updateUserLang(); @@ -319,6 +319,7 @@ function UserService($http, $q, $rootScope, adminService, dashboardService, logi }, function fail() { deferred.reject(); + logout(); } ) } else { @@ -414,19 +415,19 @@ function UserService($http, $q, $rootScope, adminService, dashboardService, logi } $http.post(url, user).then(function success(response) { deferred.resolve(response.data); - }, function fail(response) { - deferred.reject(response.data); + }, function fail() { + deferred.reject(); }); return deferred.promise; } - function getUser(userId) { + function getUser(userId, ignoreErrors) { var deferred = $q.defer(); var url = '/api/user/' + userId; - $http.get(url).then(function success(response) { + $http.get(url, { ignoreErrors: ignoreErrors }).then(function success(response) { deferred.resolve(response.data); - }, function fail(response) { - deferred.reject(response.data); + }, function fail() { + deferred.reject(); }); return deferred.promise; } @@ -436,8 +437,8 @@ function UserService($http, $q, $rootScope, adminService, dashboardService, logi var url = '/api/user/' + userId; $http.delete(url).then(function success() { deferred.resolve(); - }, function fail(response) { - deferred.reject(response.data); + }, function fail() { + deferred.reject(); }); return deferred.promise; } From 19c5f92e210c411376c1a91d4d87c4c44bc4c67c Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Fri, 17 Nov 2017 15:07:03 +0200 Subject: [PATCH 06/16] UI: Improve layout for small resolution screens. --- ui/src/app/components/details-sidenav.scss | 5 +++- ui/src/app/components/js-func.scss | 2 +- ui/src/app/components/js-func.tpl.html | 2 +- .../relation/relation-dialog.controller.js | 4 +-- .../app/entity/relation/relation-dialog.scss | 2 +- .../entity/relation/relation-dialog.tpl.html | 2 +- ui/src/app/home/home-links.controller.js | 3 +++ ui/src/app/home/home-links.scss | 26 +++++++++++++++++++ ui/src/app/home/home-links.tpl.html | 2 +- ui/src/scss/main.scss | 6 ++++- 10 files changed, 44 insertions(+), 10 deletions(-) create mode 100644 ui/src/app/home/home-links.scss diff --git a/ui/src/app/components/details-sidenav.scss b/ui/src/app/components/details-sidenav.scss index 54a029dc37..159162a50e 100644 --- a/ui/src/app/components/details-sidenav.scss +++ b/ui/src/app/components/details-sidenav.scss @@ -16,7 +16,10 @@ @import '../../scss/constants'; .tb-details-title { - font-size: 1.600rem; + font-size: 1.000rem; + @media (min-width: $layout-breakpoint-gt-sm) { + font-size: 1.600rem; + } font-weight: 400; text-transform: uppercase; margin: 20px 8px 0 0; diff --git a/ui/src/app/components/js-func.scss b/ui/src/app/components/js-func.scss index 7c36b8101e..3124dc12d3 100644 --- a/ui/src/app/components/js-func.scss +++ b/ui/src/app/components/js-func.scss @@ -22,7 +22,7 @@ tb-js-func { border: 1px solid #C0C0C0; height: 100%; #tb-javascript-input { - min-width: 400px; + min-width: 200px; min-height: 200px; width: 100%; height: 100%; diff --git a/ui/src/app/components/js-func.tpl.html b/ui/src/app/components/js-func.tpl.html index ee3c2e0947..bec7991af9 100644 --- a/ui/src/app/components/js-func.tpl.html +++ b/ui/src/app/components/js-func.tpl.html @@ -19,7 +19,7 @@
function({{ functionArgsString }}) { - +
- +
diff --git a/ui/src/app/home/home-links.controller.js b/ui/src/app/home/home-links.controller.js index 273433ba23..799a0bc0b2 100644 --- a/ui/src/app/home/home-links.controller.js +++ b/ui/src/app/home/home-links.controller.js @@ -13,6 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +import './home-links.scss'; + /*@ngInject*/ export default function HomeLinksController($scope, menu) { var vm = this; diff --git a/ui/src/app/home/home-links.scss b/ui/src/app/home/home-links.scss new file mode 100644 index 0000000000..e210334a88 --- /dev/null +++ b/ui/src/app/home/home-links.scss @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@import "../../scss/constants"; + +.tb-home-links { + .md-headline { + font-size: 20px; + @media (min-width: $layout-breakpoint-xmd) { + font-size: 24px; + } + } +} \ No newline at end of file diff --git a/ui/src/app/home/home-links.tpl.html b/ui/src/app/home/home-links.tpl.html index 5eac69fe48..e27f20a820 100644 --- a/ui/src/app/home/home-links.tpl.html +++ b/ui/src/app/home/home-links.tpl.html @@ -15,7 +15,7 @@ limitations under the License. --> - + diff --git a/ui/src/scss/main.scss b/ui/src/scss/main.scss index 076dbef296..93418b27fe 100644 --- a/ui/src/scss/main.scss +++ b/ui/src/scss/main.scss @@ -494,11 +494,15 @@ md-tabs.tb-headless { height: 100%; max-width: 240px; span { - padding: 10px 10px 20px 10px; + padding: 0 0 20px 0; font-size: 18px; font-weight: 400; white-space: normal; line-height: 18px; + max-height: 18px; + min-height: 18px; + height: 18px; + margin: auto; } } From bfec91567f0bfd3a9684db15d15eadba20a9ec26 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Wed, 22 Nov 2017 19:00:57 +0200 Subject: [PATCH 07/16] Send user password as payload field rather than url parameter. --- .../server/controller/AuthController.java | 21 ++++++++++--------- .../controller/AbstractControllerTest.java | 5 ++++- .../controller/BaseUserControllerTest.java | 21 +++++++++++++++---- ui/src/app/api/login.service.js | 16 +++++++------- 4 files changed, 40 insertions(+), 23 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/AuthController.java b/application/src/main/java/org/thingsboard/server/controller/AuthController.java index 0246e07bb6..cb2f5b9a33 100644 --- a/application/src/main/java/org/thingsboard/server/controller/AuthController.java +++ b/application/src/main/java/org/thingsboard/server/controller/AuthController.java @@ -19,8 +19,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; @@ -30,7 +28,6 @@ import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder; import org.springframework.web.bind.annotation.*; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.security.UserCredentials; -import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.exception.ThingsboardErrorCode; import org.thingsboard.server.exception.ThingsboardException; import org.thingsboard.server.service.mail.MailService; @@ -78,9 +75,10 @@ public class AuthController extends BaseController { @RequestMapping(value = "/auth/changePassword", method = RequestMethod.POST) @ResponseStatus(value = HttpStatus.OK) public void changePassword ( - @RequestParam(value = "currentPassword") String currentPassword, - @RequestParam(value = "newPassword") String newPassword) throws ThingsboardException { + @RequestBody JsonNode changePasswordRequest) throws ThingsboardException { try { + String currentPassword = changePasswordRequest.get("currentPassword").asText(); + String newPassword = changePasswordRequest.get("newPassword").asText(); SecurityUser securityUser = getCurrentUser(); UserCredentials userCredentials = userService.findUserCredentialsByUserId(securityUser.getId()); if (!passwordEncoder.matches(currentPassword, userCredentials.getPassword())) { @@ -118,9 +116,10 @@ public class AuthController extends BaseController { @RequestMapping(value = "/noauth/resetPasswordByEmail", method = RequestMethod.POST) @ResponseStatus(value = HttpStatus.OK) public void requestResetPasswordByEmail ( - @RequestParam(value = "email") String email, + @RequestBody JsonNode resetPasswordByEmailRequest, HttpServletRequest request) throws ThingsboardException { try { + String email = resetPasswordByEmailRequest.get("email").asText(); UserCredentials userCredentials = userService.requestPasswordReset(email); String baseUrl = constructBaseUrl(request); String resetUrl = String.format("%s/api/noauth/resetPassword?resetToken=%s", baseUrl, @@ -158,10 +157,11 @@ public class AuthController extends BaseController { @ResponseStatus(value = HttpStatus.OK) @ResponseBody public JsonNode activateUser( - @RequestParam(value = "activateToken") String activateToken, - @RequestParam(value = "password") String password, + @RequestBody JsonNode activateRequest, HttpServletRequest request) throws ThingsboardException { try { + String activateToken = activateRequest.get("activateToken").asText(); + String password = activateRequest.get("password").asText(); String encodedPassword = passwordEncoder.encode(password); UserCredentials credentials = userService.activateUserCredentials(activateToken, encodedPassword); User user = userService.findUserById(credentials.getUserId()); @@ -194,10 +194,11 @@ public class AuthController extends BaseController { @ResponseStatus(value = HttpStatus.OK) @ResponseBody public JsonNode resetPassword( - @RequestParam(value = "resetToken") String resetToken, - @RequestParam(value = "password") String password, + @RequestBody JsonNode resetPasswordRequest, HttpServletRequest request) throws ThingsboardException { try { + String resetToken = resetPasswordRequest.get("resetToken").asText(); + String password = resetPasswordRequest.get("password").asText(); UserCredentials userCredentials = userService.findUserCredentialsByResetToken(resetToken); if (userCredentials != null) { String encodedPassword = passwordEncoder.encode(password); diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java index 689f3164f6..19e4329c33 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java @@ -221,7 +221,10 @@ public abstract class AbstractControllerTest { doGet("/api/noauth/activate?activateToken={activateToken}", TestMailService.currentActivateToken) .andExpect(status().isSeeOther()) .andExpect(header().string(HttpHeaders.LOCATION, "/login/createPassword?activateToken=" + TestMailService.currentActivateToken)); - JsonNode tokenInfo = readResponse(doPost("/api/noauth/activate", "activateToken", TestMailService.currentActivateToken, "password", password).andExpect(status().isOk()), JsonNode.class); + JsonNode activateRequest = new ObjectMapper().createObjectNode() + .put("activateToken", TestMailService.currentActivateToken) + .put("password", password); + JsonNode tokenInfo = readResponse(doPost("/api/noauth/activate", activateRequest).andExpect(status().isOk()), JsonNode.class); validateAndSetJwtToken(tokenInfo, user.getEmail()); return savedUser; } diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseUserControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseUserControllerTest.java index 8dac038772..e3c87f66c2 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseUserControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseUserControllerTest.java @@ -17,6 +17,7 @@ package org.thingsboard.server.controller; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.RandomStringUtils; import org.junit.Assert; import org.junit.Test; @@ -73,7 +74,11 @@ public abstract class BaseUserControllerTest extends AbstractControllerTest { .andExpect(status().isSeeOther()) .andExpect(header().string(HttpHeaders.LOCATION, "/login/createPassword?activateToken=" + TestMailService.currentActivateToken)); - JsonNode tokenInfo = readResponse(doPost("/api/noauth/activate", "activateToken", TestMailService.currentActivateToken, "password", "testPassword").andExpect(status().isOk()), JsonNode.class); + JsonNode activateRequest = new ObjectMapper().createObjectNode() + .put("activateToken", TestMailService.currentActivateToken) + .put("password", "testPassword"); + + JsonNode tokenInfo = readResponse(doPost("/api/noauth/activate", activateRequest).andExpect(status().isOk()), JsonNode.class); validateAndSetJwtToken(tokenInfo, email); doGet("/api/auth/user") @@ -117,13 +122,21 @@ public abstract class BaseUserControllerTest extends AbstractControllerTest { User savedUser = createUserAndLogin(user, "testPassword1"); logout(); - doPost("/api/noauth/resetPasswordByEmail", "email", email) + + JsonNode resetPasswordByEmailRequest = new ObjectMapper().createObjectNode() + .put("email", email); + + doPost("/api/noauth/resetPasswordByEmail", resetPasswordByEmailRequest) .andExpect(status().isOk()); doGet("/api/noauth/resetPassword?resetToken={resetToken}", TestMailService.currentResetPasswordToken) .andExpect(status().isSeeOther()) .andExpect(header().string(HttpHeaders.LOCATION, "/login/resetPassword?resetToken=" + TestMailService.currentResetPasswordToken)); - - JsonNode tokenInfo = readResponse(doPost("/api/noauth/resetPassword", "resetToken", TestMailService.currentResetPasswordToken, "password", "testPassword2").andExpect(status().isOk()), JsonNode.class); + + JsonNode resetPasswordRequest = new ObjectMapper().createObjectNode() + .put("resetToken", TestMailService.currentResetPasswordToken) + .put("password", "testPassword2"); + + JsonNode tokenInfo = readResponse(doPost("/api/noauth/resetPassword", resetPasswordRequest).andExpect(status().isOk()), JsonNode.class); validateAndSetJwtToken(tokenInfo, email); doGet("/api/auth/user") diff --git a/ui/src/app/api/login.service.js b/ui/src/app/api/login.service.js index 272e4dfa20..74f9587cf0 100644 --- a/ui/src/app/api/login.service.js +++ b/ui/src/app/api/login.service.js @@ -65,8 +65,8 @@ function LoginService($http, $q) { function sendResetPasswordLink(email) { var deferred = $q.defer(); - var url = '/api/noauth/resetPasswordByEmail?email=' + email; - $http.post(url, null).then(function success(response) { + var url = '/api/noauth/resetPasswordByEmail'; + $http.post(url, {email: email}).then(function success(response) { deferred.resolve(response); }, function fail() { deferred.reject(); @@ -76,8 +76,8 @@ function LoginService($http, $q) { function resetPassword(resetToken, password) { var deferred = $q.defer(); - var url = '/api/noauth/resetPassword?resetToken=' + resetToken + '&password=' + password; - $http.post(url, null).then(function success(response) { + var url = '/api/noauth/resetPassword'; + $http.post(url, {resetToken: resetToken, password: password}).then(function success(response) { deferred.resolve(response); }, function fail() { deferred.reject(); @@ -87,8 +87,8 @@ function LoginService($http, $q) { function activate(activateToken, password) { var deferred = $q.defer(); - var url = '/api/noauth/activate?activateToken=' + activateToken + '&password=' + password; - $http.post(url, null).then(function success(response) { + var url = '/api/noauth/activate'; + $http.post(url, {activateToken: activateToken, password: password}).then(function success(response) { deferred.resolve(response); }, function fail() { deferred.reject(); @@ -98,8 +98,8 @@ function LoginService($http, $q) { function changePassword(currentPassword, newPassword) { var deferred = $q.defer(); - var url = '/api/auth/changePassword?currentPassword=' + currentPassword + '&newPassword=' + newPassword; - $http.post(url, null).then(function success(response) { + var url = '/api/auth/changePassword'; + $http.post(url, {currentPassword: currentPassword, newPassword: newPassword}).then(function success(response) { deferred.resolve(response); }, function fail() { deferred.reject(); From 160aa7ac04ed5956e6125ae54bafdf22ff352e17 Mon Sep 17 00:00:00 2001 From: Unknown Date: Wed, 22 Nov 2017 23:28:06 -0800 Subject: [PATCH 08/16] Protect against bogus interval --- .../plugin/telemetry/handlers/TelemetryRestMsgHandler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java index 0b8e992525..874bf89670 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java @@ -128,8 +128,9 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { Optional interval = request.getLongParamValue("interval"); Optional limit = request.getIntParamValue("limit"); + // If some of these params are specified, they all must be if (startTs.isPresent() || endTs.isPresent() || interval.isPresent() || limit.isPresent()) { - if (!startTs.isPresent() || !endTs.isPresent() || !interval.isPresent()) { + if (!startTs.isPresent() || !endTs.isPresent() || !interval.isPresent() || interval.get() < 0) { msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); return; } From b1bc922d1656496f48972336d12547a56b8a2874 Mon Sep 17 00:00:00 2001 From: Unknown Date: Wed, 22 Nov 2017 23:07:03 -0800 Subject: [PATCH 09/16] If user specifies an interval of 0, convert aggregation to NONE This prevents an endless loop that can hang up the server, and is probably what the user intuitively wanted. Merge remote-tracking branch 'origin/zero_interval' into zero_interval Fix merge issue Fix whitespace to match original More whitespace --- .../plugin/telemetry/handlers/TelemetryRestMsgHandler.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java index 874bf89670..7e92fc15f1 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java @@ -134,7 +134,10 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); return; } - Aggregation agg = Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name())); + + // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted + Aggregation agg = (interval.isPresent() && interval.get() == 0) ? Aggregation.valueOf(Aggregation.NONE.name()) : + Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name())); List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg)) .collect(Collectors.toList()); From bdaee28d9e39c91626235418e3f4013d0aafbf5d Mon Sep 17 00:00:00 2001 From: mp-loki Date: Tue, 7 Nov 2017 19:08:06 -0500 Subject: [PATCH 10/16] SQS and SNS Plugins added --- .../rule/RuleActorMessageProcessor.java | 2 +- extensions/extension-sns/pom.xml | 80 ++++++++++++++++++ .../extension-sns/src/assembly/extension.xml | 37 ++++++++ .../sns/action/SnsTopicActionMsg.java | 31 +++++++ .../sns/action/SnsTopicActionPayload.java | 37 ++++++++ .../sns/action/SnsTopicPluginAction.java | 45 ++++++++++ .../SnsTopicPluginActionConfiguration.java | 30 +++++++ .../sns/plugin/SnsMessageHandler.java | 63 ++++++++++++++ .../extensions/sns/plugin/SnsPlugin.java | 79 +++++++++++++++++ .../sns/plugin/SnsPluginConfiguration.java | 30 +++++++ .../main/resources/SnsPluginDescriptor.json | 30 +++++++ .../resources/SnsTopicActionDescriptor.json | 34 ++++++++ extensions/extention-sqs/pom.xml | 80 ++++++++++++++++++ .../extention-sqs/src/assembly/extension.xml | 37 ++++++++ .../action/fifo/SqsFifoQueueActionMsg.java | 31 +++++++ .../fifo/SqsFifoQueueActionPayload.java | 39 +++++++++ .../action/fifo/SqsFifoQueuePluginAction.java | 49 +++++++++++ ...SqsFifoQueuePluginActionConfiguration.java | 30 +++++++ .../standard/SqsStandardQueueActionMsg.java | 31 +++++++ .../SqsStandardQueueActionPayload.java | 39 +++++++++ .../SqsStandardQueuePluginAction.java | 46 ++++++++++ ...tandardQueuePluginActionConfiguration.java | 32 +++++++ .../sqs/plugin/SqsMessageHandler.java | 84 +++++++++++++++++++ .../extensions/sqs/plugin/SqsPlugin.java | 78 +++++++++++++++++ .../sqs/plugin/SqsPluginConfiguration.java | 30 +++++++ .../SqsFifoQueueActionDescriptor.json | 34 ++++++++ .../main/resources/SqsPluginDescriptor.json | 30 +++++++ .../SqsStandardQueueActionDescriptor.json | 41 +++++++++ .../server/extensions/sqs/SqsDemoClient.java | 69 +++++++++++++++ .../src/test/resources/logback.xml | 10 +++ extensions/pom.xml | 2 + pom.xml | 12 +++ resume.bat | 18 ++++ tools/src/main/python/mqtt-send-telemetry.py | 44 ++++++++++ 34 files changed, 1363 insertions(+), 1 deletion(-) create mode 100644 extensions/extension-sns/pom.xml create mode 100644 extensions/extension-sns/src/assembly/extension.xml create mode 100644 extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicActionMsg.java create mode 100644 extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicActionPayload.java create mode 100644 extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicPluginAction.java create mode 100644 extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicPluginActionConfiguration.java create mode 100644 extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsMessageHandler.java create mode 100644 extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsPlugin.java create mode 100644 extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsPluginConfiguration.java create mode 100644 extensions/extension-sns/src/main/resources/SnsPluginDescriptor.json create mode 100644 extensions/extension-sns/src/main/resources/SnsTopicActionDescriptor.json create mode 100644 extensions/extention-sqs/pom.xml create mode 100644 extensions/extention-sqs/src/assembly/extension.xml create mode 100644 extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueueActionMsg.java create mode 100644 extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueueActionPayload.java create mode 100644 extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueuePluginAction.java create mode 100644 extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueuePluginActionConfiguration.java create mode 100644 extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueueActionMsg.java create mode 100644 extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueueActionPayload.java create mode 100644 extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueuePluginAction.java create mode 100644 extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueuePluginActionConfiguration.java create mode 100644 extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsMessageHandler.java create mode 100644 extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsPlugin.java create mode 100644 extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsPluginConfiguration.java create mode 100644 extensions/extention-sqs/src/main/resources/SqsFifoQueueActionDescriptor.json create mode 100644 extensions/extention-sqs/src/main/resources/SqsPluginDescriptor.json create mode 100644 extensions/extention-sqs/src/main/resources/SqsStandardQueueActionDescriptor.json create mode 100644 extensions/extention-sqs/src/test/java/org/thingsboard/server/extensions/sqs/SqsDemoClient.java create mode 100644 extensions/extention-sqs/src/test/resources/logback.xml create mode 100644 resume.bat create mode 100644 tools/src/main/python/mqtt-send-telemetry.py diff --git a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java index c695ec8316..7a7e3ff1ab 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java @@ -170,7 +170,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { Optional> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd); if (ruleToPluginMsgOptional.isPresent()) { RuleToPluginMsg ruleToPluginMsg = ruleToPluginMsgOptional.get(); - logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg); + logger.debug("[{}] Device msg is converted to: {}", entityId, ruleToPluginMsg); context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self()); if (action.isOneWayAction()) { pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); diff --git a/extensions/extension-sns/pom.xml b/extensions/extension-sns/pom.xml new file mode 100644 index 0000000000..37f8fac57c --- /dev/null +++ b/extensions/extension-sns/pom.xml @@ -0,0 +1,80 @@ + + + + + extensions + org.thingsboard + 1.4.0-SNAPSHOT + + 4.0.0 + org.thingsboard.extensions + extension-sns + jar + + Thingsboard Server SNS Extension + http://thingsboard.org + + + UTF-8 + ${basedir}/../.. + 1.11.229 + + + + + org.thingsboard + extensions-api + provided + + + com.amazonaws + aws-java-sdk-sns + ${aws.sdk.version} + + + org.thingsboard + extensions-core + + + + + + + maven-assembly-plugin + + + src/assembly/extension.xml + + + + + make-assembly + package + + single + + + + + + + + \ No newline at end of file diff --git a/extensions/extension-sns/src/assembly/extension.xml b/extensions/extension-sns/src/assembly/extension.xml new file mode 100644 index 0000000000..2395d5551a --- /dev/null +++ b/extensions/extension-sns/src/assembly/extension.xml @@ -0,0 +1,37 @@ + + + extension + + jar + + false + + + / + true + true + runtime + + + + + + \ No newline at end of file diff --git a/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicActionMsg.java b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicActionMsg.java new file mode 100644 index 0000000000..b64a22bdba --- /dev/null +++ b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicActionMsg.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sns.action; + +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg; + +/** + * Created by Valerii Sosliuk on 11/15/2017. + */ +public class SnsTopicActionMsg extends AbstractRuleToPluginMsg { + + public SnsTopicActionMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, SnsTopicActionPayload payload) { + super(tenantId, customerId, deviceId, payload); + } +} diff --git a/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicActionPayload.java b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicActionPayload.java new file mode 100644 index 0000000000..a1bf89a248 --- /dev/null +++ b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicActionPayload.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sns.action; + +import lombok.Builder; +import lombok.Data; +import org.thingsboard.server.common.msg.session.MsgType; + +import java.io.Serializable; + +/** + * Created by Valerii Sosliuk on 11/15/2017. + */ +@Data +@Builder +public class SnsTopicActionPayload implements Serializable { + + private final String topicArn; + private final String msgBody; + + private final Integer requestId; + private final MsgType msgType; + private final boolean sync; +} diff --git a/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicPluginAction.java b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicPluginAction.java new file mode 100644 index 0000000000..79de23acd4 --- /dev/null +++ b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicPluginAction.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sns.action; + +import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; +import org.thingsboard.server.extensions.api.component.Action; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; +import org.thingsboard.server.extensions.api.rules.RuleContext; +import org.thingsboard.server.extensions.core.action.template.AbstractTemplatePluginAction; + +import java.util.Optional; + +/** + * Created by Valerii Sosliuk on 11/15/2017. + */ +@Action(name = "SNS Topic Action", descriptor = "SnsTopicActionDescriptor.json", configuration = SnsTopicPluginActionConfiguration.class) +public class SnsTopicPluginAction extends AbstractTemplatePluginAction { + + @Override + protected Optional buildRuleToPluginMsg(RuleContext ctx, ToDeviceActorMsg msg, FromDeviceRequestMsg payload) { + SnsTopicActionPayload.SnsTopicActionPayloadBuilder builder = SnsTopicActionPayload.builder(); + builder.msgType(payload.getMsgType()); + builder.requestId(payload.getRequestId()); + builder.topicArn(configuration.getTopicArn()); + builder.msgBody(getMsgBody(ctx, msg)); + return Optional.of(new SnsTopicActionMsg(msg.getTenantId(), + msg.getCustomerId(), + msg.getDeviceId(), + builder.build())); + } +} diff --git a/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicPluginActionConfiguration.java b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicPluginActionConfiguration.java new file mode 100644 index 0000000000..468c3ddbc3 --- /dev/null +++ b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/action/SnsTopicPluginActionConfiguration.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sns.action; + +import lombok.Data; +import org.thingsboard.server.extensions.core.action.template.TemplateActionConfiguration; + +/** + * Created by Valerii Sosliuk on 11/15/2017. + */ +@Data +public class SnsTopicPluginActionConfiguration implements TemplateActionConfiguration { + + private String topicArn; + private String template; + private boolean sync; +} diff --git a/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsMessageHandler.java b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsMessageHandler.java new file mode 100644 index 0000000000..c7fd99c8be --- /dev/null +++ b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsMessageHandler.java @@ -0,0 +1,63 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sns.plugin; + +import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.services.sns.model.PublishRequest; +import com.amazonaws.services.sns.model.PublishResult; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.RuleId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse; +import org.thingsboard.server.extensions.api.plugins.PluginContext; +import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler; +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; +import org.thingsboard.server.extensions.api.rules.RuleException; +import org.thingsboard.server.extensions.sns.action.SnsTopicActionMsg; +import org.thingsboard.server.extensions.sns.action.SnsTopicActionPayload; + +/** + * Created by Valerii Sosliuk on 11/6/2017. + */ +@RequiredArgsConstructor +@Slf4j +public class SnsMessageHandler implements RuleMsgHandler { + + private final AmazonSNS sns; + + @Override + public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg msg) throws RuleException { + if (msg instanceof SnsTopicActionMsg) { + SnsTopicActionPayload payload = ((SnsTopicActionMsg) msg).getPayload(); + PublishRequest publishRequest = new PublishRequest() + .withTopicArn(payload.getTopicArn()) + .withMessage(payload.getMsgBody()); + sns.publish(publishRequest); + if (payload.isSync()) { + ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, + BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId()))); + } + return; + } + throw new RuleException("Unsupported message type " + msg.getClass().getName() + "!"); + + } + +} diff --git a/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsPlugin.java b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsPlugin.java new file mode 100644 index 0000000000..86f5de0331 --- /dev/null +++ b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsPlugin.java @@ -0,0 +1,79 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sns.plugin; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.services.sns.AmazonSNSClient; +import org.thingsboard.server.extensions.api.component.Plugin; +import org.thingsboard.server.extensions.api.plugins.AbstractPlugin; +import org.thingsboard.server.extensions.api.plugins.PluginContext; +import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler; +import org.thingsboard.server.extensions.sns.action.SnsTopicPluginAction; + +/** + * Created by Valerii Sosliuk on 11/15/2017. + */ +@Plugin(name = "SNS Plugin", actions = {SnsTopicPluginAction.class}, + descriptor = "SnsPluginDescriptor.json", configuration = SnsPluginConfiguration.class) +public class SnsPlugin extends AbstractPlugin { + + private SnsMessageHandler snsMessageHandler; + private SnsPluginConfiguration configuration; + + @Override + public void init(SnsPluginConfiguration configuration) { + this.configuration = configuration; + init(); + } + + private void init() { + AWSCredentials awsCredentials = new BasicAWSCredentials(configuration.getAccessKeyId(), configuration.getSecretAccessKey()); + AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials); + AmazonSNS sns = AmazonSNSClient.builder() + .withCredentials(credProvider) + .withRegion(configuration.getRegion()) + .build(); + this.snsMessageHandler = new SnsMessageHandler(sns); + + } + + private void destroy() { + this.snsMessageHandler = null; + } + + @Override + protected RuleMsgHandler getRuleMsgHandler() { + return snsMessageHandler; + } + + @Override + public void resume(PluginContext ctx) { + init(); + } + + @Override + public void suspend(PluginContext ctx) { + destroy(); + } + + @Override + public void stop(PluginContext ctx) { + destroy(); + } +} diff --git a/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsPluginConfiguration.java b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsPluginConfiguration.java new file mode 100644 index 0000000000..dee78f3f57 --- /dev/null +++ b/extensions/extension-sns/src/main/java/org/thingsboard/server/extensions/sns/plugin/SnsPluginConfiguration.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sns.plugin; + +import lombok.Data; + +/** + * Created by Valerii Sosliuk on 11/5/2017. + */ +@Data +public class SnsPluginConfiguration { + + private String accessKeyId; + private String secretAccessKey; + private String region; + +} diff --git a/extensions/extension-sns/src/main/resources/SnsPluginDescriptor.json b/extensions/extension-sns/src/main/resources/SnsPluginDescriptor.json new file mode 100644 index 0000000000..04e23e75bc --- /dev/null +++ b/extensions/extension-sns/src/main/resources/SnsPluginDescriptor.json @@ -0,0 +1,30 @@ +{ + "schema": { + "title": "SNS Plugin Configuration", + "type": "object", + "properties": { + "accessKeyId": { + "title": "Access Key ID", + "type": "string" + }, + "secretAccessKey": { + "title": "Secret Access Key", + "type": "string" + }, + "region": { + "title": "Region", + "type": "string" + } + }, + "required": [ + "accessKeyId", + "secretAccessKey", + "region" + ] + }, + "form": [ + "accessKeyId", + "secretAccessKey", + "region" + ] +} \ No newline at end of file diff --git a/extensions/extension-sns/src/main/resources/SnsTopicActionDescriptor.json b/extensions/extension-sns/src/main/resources/SnsTopicActionDescriptor.json new file mode 100644 index 0000000000..a8a2793885 --- /dev/null +++ b/extensions/extension-sns/src/main/resources/SnsTopicActionDescriptor.json @@ -0,0 +1,34 @@ +{ + "schema": { + "title": "SNS Topic Action Configuration", + "type": "object", + "properties": { + "sync": { + "title": "Requires delivery confirmation", + "type": "boolean" + }, + "topicArn": { + "title": "Topic ARN", + "type": "string" + }, + "template": { + "title": "Body Template", + "type": "string" + } + }, + "required": [ + "sync", + "topicArn", + "template" + ] + }, + "form": [ + "sync", + "topicArn", + { + "key": "template", + "type": "textarea", + "rows": 5 + } + ] +} \ No newline at end of file diff --git a/extensions/extention-sqs/pom.xml b/extensions/extention-sqs/pom.xml new file mode 100644 index 0000000000..13eb5dda1a --- /dev/null +++ b/extensions/extention-sqs/pom.xml @@ -0,0 +1,80 @@ + + + + + extensions + org.thingsboard + 1.4.0-SNAPSHOT + + 4.0.0 + org.thingsboard.extensions + extension-sqs + jar + + Thingsboard Server SQS Extension + http://thingsboard.org + + + UTF-8 + ${basedir}/../.. + 1.11.229 + + + + + org.thingsboard + extensions-api + provided + + + com.amazonaws + aws-java-sdk-sqs + ${aws.sdk.version} + + + org.thingsboard + extensions-core + + + + + + + maven-assembly-plugin + + + src/assembly/extension.xml + + + + + make-assembly + package + + single + + + + + + + + \ No newline at end of file diff --git a/extensions/extention-sqs/src/assembly/extension.xml b/extensions/extention-sqs/src/assembly/extension.xml new file mode 100644 index 0000000000..2395d5551a --- /dev/null +++ b/extensions/extention-sqs/src/assembly/extension.xml @@ -0,0 +1,37 @@ + + + extension + + jar + + false + + + / + true + true + runtime + + + + + + \ No newline at end of file diff --git a/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueueActionMsg.java b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueueActionMsg.java new file mode 100644 index 0000000000..c4650636f3 --- /dev/null +++ b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueueActionMsg.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sqs.action.fifo; + +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg; + +/** + * Created by Valerii Sosliuk on 11/10/2017. + */ +public class SqsFifoQueueActionMsg extends AbstractRuleToPluginMsg { + + public SqsFifoQueueActionMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, SqsFifoQueueActionPayload payload) { + super(tenantId, customerId, deviceId, payload); + } +} diff --git a/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueueActionPayload.java b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueueActionPayload.java new file mode 100644 index 0000000000..692cd90849 --- /dev/null +++ b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueueActionPayload.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sqs.action.fifo; + +import lombok.Builder; +import lombok.Data; +import org.thingsboard.server.common.msg.session.MsgType; + +import java.io.Serializable; + +/** + * Created by Valerii Sosliuk on 11/10/2017. + */ +@Data +@Builder +public class SqsFifoQueueActionPayload implements Serializable { + + private final String queue; + private final String msgBody; + private final String deviceId; + + private final Integer requestId; + private final MsgType msgType; + private final boolean sync; + +} diff --git a/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueuePluginAction.java b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueuePluginAction.java new file mode 100644 index 0000000000..f107ea103c --- /dev/null +++ b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueuePluginAction.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sqs.action.fifo; + +import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; +import org.thingsboard.server.extensions.api.component.Action; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; +import org.thingsboard.server.extensions.api.rules.RuleContext; +import org.thingsboard.server.extensions.core.action.template.AbstractTemplatePluginAction; +import org.thingsboard.server.extensions.sqs.action.standard.SqsStandardQueueActionMsg; +import org.thingsboard.server.extensions.sqs.action.standard.SqsStandardQueueActionPayload; +import org.thingsboard.server.extensions.sqs.action.standard.SqsStandardQueuePluginActionConfiguration; + +import java.util.Optional; + +/** + * Created by Valerii Sosliuk on 11/5/2017. + */ +@Action(name = "SQS Fifo Queue Action", descriptor = "SqsFifoQueueActionDescriptor.json", configuration = SqsFifoQueuePluginActionConfiguration.class) +public class SqsFifoQueuePluginAction extends AbstractTemplatePluginAction { + + @Override + protected Optional buildRuleToPluginMsg(RuleContext ctx, ToDeviceActorMsg msg, FromDeviceRequestMsg payload) { + SqsFifoQueueActionPayload.SqsFifoQueueActionPayloadBuilder builder = SqsFifoQueueActionPayload.builder(); + builder.msgType(payload.getMsgType()); + builder.requestId(payload.getRequestId()); + builder.queue(configuration.getQueue()); + builder.deviceId(msg.getDeviceId().toString()); + builder.msgBody(getMsgBody(ctx, msg)); + return Optional.of(new SqsFifoQueueActionMsg(msg.getTenantId(), + msg.getCustomerId(), + msg.getDeviceId(), + builder.build())); + } +} diff --git a/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueuePluginActionConfiguration.java b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueuePluginActionConfiguration.java new file mode 100644 index 0000000000..d10032576b --- /dev/null +++ b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/fifo/SqsFifoQueuePluginActionConfiguration.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sqs.action.fifo; + +import lombok.Data; +import org.thingsboard.server.extensions.core.action.template.TemplateActionConfiguration; + +/** + * Created by Valerii Sosliuk on 11/10/2017. + */ +@Data +public class SqsFifoQueuePluginActionConfiguration implements TemplateActionConfiguration { + + private String queue; + private String template; + private boolean sync; +} diff --git a/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueueActionMsg.java b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueueActionMsg.java new file mode 100644 index 0000000000..6666b274f3 --- /dev/null +++ b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueueActionMsg.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sqs.action.standard; + +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg; + +/** + * Created by Valerii Sosliuk on 11/6/2017. + */ +public class SqsStandardQueueActionMsg extends AbstractRuleToPluginMsg { + + public SqsStandardQueueActionMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, SqsStandardQueueActionPayload payload) { + super(tenantId, customerId, deviceId, payload); + } +} diff --git a/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueueActionPayload.java b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueueActionPayload.java new file mode 100644 index 0000000000..ce3dd27a8d --- /dev/null +++ b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueueActionPayload.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sqs.action.standard; + +import lombok.Builder; +import lombok.Data; +import org.thingsboard.server.common.msg.session.MsgType; + +import java.io.Serializable; + +/** + * Created by Valerii Sosliuk on 11/6/2017. + */ +@Data +@Builder +public class SqsStandardQueueActionPayload implements Serializable { + + private final String queue; + private final String msgBody; + private final int delaySeconds; + + private final Integer requestId; + private final MsgType msgType; + private final boolean sync; + +} diff --git a/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueuePluginAction.java b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueuePluginAction.java new file mode 100644 index 0000000000..1dd9d56d8a --- /dev/null +++ b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueuePluginAction.java @@ -0,0 +1,46 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sqs.action.standard; + +import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; +import org.thingsboard.server.extensions.api.component.Action; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; +import org.thingsboard.server.extensions.api.rules.RuleContext; +import org.thingsboard.server.extensions.core.action.template.AbstractTemplatePluginAction; + +import java.util.Optional; + +/** + * Created by Valerii Sosliuk on 11/5/2017. + */ +@Action(name = "SQS Standard Queue Action", descriptor = "SqsStandardQueueActionDescriptor.json", configuration = SqsStandardQueuePluginActionConfiguration.class) +public class SqsStandardQueuePluginAction extends AbstractTemplatePluginAction { + + @Override + protected Optional buildRuleToPluginMsg(RuleContext ctx, ToDeviceActorMsg msg, FromDeviceRequestMsg payload) { + SqsStandardQueueActionPayload.SqsStandardQueueActionPayloadBuilder builder = SqsStandardQueueActionPayload.builder(); + builder.msgType(payload.getMsgType()); + builder.requestId(payload.getRequestId()); + builder.queue(configuration.getQueue()); + builder.delaySeconds(configuration.getDelaySeconds()); + builder.msgBody(getMsgBody(ctx, msg)); + return Optional.of(new SqsStandardQueueActionMsg(msg.getTenantId(), + msg.getCustomerId(), + msg.getDeviceId(), + builder.build())); + } +} diff --git a/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueuePluginActionConfiguration.java b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueuePluginActionConfiguration.java new file mode 100644 index 0000000000..5c21ef5840 --- /dev/null +++ b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/action/standard/SqsStandardQueuePluginActionConfiguration.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sqs.action.standard; + +import lombok.Data; +import org.thingsboard.server.extensions.core.action.template.TemplateActionConfiguration; + +/** + * Created by Valerii Sosliuk on 11/6/2017. + */ +@Data +public class SqsStandardQueuePluginActionConfiguration implements TemplateActionConfiguration { + + private String queue; + private int delaySeconds; + private boolean sync; + private String template; + +} diff --git a/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsMessageHandler.java b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsMessageHandler.java new file mode 100644 index 0000000000..b71e0b18b7 --- /dev/null +++ b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsMessageHandler.java @@ -0,0 +1,84 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sqs.plugin; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageResult; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.RuleId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse; +import org.thingsboard.server.extensions.api.plugins.PluginContext; +import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler; +import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg; +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; +import org.thingsboard.server.extensions.api.rules.RuleException; +import org.thingsboard.server.extensions.sqs.action.fifo.SqsFifoQueueActionMsg; +import org.thingsboard.server.extensions.sqs.action.fifo.SqsFifoQueueActionPayload; +import org.thingsboard.server.extensions.sqs.action.standard.SqsStandardQueueActionMsg; +import org.thingsboard.server.extensions.sqs.action.standard.SqsStandardQueueActionPayload; + +/** + * Created by Valerii Sosliuk on 11/15/2017. + */ +@RequiredArgsConstructor +@Slf4j +public class SqsMessageHandler implements RuleMsgHandler { + + private final AmazonSQS sqs; + + @Override + public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg msg) throws RuleException { + if (msg instanceof SqsStandardQueueActionMsg) { + sendMessageToStandardQueue(ctx, tenantId, ruleId, msg); + return; + } + if (msg instanceof SqsFifoQueueActionMsg) { + sendMessageToFifoQueue(ctx, tenantId, ruleId, msg); + return; + } + throw new RuleException("Unsupported message type " + msg.getClass().getName() + "!"); + } + + private void sendMessageToStandardQueue(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg msg) { + SqsStandardQueueActionPayload payload = ((SqsStandardQueueActionMsg) msg).getPayload(); + SendMessageRequest sendMsgRequest = new SendMessageRequest() + .withDelaySeconds(payload.getDelaySeconds()) + .withQueueUrl(payload.getQueue()) + .withMessageBody(payload.getMsgBody()); + sqs.sendMessage(sendMsgRequest); + if (payload.isSync()) { + ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, + BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId()))); + } + } + + private void sendMessageToFifoQueue(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg msg) { + SqsFifoQueueActionPayload payload = ((SqsFifoQueueActionMsg) msg).getPayload(); + SendMessageRequest sendMsgRequest = new SendMessageRequest() + .withQueueUrl(payload.getQueue()) + .withMessageBody(payload.getMsgBody()) + .withMessageGroupId(payload.getDeviceId()); + sqs.sendMessage(sendMsgRequest); + if (payload.isSync()) { + ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, + BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId()))); + } + } +} diff --git a/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsPlugin.java b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsPlugin.java new file mode 100644 index 0000000000..3f0252ebf6 --- /dev/null +++ b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsPlugin.java @@ -0,0 +1,78 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sqs.plugin; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import org.thingsboard.server.extensions.api.component.Plugin; +import org.thingsboard.server.extensions.api.plugins.AbstractPlugin; +import org.thingsboard.server.extensions.api.plugins.PluginContext; +import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler; +import org.thingsboard.server.extensions.sqs.action.fifo.SqsFifoQueuePluginAction; +import org.thingsboard.server.extensions.sqs.action.standard.SqsStandardQueuePluginAction; + +/** + * Created by Valerii Sosliuk on 11/6/2017. + */ +@Plugin(name = "SQS Plugin", actions = {SqsStandardQueuePluginAction.class, SqsFifoQueuePluginAction.class}, + descriptor = "SqsPluginDescriptor.json", configuration = SqsPluginConfiguration.class) +public class SqsPlugin extends AbstractPlugin { + + private SqsMessageHandler sqsMessageHandler; + private SqsPluginConfiguration configuration; + + @Override + public void init(SqsPluginConfiguration configuration) { + this.configuration = configuration; + init(); + } + + private void init() { + AWSCredentials awsCredentials = new BasicAWSCredentials(configuration.getAccessKeyId(), configuration.getSecretAccessKey()); + AmazonSQS sqs = AmazonSQSClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) + .withRegion(Regions.fromName(configuration.getRegion())).build(); + this.sqsMessageHandler = new SqsMessageHandler(sqs); + + } + + private void destroy() { + this.sqsMessageHandler = null; + } + + @Override + protected RuleMsgHandler getRuleMsgHandler() { + return sqsMessageHandler; + } + + @Override + public void resume(PluginContext ctx) { + init(); + } + + @Override + public void suspend(PluginContext ctx) { + destroy(); + } + + @Override + public void stop(PluginContext ctx) { + destroy(); + } +} diff --git a/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsPluginConfiguration.java b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsPluginConfiguration.java new file mode 100644 index 0000000000..a93b7f3f1d --- /dev/null +++ b/extensions/extention-sqs/src/main/java/org/thingsboard/server/extensions/sqs/plugin/SqsPluginConfiguration.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sqs.plugin; + +import lombok.Data; + +/** + * Created by Valerii Sosliuk on 11/5/2017. + */ +@Data +public class SqsPluginConfiguration { + + private String accessKeyId; + private String secretAccessKey; + private String region; + +} diff --git a/extensions/extention-sqs/src/main/resources/SqsFifoQueueActionDescriptor.json b/extensions/extention-sqs/src/main/resources/SqsFifoQueueActionDescriptor.json new file mode 100644 index 0000000000..05bfaec11f --- /dev/null +++ b/extensions/extention-sqs/src/main/resources/SqsFifoQueueActionDescriptor.json @@ -0,0 +1,34 @@ +{ + "schema": { + "title": "SQS FIFO Queue Action Configuration", + "type": "object", + "properties": { + "sync": { + "title": "Requires delivery confirmation", + "type": "boolean" + }, + "queue": { + "title": "Queue URL", + "type": "string" + }, + "template": { + "title": "Body Template", + "type": "string" + } + }, + "required": [ + "sync", + "queue", + "template" + ] + }, + "form": [ + "sync", + "queue", + { + "key": "template", + "type": "textarea", + "rows": 5 + } + ] +} \ No newline at end of file diff --git a/extensions/extention-sqs/src/main/resources/SqsPluginDescriptor.json b/extensions/extention-sqs/src/main/resources/SqsPluginDescriptor.json new file mode 100644 index 0000000000..407468b5c8 --- /dev/null +++ b/extensions/extention-sqs/src/main/resources/SqsPluginDescriptor.json @@ -0,0 +1,30 @@ +{ + "schema": { + "title": "SQS Plugin Configuration", + "type": "object", + "properties": { + "accessKeyId": { + "title": "Access Key ID", + "type": "string" + }, + "secretAccessKey": { + "title": "Secret Access Key", + "type": "string" + }, + "region": { + "title": "Region", + "type": "string" + } + }, + "required": [ + "accessKeyId", + "secretAccessKey", + "region" + ] + }, + "form": [ + "accessKeyId", + "secretAccessKey", + "region" + ] +} \ No newline at end of file diff --git a/extensions/extention-sqs/src/main/resources/SqsStandardQueueActionDescriptor.json b/extensions/extention-sqs/src/main/resources/SqsStandardQueueActionDescriptor.json new file mode 100644 index 0000000000..f5502e6a25 --- /dev/null +++ b/extensions/extention-sqs/src/main/resources/SqsStandardQueueActionDescriptor.json @@ -0,0 +1,41 @@ +{ + "schema": { + "title": "SQS Standard Queue Action Configuration", + "type": "object", + "properties": { + "sync": { + "title": "Requires delivery confirmation", + "type": "boolean" + }, + "queue": { + "title": "Queue URL", + "type": "string" + }, + "delaySeconds": { + "title": "Delay Seconds", + "type": "integer", + "default": 0 + }, + "template": { + "title": "Body Template", + "type": "string" + } + }, + "required": [ + "sync", + "queue", + "delaySeconds", + "template" + ] + }, + "form": [ + "sync", + "queue", + "delaySeconds", + { + "key": "template", + "type": "textarea", + "rows": 5 + } + ] +} \ No newline at end of file diff --git a/extensions/extention-sqs/src/test/java/org/thingsboard/server/extensions/sqs/SqsDemoClient.java b/extensions/extention-sqs/src/test/java/org/thingsboard/server/extensions/sqs/SqsDemoClient.java new file mode 100644 index 0000000000..17c9b08e8c --- /dev/null +++ b/extensions/extention-sqs/src/test/java/org/thingsboard/server/extensions/sqs/SqsDemoClient.java @@ -0,0 +1,69 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.sqs; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.DeleteMessageRequest; +import com.amazonaws.services.sqs.model.Message; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +/** + * Created by Valerii Sosliuk on 11/10/2017. + */ +@Slf4j +public class SqsDemoClient { + + private static final String ACCESS_KEY_ID = "$ACCES_KEY_ID"; + private static final String SECRET_ACCESS_KEY = "$SECRET_ACCESS_KEY"; + + private static final String QUEUE_URL = "$QUEUE_URL"; + private static final String REGION = "us-east-1"; + + public static void main(String[] args) { + log.info("Starting SQS Demo Clinent..."); + AWSCredentials awsCredentials = new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY); + AmazonSQS sqs = AmazonSQSClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) + .withRegion(Regions.fromName(REGION)).build(); + SqsDemoClient client = new SqsDemoClient(); + client.pollMessages(sqs); + } + + private void pollMessages(AmazonSQS sqs) { + log.info("Polling messages"); + while (true) { + List messages = sqs.receiveMessage(QUEUE_URL).getMessages(); + messages.forEach(m -> { + log.info("Message Received: " + m.getBody()); + System.out.println(m.getBody()); + DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(QUEUE_URL, m.getReceiptHandle()); + sqs.deleteMessage(deleteMessageRequest); + }); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } + } + } +} diff --git a/extensions/extention-sqs/src/test/resources/logback.xml b/extensions/extention-sqs/src/test/resources/logback.xml new file mode 100644 index 0000000000..757aa32dea --- /dev/null +++ b/extensions/extention-sqs/src/test/resources/logback.xml @@ -0,0 +1,10 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + \ No newline at end of file diff --git a/extensions/pom.xml b/extensions/pom.xml index 30c939556e..e1c20840ce 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -39,6 +39,8 @@ extension-rest-api-call extension-kafka extension-mqtt + extention-sqs + extension-sns diff --git a/pom.xml b/pom.xml index 1f9a92d97b..e45fc3ea28 100755 --- a/pom.xml +++ b/pom.xml @@ -350,6 +350,18 @@ extension ${project.version} + + org.thingsboard.extensions + extension-sqs + extension + ${project.version} + + + org.thingsboard.extensions + extension-sns + extension + ${project.version} + org.thingsboard.common data diff --git a/resume.bat b/resume.bat new file mode 100644 index 0000000000..c3c0e1d817 --- /dev/null +++ b/resume.bat @@ -0,0 +1,18 @@ +@REM +@REM Copyright © 2016-2017 The Thingsboard Authors +@REM +@REM Licensed under the Apache License, Version 2.0 (the "License"); +@REM you may not use this file except in compliance with the License. +@REM You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, software +@REM distributed under the License is distributed on an "AS IS" BASIS, +@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@REM See the License for the specific language governing permissions and +@REM limitations under the License. +@REM + +mvn clean install -rf :application + diff --git a/tools/src/main/python/mqtt-send-telemetry.py b/tools/src/main/python/mqtt-send-telemetry.py new file mode 100644 index 0000000000..b9af45a0b1 --- /dev/null +++ b/tools/src/main/python/mqtt-send-telemetry.py @@ -0,0 +1,44 @@ +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import paho.mqtt.client as mqtt +from time import sleep +import random + +broker="test.mosquitto.org" +topic_pub='v1/devices/me/telemetry' + + +client = mqtt.Client() + +client.username_pw_set("qyA3gP50SpGwfwyNGyi7") +client.connect('127.0.0.1', 1883, 1) + +for i in range(100): + x = random.randrange(20, 100) + print x + msg = '{"windSpeed":"'+ str(x) + '"}' + client.publish(topic_pub, msg) + sleep(0.1) +#while True: +# val3 = random.uniform(0, 5) +# val4 = random.uniform(-3, 3)# +# +# msg = '{"key3": '+ str(val3) +', "key4": ' + str(val4) + '}' +# +# print('Message: ' + msg) +# client.publish(topic_pub, msg) +# +# sleep(0.5) From e2a8f85f7000cdd1816c5b522d01dc9eaf324fa3 Mon Sep 17 00:00:00 2001 From: mp-loki Date: Thu, 16 Nov 2017 07:42:42 -0500 Subject: [PATCH 11/16] License Header updated --- tools/src/main/python/mqtt-send-telemetry.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/tools/src/main/python/mqtt-send-telemetry.py b/tools/src/main/python/mqtt-send-telemetry.py index b9af45a0b1..8c4263d4ce 100644 --- a/tools/src/main/python/mqtt-send-telemetry.py +++ b/tools/src/main/python/mqtt-send-telemetry.py @@ -1,4 +1,6 @@ +# -*- coding: utf-8 -*- # +# Copyright © 2016-2017 The Thingsboard Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -23,22 +25,12 @@ topic_pub='v1/devices/me/telemetry' client = mqtt.Client() -client.username_pw_set("qyA3gP50SpGwfwyNGyi7") +client.username_pw_set("TEST_TOKEN") client.connect('127.0.0.1', 1883, 1) -for i in range(100): +for i in range(5): x = random.randrange(20, 100) print x msg = '{"windSpeed":"'+ str(x) + '"}' client.publish(topic_pub, msg) sleep(0.1) -#while True: -# val3 = random.uniform(0, 5) -# val4 = random.uniform(-3, 3)# -# -# msg = '{"key3": '+ str(val3) +', "key4": ' + str(val4) + '}' -# -# print('Message: ' + msg) -# client.publish(topic_pub, msg) -# -# sleep(0.5) From 5607d9a78ac28b1ef58a66a15765bbf70a137311 Mon Sep 17 00:00:00 2001 From: mp-loki Date: Sun, 19 Nov 2017 23:33:28 -0500 Subject: [PATCH 12/16] SNS SQS pom files organized --- extensions/extension-sns/pom.xml | 9 +++++---- extensions/extention-sqs/pom.xml | 9 +++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/extensions/extension-sns/pom.xml b/extensions/extension-sns/pom.xml index 37f8fac57c..c68ea7d74b 100644 --- a/extensions/extension-sns/pom.xml +++ b/extensions/extension-sns/pom.xml @@ -44,15 +44,16 @@ extensions-api provided + + org.thingsboard + extensions-core + provided + com.amazonaws aws-java-sdk-sns ${aws.sdk.version} - - org.thingsboard - extensions-core - diff --git a/extensions/extention-sqs/pom.xml b/extensions/extention-sqs/pom.xml index 13eb5dda1a..1e16912e39 100644 --- a/extensions/extention-sqs/pom.xml +++ b/extensions/extention-sqs/pom.xml @@ -44,15 +44,16 @@ extensions-api provided + + org.thingsboard + extensions-core + provided + com.amazonaws aws-java-sdk-sqs ${aws.sdk.version} - - org.thingsboard - extensions-core - From f87bc9b5b098420414147c78e516ba18df5f4278 Mon Sep 17 00:00:00 2001 From: mp-loki Date: Tue, 28 Nov 2017 00:02:53 -0500 Subject: [PATCH 13/16] Fixed java.lang.IllegalArgumentException when QoS = 0 --- .../server/transport/mqtt/session/GatewayDeviceSessionCtx.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index 7b527dff83..7646131c42 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -83,7 +83,7 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { if (responseMsg.isSuccess()) { MsgType requestMsgType = responseMsg.getRequestMsgType(); Integer requestId = responseMsg.getRequestId(); - if (requestMsgType == MsgType.POST_ATTRIBUTES_REQUEST || requestMsgType == MsgType.POST_TELEMETRY_REQUEST) { + if (requestId >= 0 && requestMsgType == MsgType.POST_ATTRIBUTES_REQUEST || requestMsgType == MsgType.POST_TELEMETRY_REQUEST) { return Optional.of(MqttTransportHandler.createMqttPubAckMsg(requestId)); } } From 5251782c97afedff832179a85a76869c132aab68 Mon Sep 17 00:00:00 2001 From: mp-loki Date: Fri, 24 Nov 2017 22:14:38 -0500 Subject: [PATCH 14/16] TB-75 unique constraint violation fix --- .../JpaAbstractDaoListeningExecutorService.java | 1 + .../dao/sql/timeseries/JpaTimeseriesDao.java | 14 ++++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java index 7c34aa1d86..78282674ab 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java @@ -22,6 +22,7 @@ import javax.annotation.PreDestroy; import java.util.concurrent.Executors; public abstract class JpaAbstractDaoListeningExecutorService { + protected ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); @PreDestroy diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java index 7fddfaeebf..0a4e685007 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java @@ -17,9 +17,7 @@ package org.thingsboard.server.dao.sql.timeseries; import com.google.common.base.Function; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.*; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.PageRequest; @@ -40,6 +38,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID; @@ -50,6 +49,8 @@ import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID; @SqlDao public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService implements TimeseriesDao { + private ListeningExecutorService insertService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + @Autowired private TsKvRepository tsKvRepository; @@ -232,7 +233,8 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp entity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null)); entity.setLongValue(tsKvEntry.getLongValue().orElse(null)); entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null)); - return service.submit(() -> { + log.trace("Saving entity: " + entity); + return insertService.submit(() -> { tsKvRepository.save(entity); return null; }); @@ -240,7 +242,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp @Override public ListenableFuture savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) { - return service.submit(() -> null); + return insertService.submit(() -> null); } @Override @@ -254,7 +256,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp latestEntity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null)); latestEntity.setLongValue(tsKvEntry.getLongValue().orElse(null)); latestEntity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null)); - return service.submit(() -> { + return insertService.submit(() -> { tsKvLatestRepository.save(latestEntity); return null; }); From f7b0ec98b606006048df5c4c7bc9f15a88edf60c Mon Sep 17 00:00:00 2001 From: mp-loki Date: Mon, 27 Nov 2017 07:31:41 -0500 Subject: [PATCH 15/16] TB-75 Service shutdown added --- .../server/dao/sql/timeseries/JpaTimeseriesDao.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java index 0a4e685007..170f9a6c2c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java @@ -34,6 +34,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesDao; import org.thingsboard.server.dao.util.SqlDao; import javax.annotation.Nullable; +import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -262,4 +263,9 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp }); } + @PreDestroy + void onDestroy() { + insertService.shutdown(); + } + } From 336f0dc07952845e814fdcd45f4073129e4c4105 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Wed, 29 Nov 2017 10:51:57 +0200 Subject: [PATCH 16/16] Ability to fetch multiple device attributes using Gateway MQTT API --- .../mqtt/session/GatewayDeviceSessionCtx.java | 42 +++++++++++-------- .../mqtt/session/GatewaySessionCtx.java | 22 ++++++---- 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index 7646131c42..7bed03ac48 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -16,6 +16,7 @@ package org.thingsboard.server.transport.mqtt.session; import com.google.gson.Gson; +import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.netty.buffer.ByteBuf; @@ -24,6 +25,7 @@ import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.handler.codec.mqtt.*; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.SessionId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.msg.core.*; import org.thingsboard.server.common.msg.kv.AttributesKVMsg; @@ -35,6 +37,7 @@ import org.thingsboard.server.transport.mqtt.MqttTopics; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import java.nio.charset.Charset; +import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; @@ -135,40 +138,43 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { if (responseData.isPresent()) { AttributesKVMsg msg = responseData.get(); if (msg.getClientAttributes() != null) { - msg.getClientAttributes().forEach(v -> addValueToJson(result, "value", v)); + addValues(result, msg.getClientAttributes()); } if (msg.getSharedAttributes() != null) { - msg.getSharedAttributes().forEach(v -> addValueToJson(result, "value", v)); + addValues(result, msg.getSharedAttributes()); } } return createMqttPublishMsg(topic, result); } + private void addValues(JsonObject result, List kvList) { + if (kvList.size() == 1) { + addValueToJson(result, "value", kvList.get(0)); + } else { + JsonObject values; + if (result.has("values")) { + values = result.get("values").getAsJsonObject(); + } else { + values = new JsonObject(); + result.add("values", values); + } + kvList.forEach(value -> addValueToJson(values, value.getKey(), value)); + } + } + private void addValueToJson(JsonObject json, String name, KvEntry entry) { switch (entry.getDataType()) { case BOOLEAN: - Optional booleanValue = entry.getBooleanValue(); - if (booleanValue.isPresent()) { - json.addProperty(name, booleanValue.get()); - } + entry.getBooleanValue().ifPresent(aBoolean -> json.addProperty(name, aBoolean)); break; case STRING: - Optional stringValue = entry.getStrValue(); - if (stringValue.isPresent()) { - json.addProperty(name, stringValue.get()); - } + entry.getStrValue().ifPresent(aString -> json.addProperty(name, aString)); break; case DOUBLE: - Optional doubleValue = entry.getDoubleValue(); - if (doubleValue.isPresent()) { - json.addProperty(name, doubleValue.get()); - } + entry.getDoubleValue().ifPresent(aDouble -> json.addProperty(name, aDouble)); break; case LONG: - Optional longValue = entry.getLongValue(); - if (longValue.isPresent()) { - json.addProperty(name, longValue.get()); - } + entry.getLongValue().ifPresent(aLong -> json.addProperty(name, aLong)); break; } } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index 00d1f0c6d7..5bccf49663 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -41,10 +41,7 @@ import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.stream.Collectors; import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload; @@ -193,13 +190,22 @@ public class GatewaySessionCtx { int requestId = jsonObj.get("id").getAsInt(); String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString(); boolean clientScope = jsonObj.get("client").getAsBoolean(); - String key = jsonObj.get("key").getAsString(); + Set keys; + if (jsonObj.has("key")) { + keys = Collections.singleton(jsonObj.get("key").getAsString()); + } else { + JsonArray keysArray = jsonObj.get("keys").getAsJsonArray(); + keys = new HashSet<>(); + for (JsonElement keyObj : keysArray) { + keys.add(keyObj.getAsString()); + } + } BasicGetAttributesRequest request; if (clientScope) { - request = new BasicGetAttributesRequest(requestId, Collections.singleton(key), null); + request = new BasicGetAttributesRequest(requestId, keys, null); } else { - request = new BasicGetAttributesRequest(requestId, null, Collections.singleton(key)); + request = new BasicGetAttributesRequest(requestId, null, keys); } GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), @@ -251,7 +257,7 @@ public class GatewaySessionCtx { } private void ack(MqttPublishMessage msg) { - if(msg.variableHeader().messageId() > 0) { + if (msg.variableHeader().messageId() > 0) { writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId())); } }