9 changed files with 231 additions and 107 deletions
@ -0,0 +1,108 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edqs; |
|||
|
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import com.google.common.util.concurrent.MoreExecutors; |
|||
import jakarta.annotation.PostConstruct; |
|||
import jakarta.annotation.PreDestroy; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.common.data.edqs.query.EdqsRequest; |
|||
import org.thingsboard.server.common.data.edqs.query.EdqsResponse; |
|||
import org.thingsboard.server.common.data.id.CustomerId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.edqs.EdqsApiService; |
|||
import org.thingsboard.server.edqs.state.EdqsPartitionService; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; |
|||
import org.thingsboard.server.queue.TbQueueRequestTemplate; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.provider.EdqsClientQueueFactory; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
@Service |
|||
@Slf4j |
|||
@RequiredArgsConstructor |
|||
@ConditionalOnExpression("'${queue.edqs.api_enabled:true}' == 'true' && ('${service.type:null}' == 'monolith' || '${service.type:null}' == 'tb-core')") |
|||
public class DefaultEdqsApiService implements EdqsApiService { |
|||
|
|||
private final EdqsPartitionService edqsPartitionService; |
|||
private final EdqsClientQueueFactory queueFactory; |
|||
private TbQueueRequestTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> requestTemplate; |
|||
|
|||
private Boolean apiEnabled = null; |
|||
|
|||
@PostConstruct |
|||
private void init() { |
|||
requestTemplate = queueFactory.createEdqsRequestTemplate(); |
|||
requestTemplate.init(); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<EdqsResponse> processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request) { |
|||
var requestMsg = ToEdqsMsg.newBuilder() |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setTs(System.currentTimeMillis()) |
|||
.setRequestMsg(TransportProtos.EdqsRequestMsg.newBuilder() |
|||
.setValue(JacksonUtil.toString(request)) |
|||
.build()); |
|||
if (customerId != null && !customerId.isNullUid()) { |
|||
requestMsg.setCustomerIdMSB(customerId.getId().getMostSignificantBits()); |
|||
requestMsg.setCustomerIdLSB(customerId.getId().getLeastSignificantBits()); |
|||
} |
|||
|
|||
Integer partition = edqsPartitionService.resolvePartition(tenantId); |
|||
ListenableFuture<TbProtoQueueMsg<FromEdqsMsg>> resultFuture = requestTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), requestMsg.build()), partition); |
|||
return Futures.transform(resultFuture, msg -> { |
|||
TransportProtos.EdqsResponseMsg responseMsg = msg.getValue().getResponseMsg(); |
|||
return JacksonUtil.fromString(responseMsg.getValue(), EdqsResponse.class); |
|||
}, MoreExecutors.directExecutor()); |
|||
} |
|||
|
|||
@Override |
|||
public boolean isEnabled() { |
|||
return Boolean.TRUE.equals(apiEnabled); |
|||
} |
|||
|
|||
@Override |
|||
public void setEnabled(boolean enabled) { |
|||
if (enabled) { |
|||
log.info("Enabling EDQS API"); |
|||
} else { |
|||
log.info("Disabling EDQS API"); |
|||
} |
|||
apiEnabled = enabled; |
|||
} |
|||
|
|||
@Override |
|||
public boolean isSupported() { |
|||
return true; |
|||
} |
|||
|
|||
@PreDestroy |
|||
private void stop() { |
|||
requestTemplate.stop(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,34 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.msg.edqs; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import org.thingsboard.server.common.data.edqs.query.EdqsRequest; |
|||
import org.thingsboard.server.common.data.edqs.query.EdqsResponse; |
|||
import org.thingsboard.server.common.data.id.CustomerId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
|
|||
public interface EdqsApiService { |
|||
|
|||
ListenableFuture<EdqsResponse> processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request); |
|||
|
|||
boolean isEnabled(); |
|||
|
|||
void setEnabled(boolean enabled); |
|||
|
|||
boolean isSupported(); |
|||
|
|||
} |
|||
@ -0,0 +1,53 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.sql.query; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.edqs.query.EdqsRequest; |
|||
import org.thingsboard.server.common.data.edqs.query.EdqsResponse; |
|||
import org.thingsboard.server.common.data.id.CustomerId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.edqs.EdqsApiService; |
|||
|
|||
@Service |
|||
@Slf4j |
|||
@ConditionalOnMissingBean(value = EdqsApiService.class, ignored = DummyEdqsApiService.class) |
|||
public class DummyEdqsApiService implements EdqsApiService { |
|||
|
|||
@Override |
|||
public ListenableFuture<EdqsResponse> processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request) { |
|||
throw new UnsupportedOperationException(); |
|||
} |
|||
|
|||
@Override |
|||
public boolean isEnabled() { |
|||
return false; |
|||
} |
|||
|
|||
@Override |
|||
public void setEnabled(boolean enabled) { |
|||
log.warn("Got request to enable EDQS API, but it isn't supported", new RuntimeException("stacktrace")); |
|||
} |
|||
|
|||
@Override |
|||
public boolean isSupported() { |
|||
return false; |
|||
} |
|||
|
|||
} |
|||
Loading…
Reference in new issue