committed by
GitHub
281 changed files with 8681 additions and 2513 deletions
@ -0,0 +1,49 @@ |
|||
-- |
|||
-- Copyright © 2016-2022 The Thingsboard Authors |
|||
-- |
|||
-- Licensed under the Apache License, Version 2.0 (the "License"); |
|||
-- you may not use this file except in compliance with the License. |
|||
-- You may obtain a copy of the License at |
|||
-- |
|||
-- http://www.apache.org/licenses/LICENSE-2.0 |
|||
-- |
|||
-- Unless required by applicable law or agreed to in writing, software |
|||
-- distributed under the License is distributed on an "AS IS" BASIS, |
|||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
-- See the License for the specific language governing permissions and |
|||
-- limitations under the License. |
|||
-- |
|||
|
|||
ALTER TABLE device_profile |
|||
ADD COLUMN IF NOT EXISTS default_queue_id uuid; |
|||
|
|||
DO |
|||
$$ |
|||
BEGIN |
|||
IF EXISTS |
|||
(SELECT column_name |
|||
FROM information_schema.columns |
|||
WHERE table_name = 'device_profile' |
|||
AND column_name = 'default_queue_name' |
|||
) |
|||
THEN |
|||
UPDATE device_profile |
|||
SET default_queue_id = q.id |
|||
FROM queue as q |
|||
WHERE default_queue_name = q.name; |
|||
END IF; |
|||
END |
|||
$$; |
|||
|
|||
DO |
|||
$$ |
|||
BEGIN |
|||
IF NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'fk_default_queue_device_profile') THEN |
|||
ALTER TABLE device_profile |
|||
ADD CONSTRAINT fk_default_queue_device_profile FOREIGN KEY (default_queue_id) REFERENCES queue (id); |
|||
END IF; |
|||
END; |
|||
$$; |
|||
|
|||
ALTER TABLE device_profile |
|||
DROP COLUMN IF EXISTS default_queue_name; |
|||
@ -0,0 +1,275 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.entitiy.dashboard; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.Customer; |
|||
import org.thingsboard.server.common.data.Dashboard; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.ShortCustomerInfo; |
|||
import org.thingsboard.server.common.data.audit.ActionType; |
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.common.data.edge.EdgeEventActionType; |
|||
import org.thingsboard.server.common.data.exception.ThingsboardException; |
|||
import org.thingsboard.server.common.data.id.CustomerId; |
|||
import org.thingsboard.server.common.data.id.DashboardId; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
import org.thingsboard.server.service.entitiy.AbstractTbEntityService; |
|||
import org.thingsboard.server.service.security.model.SecurityUser; |
|||
|
|||
import java.util.HashSet; |
|||
import java.util.List; |
|||
import java.util.Set; |
|||
|
|||
@Service |
|||
@TbCoreComponent |
|||
@AllArgsConstructor |
|||
public class DefaultTbDashboardService extends AbstractTbEntityService implements TbDashboardService { |
|||
|
|||
@Override |
|||
public Dashboard save(Dashboard dashboard, SecurityUser user) throws ThingsboardException { |
|||
ActionType actionType = dashboard.getId() == null ? ActionType.ADDED : ActionType.UPDATED; |
|||
TenantId tenantId = dashboard.getTenantId(); |
|||
try { |
|||
Dashboard savedDashboard = checkNotNull(dashboardService.saveDashboard(dashboard)); |
|||
notificationEntityService.notifyCreateOrUpdateEntity(tenantId, savedDashboard.getId(), savedDashboard, |
|||
null, actionType, user); |
|||
return savedDashboard; |
|||
} catch (Exception e) { |
|||
notificationEntityService.notifyEntity(tenantId, emptyId(EntityType.DASHBOARD), dashboard, null, actionType, user, e); |
|||
throw handleException(e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void delete(Dashboard dashboard, SecurityUser user) throws ThingsboardException { |
|||
TenantId tenantId = dashboard.getTenantId(); |
|||
DashboardId dashboardId = dashboard.getId(); |
|||
try { |
|||
List<EdgeId> relatedEdgeIds = findRelatedEdgeIds(tenantId, dashboardId); |
|||
dashboardService.deleteDashboard(tenantId, dashboardId); |
|||
notificationEntityService.notifyDeleteEntity(tenantId, dashboardId, dashboard, user.getCustomerId(), |
|||
ActionType.DELETED, relatedEdgeIds, user, dashboardId.toString()); |
|||
} catch (Exception e) { |
|||
notificationEntityService.notifyEntity(tenantId, emptyId(EntityType.DASHBOARD), null, null, |
|||
ActionType.DELETED, user, e, dashboardId.toString()); |
|||
throw handleException(e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public Dashboard assignDashboardToCustomer(DashboardId dashboardId, Customer customer, SecurityUser user) throws ThingsboardException { |
|||
ActionType actionType = ActionType.ASSIGNED_TO_CUSTOMER; |
|||
CustomerId customerId = customer.getId(); |
|||
try { |
|||
Dashboard savedDashboard = checkNotNull(dashboardService.assignDashboardToCustomer(user.getTenantId(), dashboardId, customerId)); |
|||
notificationEntityService.notifyAssignOrUnassignEntityToCustomer(user.getTenantId(), dashboardId, customerId, savedDashboard, |
|||
actionType, EdgeEventActionType.ASSIGNED_TO_CUSTOMER, user, true, customerId.toString(), customer.getName()); |
|||
return savedDashboard; |
|||
} catch (Exception e) { |
|||
notificationEntityService.notifyEntity(user.getTenantId(), emptyId(EntityType.DASHBOARD), null, null, |
|||
actionType, user, e, dashboardId.toString(), customerId.toString()); |
|||
throw handleException(e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public Dashboard assignDashboardToPublicCustomer(DashboardId dashboardId, SecurityUser user) throws ThingsboardException { |
|||
ActionType actionType = ActionType.ASSIGNED_TO_CUSTOMER; |
|||
try { |
|||
Customer publicCustomer = customerService.findOrCreatePublicCustomer(user.getTenantId()); |
|||
Dashboard savedDashboard = checkNotNull(dashboardService.assignDashboardToCustomer(user.getTenantId(), dashboardId, publicCustomer.getId())); |
|||
notificationEntityService.notifyAssignOrUnassignEntityToCustomer(user.getTenantId(), dashboardId, user.getCustomerId(), savedDashboard, |
|||
actionType, null, user, false, dashboardId.toString(), |
|||
publicCustomer.getId().toString(), publicCustomer.getName()); |
|||
return savedDashboard; |
|||
} catch (Exception e) { |
|||
notificationEntityService.notifyEntity(user.getTenantId(), emptyId(EntityType.DASHBOARD), null, null, |
|||
actionType, user, e, dashboardId.toString()); |
|||
throw handleException(e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public Dashboard unassignDashboardFromPublicCustomer(Dashboard dashboard, SecurityUser user) throws ThingsboardException { |
|||
ActionType actionType = ActionType.UNASSIGNED_FROM_CUSTOMER; |
|||
try { |
|||
Customer publicCustomer = customerService.findOrCreatePublicCustomer(dashboard.getTenantId()); |
|||
Dashboard savedDashboard = checkNotNull(dashboardService.unassignDashboardFromCustomer(user.getTenantId(), dashboard.getId(), publicCustomer.getId())); |
|||
notificationEntityService.notifyAssignOrUnassignEntityToCustomer(user.getTenantId(), dashboard.getId(), user.getCustomerId(), dashboard, |
|||
actionType, null, user, false, dashboard.getId().toString(), |
|||
publicCustomer.getId().toString(), publicCustomer.getName()); |
|||
return savedDashboard; |
|||
} catch (Exception e) { |
|||
notificationEntityService.notifyEntity(user.getTenantId(), emptyId(EntityType.DASHBOARD), null, null, |
|||
actionType, user, e, dashboard.getId().toString()); |
|||
throw handleException(e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public Dashboard updateDashboardCustomers(Dashboard dashboard, Set<CustomerId> customerIds, SecurityUser user) throws ThingsboardException { |
|||
ActionType actionType = ActionType.ASSIGNED_TO_CUSTOMER; |
|||
TenantId tenantId = user.getTenantId(); |
|||
try { |
|||
Set<CustomerId> addedCustomerIds = new HashSet<>(); |
|||
Set<CustomerId> removedCustomerIds = new HashSet<>(); |
|||
for (CustomerId customerId : customerIds) { |
|||
if (!dashboard.isAssignedToCustomer(customerId)) { |
|||
addedCustomerIds.add(customerId); |
|||
} |
|||
} |
|||
|
|||
Set<ShortCustomerInfo> assignedCustomers = dashboard.getAssignedCustomers(); |
|||
if (assignedCustomers != null) { |
|||
for (ShortCustomerInfo customerInfo : assignedCustomers) { |
|||
if (!customerIds.contains(customerInfo.getCustomerId())) { |
|||
removedCustomerIds.add(customerInfo.getCustomerId()); |
|||
} |
|||
} |
|||
} |
|||
|
|||
if (addedCustomerIds.isEmpty() && removedCustomerIds.isEmpty()) { |
|||
return dashboard; |
|||
} else { |
|||
Dashboard savedDashboard = null; |
|||
for (CustomerId customerId : addedCustomerIds) { |
|||
savedDashboard = checkNotNull(dashboardService.assignDashboardToCustomer(tenantId, dashboard.getId(), customerId)); |
|||
ShortCustomerInfo customerInfo = savedDashboard.getAssignedCustomerInfo(customerId); |
|||
notificationEntityService.notifyAssignOrUnassignEntityToCustomer(tenantId, savedDashboard.getId(), customerId, savedDashboard, |
|||
actionType, EdgeEventActionType.ASSIGNED_TO_CUSTOMER, user, true, customerInfo.getTitle()); |
|||
} |
|||
for (CustomerId customerId : removedCustomerIds) { |
|||
ShortCustomerInfo customerInfo = dashboard.getAssignedCustomerInfo(customerId); |
|||
savedDashboard = checkNotNull(dashboardService.unassignDashboardFromCustomer(tenantId, dashboard.getId(), customerId)); |
|||
notificationEntityService.notifyAssignOrUnassignEntityToCustomer(tenantId, savedDashboard.getId(), customerId, savedDashboard, |
|||
ActionType.UNASSIGNED_FROM_CUSTOMER, EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER, user, true, customerInfo.getTitle()); |
|||
} |
|||
return savedDashboard; |
|||
} |
|||
} catch (Exception e) { |
|||
notificationEntityService.notifyEntity(tenantId, emptyId(EntityType.DASHBOARD), null, null, |
|||
actionType, user, e, dashboard.getId().toString()); |
|||
throw handleException(e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public Dashboard addDashboardCustomers(Dashboard dashboard, Set<CustomerId> customerIds, SecurityUser user) throws ThingsboardException { |
|||
ActionType actionType = ActionType.ASSIGNED_TO_CUSTOMER; |
|||
TenantId tenantId = user.getTenantId(); |
|||
try { |
|||
if (customerIds.isEmpty()) { |
|||
return dashboard; |
|||
} else { |
|||
Dashboard savedDashboard = null; |
|||
for (CustomerId customerId : customerIds) { |
|||
savedDashboard = checkNotNull(dashboardService.assignDashboardToCustomer(tenantId, dashboard.getId(), customerId)); |
|||
ShortCustomerInfo customerInfo = savedDashboard.getAssignedCustomerInfo(customerId); |
|||
notificationEntityService.notifyAssignOrUnassignEntityToCustomer(tenantId, savedDashboard.getId(), customerId, savedDashboard, |
|||
actionType, EdgeEventActionType.ASSIGNED_TO_CUSTOMER, user, true, customerInfo.getTitle()); |
|||
} |
|||
return savedDashboard; |
|||
} |
|||
} catch (Exception e) { |
|||
notificationEntityService.notifyEntity(tenantId, emptyId(EntityType.DASHBOARD), null, null, |
|||
actionType, user, e, dashboard.getId().toString()); |
|||
throw handleException(e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public Dashboard removeDashboardCustomers(Dashboard dashboard, Set<CustomerId> customerIds, SecurityUser user) throws ThingsboardException { |
|||
ActionType actionType = ActionType.UNASSIGNED_FROM_CUSTOMER; |
|||
TenantId tenantId = user.getTenantId(); |
|||
try { |
|||
if (customerIds.isEmpty()) { |
|||
return dashboard; |
|||
} else { |
|||
Dashboard savedDashboard = null; |
|||
for (CustomerId customerId : customerIds) { |
|||
ShortCustomerInfo customerInfo = dashboard.getAssignedCustomerInfo(customerId); |
|||
savedDashboard = checkNotNull(dashboardService.unassignDashboardFromCustomer(tenantId, dashboard.getId(), customerId)); |
|||
notificationEntityService.notifyAssignOrUnassignEntityToCustomer(tenantId, savedDashboard.getId(), customerId, savedDashboard, |
|||
actionType, EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER, user, true, customerInfo.getTitle()); |
|||
} |
|||
return savedDashboard; |
|||
} |
|||
} catch (Exception e) { |
|||
notificationEntityService.notifyEntity(tenantId, emptyId(EntityType.DASHBOARD), null, null, |
|||
actionType, user, e, dashboard.getId().toString()); |
|||
throw handleException(e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public Dashboard asignDashboardToEdge(DashboardId dashboardId, Edge edge, SecurityUser user) throws ThingsboardException { |
|||
ActionType actionType = ActionType.ASSIGNED_TO_EDGE; |
|||
TenantId tenantId = user.getTenantId(); |
|||
EdgeId edgeId = edge.getId(); |
|||
try { |
|||
Dashboard savedDashboard = checkNotNull(dashboardService.assignDashboardToEdge(tenantId, dashboardId, edgeId)); |
|||
notificationEntityService.notifyAssignOrUnassignEntityToEdge(tenantId, dashboardId, user.getCustomerId(), |
|||
edgeId, savedDashboard, actionType, EdgeEventActionType.ASSIGNED_TO_EDGE, user, dashboardId.toString(), |
|||
edgeId.toString(), edge.getName()); |
|||
return savedDashboard; |
|||
} catch (Exception e) { |
|||
notificationEntityService.notifyEntity(tenantId, emptyId(EntityType.DEVICE), null, null, |
|||
actionType, user, e, dashboardId.toString(), edgeId.toString()); |
|||
throw handleException(e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public Dashboard unassignDashboardFromEdge(Dashboard dashboard, Edge edge, SecurityUser user) throws ThingsboardException { |
|||
ActionType actionType = ActionType.UNASSIGNED_FROM_EDGE; |
|||
TenantId tenantId = dashboard.getTenantId(); |
|||
DashboardId dashboardId = dashboard.getId(); |
|||
EdgeId edgeId = edge.getId(); |
|||
try { |
|||
Dashboard savedDevice = checkNotNull(dashboardService.unassignDashboardFromEdge(tenantId, dashboardId, edgeId)); |
|||
|
|||
notificationEntityService.notifyAssignOrUnassignEntityToEdge(tenantId, dashboardId, user.getCustomerId(), |
|||
edgeId, dashboard, actionType, EdgeEventActionType.UNASSIGNED_FROM_EDGE, user, dashboardId.toString(), |
|||
edgeId.toString(), edge.getName()); |
|||
return savedDevice; |
|||
} catch (Exception e) { |
|||
notificationEntityService.notifyEntity(tenantId, emptyId(EntityType.DASHBOARD), null, null, |
|||
actionType, user, e, dashboardId.toString(), edgeId.toString()); |
|||
throw handleException(e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public Dashboard unassignDashboardFromCustomer(Dashboard dashboard, Customer customer, SecurityUser user) throws ThingsboardException { |
|||
ActionType actionType = ActionType.UNASSIGNED_FROM_CUSTOMER; |
|||
TenantId tenantId = dashboard.getTenantId(); |
|||
try { |
|||
Dashboard savedDashboard = checkNotNull(dashboardService.unassignDashboardFromCustomer(tenantId, dashboard.getId(), customer.getId())); |
|||
notificationEntityService.notifyAssignOrUnassignEntityToCustomer(tenantId, dashboard.getId(), customer.getId(), savedDashboard, |
|||
actionType, EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER, user, true, customer.getId().toString(), customer.getName()); |
|||
return savedDashboard; |
|||
} catch (Exception e) { |
|||
notificationEntityService.notifyEntity(tenantId, emptyId(EntityType.DASHBOARD), null, null, |
|||
actionType, user, e, dashboard.getId().toString()); |
|||
throw handleException(e); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,49 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.entitiy.dashboard; |
|||
|
|||
import org.thingsboard.server.common.data.Customer; |
|||
import org.thingsboard.server.common.data.Dashboard; |
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.common.data.exception.ThingsboardException; |
|||
import org.thingsboard.server.common.data.id.CustomerId; |
|||
import org.thingsboard.server.common.data.id.DashboardId; |
|||
import org.thingsboard.server.service.entitiy.SimpleTbEntityService; |
|||
import org.thingsboard.server.service.security.model.SecurityUser; |
|||
|
|||
import java.util.Set; |
|||
|
|||
public interface TbDashboardService extends SimpleTbEntityService<Dashboard> { |
|||
|
|||
Dashboard assignDashboardToCustomer(DashboardId dashboardId, Customer customer, SecurityUser user) throws ThingsboardException; |
|||
|
|||
Dashboard assignDashboardToPublicCustomer(DashboardId dashboardId, SecurityUser user) throws ThingsboardException; |
|||
|
|||
Dashboard unassignDashboardFromPublicCustomer(Dashboard dashboard, SecurityUser user) throws ThingsboardException; |
|||
|
|||
Dashboard updateDashboardCustomers(Dashboard dashboard, Set<CustomerId> customerIds, SecurityUser user) throws ThingsboardException; |
|||
|
|||
Dashboard addDashboardCustomers(Dashboard dashboard, Set<CustomerId> customerIds, SecurityUser user) throws ThingsboardException; |
|||
|
|||
Dashboard removeDashboardCustomers(Dashboard dashboard, Set<CustomerId> customerIds, SecurityUser user) throws ThingsboardException; |
|||
|
|||
Dashboard asignDashboardToEdge(DashboardId dashboardId, Edge edge, SecurityUser user) throws ThingsboardException; |
|||
|
|||
Dashboard unassignDashboardFromEdge(Dashboard dashboard, Edge edge, SecurityUser user) throws ThingsboardException; |
|||
|
|||
Dashboard unassignDashboardFromCustomer(Dashboard dashboard, Customer customer, SecurityUser user) throws ThingsboardException; |
|||
|
|||
} |
|||
@ -0,0 +1,269 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.entitiy.queue; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.cluster.TbClusterService; |
|||
import org.thingsboard.server.common.data.DeviceProfile; |
|||
import org.thingsboard.server.common.data.TenantProfile; |
|||
import org.thingsboard.server.common.data.id.QueueId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
|||
import org.thingsboard.server.common.data.queue.Queue; |
|||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.dao.device.DeviceProfileService; |
|||
import org.thingsboard.server.queue.TbQueueAdmin; |
|||
import org.thingsboard.server.queue.scheduler.SchedulerComponent; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
import org.thingsboard.server.service.entitiy.AbstractTbEntityService; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Slf4j |
|||
@Service |
|||
@TbCoreComponent |
|||
@AllArgsConstructor |
|||
public class DefaultTbQueueService extends AbstractTbEntityService implements TbQueueService { |
|||
private static final String MAIN = "Main"; |
|||
private static final long DELETE_DELAY = 30; |
|||
|
|||
private final TbClusterService tbClusterService; |
|||
private final TbQueueAdmin tbQueueAdmin; |
|||
private final DeviceProfileService deviceProfileService; |
|||
private final SchedulerComponent scheduler; |
|||
|
|||
@Override |
|||
public Queue saveQueue(Queue queue) { |
|||
boolean create = queue.getId() == null; |
|||
Queue oldQueue; |
|||
|
|||
if (create) { |
|||
oldQueue = null; |
|||
} else { |
|||
oldQueue = queueService.findQueueById(queue.getTenantId(), queue.getId()); |
|||
} |
|||
|
|||
//TODO: add checkNotNull
|
|||
Queue savedQueue = queueService.saveQueue(queue); |
|||
|
|||
if (create) { |
|||
onQueueCreated(savedQueue); |
|||
} else { |
|||
onQueueUpdated(savedQueue, oldQueue); |
|||
} |
|||
|
|||
return savedQueue; |
|||
} |
|||
|
|||
@Override |
|||
public void deleteQueue(TenantId tenantId, QueueId queueId) { |
|||
Queue queue = queueService.findQueueById(tenantId, queueId); |
|||
queueService.deleteQueue(tenantId, queueId); |
|||
onQueueDeleted(queue); |
|||
} |
|||
|
|||
@Override |
|||
public void deleteQueueByQueueName(TenantId tenantId, String queueName) { |
|||
Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, queueName); |
|||
queueService.deleteQueue(tenantId, queue.getId()); |
|||
onQueueDeleted(queue); |
|||
} |
|||
|
|||
private void onQueueCreated(Queue queue) { |
|||
for (int i = 0; i < queue.getPartitions(); i++) { |
|||
tbQueueAdmin.createTopicIfNotExists( |
|||
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName()); |
|||
} |
|||
|
|||
tbClusterService.onQueueChange(queue); |
|||
} |
|||
|
|||
private void onQueueUpdated(Queue queue, Queue oldQueue) { |
|||
int oldPartitions = oldQueue.getPartitions(); |
|||
int currentPartitions = queue.getPartitions(); |
|||
|
|||
if (currentPartitions != oldPartitions) { |
|||
if (currentPartitions > oldPartitions) { |
|||
log.info("Added [{}] new partitions to [{}] queue", currentPartitions - oldPartitions, queue.getName()); |
|||
for (int i = oldPartitions; i < currentPartitions; i++) { |
|||
tbQueueAdmin.createTopicIfNotExists( |
|||
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName()); |
|||
} |
|||
tbClusterService.onQueueChange(queue); |
|||
} else { |
|||
log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName()); |
|||
tbClusterService.onQueueChange(queue); |
|||
|
|||
scheduler.schedule(() -> { |
|||
for (int i = currentPartitions; i < oldPartitions; i++) { |
|||
String fullTopicName = new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(); |
|||
log.info("Removed partition [{}]", fullTopicName); |
|||
tbQueueAdmin.deleteTopic( |
|||
fullTopicName); |
|||
} |
|||
}, DELETE_DELAY, TimeUnit.SECONDS); |
|||
} |
|||
} else if (!oldQueue.equals(queue)) { |
|||
tbClusterService.onQueueChange(queue); |
|||
} |
|||
} |
|||
|
|||
private void onQueueDeleted(Queue queue) { |
|||
tbClusterService.onQueueDelete(queue); |
|||
|
|||
// queueStatsService.deleteQueueStatsByQueueId(tenantId, queueId);
|
|||
|
|||
scheduler.schedule(() -> { |
|||
for (int i = 0; i < queue.getPartitions(); i++) { |
|||
String fullTopicName = new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(); |
|||
log.info("Deleting queue [{}]", fullTopicName); |
|||
try { |
|||
tbQueueAdmin.deleteTopic(fullTopicName); |
|||
} catch (Exception e) { |
|||
log.error("Failed to delete queue [{}]", fullTopicName); |
|||
} |
|||
} |
|||
}, DELETE_DELAY, TimeUnit.SECONDS); |
|||
} |
|||
|
|||
@Override |
|||
public void updateQueuesByTenants(List<TenantId> tenantIds, TenantProfile newTenantProfile, TenantProfile |
|||
oldTenantProfile) { |
|||
boolean oldIsolated = oldTenantProfile != null && oldTenantProfile.isIsolatedTbRuleEngine(); |
|||
boolean newIsolated = newTenantProfile.isIsolatedTbRuleEngine(); |
|||
|
|||
if (!oldIsolated && !newIsolated) { |
|||
return; |
|||
} |
|||
|
|||
if (newTenantProfile.equals(oldTenantProfile)) { |
|||
return; |
|||
} |
|||
|
|||
Map<String, TenantProfileQueueConfiguration> oldQueues; |
|||
Map<String, TenantProfileQueueConfiguration> newQueues; |
|||
|
|||
if (oldIsolated) { |
|||
oldQueues = oldTenantProfile.getProfileData().getQueueConfiguration().stream() |
|||
.collect(Collectors.toMap(TenantProfileQueueConfiguration::getName, q -> q)); |
|||
} else { |
|||
oldQueues = Collections.emptyMap(); |
|||
} |
|||
|
|||
if (newIsolated) { |
|||
newQueues = newTenantProfile.getProfileData().getQueueConfiguration().stream() |
|||
.collect(Collectors.toMap(TenantProfileQueueConfiguration::getName, q -> q)); |
|||
} else { |
|||
newQueues = Collections.emptyMap(); |
|||
} |
|||
|
|||
List<String> toRemove = new ArrayList<>(); |
|||
List<String> toCreate = new ArrayList<>(); |
|||
List<String> toUpdate = new ArrayList<>(); |
|||
|
|||
for (String oldQueue : oldQueues.keySet()) { |
|||
if (!newQueues.containsKey(oldQueue)) { |
|||
toRemove.add(oldQueue); |
|||
} |
|||
} |
|||
|
|||
for (String newQueue : newQueues.keySet()) { |
|||
if (oldQueues.containsKey(newQueue)) { |
|||
toUpdate.add(newQueue); |
|||
} else { |
|||
toCreate.add(newQueue); |
|||
} |
|||
} |
|||
|
|||
tenantIds.forEach(tenantId -> { |
|||
Map<QueueId, List<DeviceProfile>> deviceProfileQueues; |
|||
|
|||
if (oldTenantProfile != null && !newTenantProfile.getId().equals(oldTenantProfile.getId()) || !toRemove.isEmpty()) { |
|||
List<DeviceProfile> deviceProfiles = deviceProfileService.findDeviceProfiles(tenantId, new PageLink(Integer.MAX_VALUE)).getData(); |
|||
deviceProfileQueues = deviceProfiles.stream() |
|||
.filter(dp -> dp.getDefaultQueueId() != null) |
|||
.collect(Collectors.groupingBy(DeviceProfile::getDefaultQueueId)); |
|||
} else { |
|||
deviceProfileQueues = Collections.emptyMap(); |
|||
} |
|||
|
|||
Map<String, QueueId> createdQueues = toCreate.stream() |
|||
.map(key -> saveQueue(new Queue(tenantId, newQueues.get(key)))) |
|||
.collect(Collectors.toMap(Queue::getName, Queue::getId)); |
|||
|
|||
// assigning created queues to device profiles instead of system queues
|
|||
if (oldTenantProfile != null && !oldTenantProfile.isIsolatedTbRuleEngine()) { |
|||
deviceProfileQueues.forEach((queueId, list) -> { |
|||
Queue queue = queueService.findQueueById(TenantId.SYS_TENANT_ID, queueId); |
|||
QueueId queueIdToAssign = createdQueues.get(queue.getName()); |
|||
if (queueIdToAssign == null) { |
|||
queueIdToAssign = createdQueues.get(MAIN); |
|||
} |
|||
for (DeviceProfile deviceProfile : list) { |
|||
deviceProfile.setDefaultQueueId(queueIdToAssign); |
|||
saveDeviceProfile(deviceProfile); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
toUpdate.forEach(key -> { |
|||
Queue queueToUpdate = new Queue(tenantId, newQueues.get(key)); |
|||
Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, key); |
|||
queueToUpdate.setId(foundQueue.getId()); |
|||
queueToUpdate.setCreatedTime(foundQueue.getCreatedTime()); |
|||
|
|||
if (queueToUpdate.equals(foundQueue)) { |
|||
//Queue not changed
|
|||
} else { |
|||
saveQueue(queueToUpdate); |
|||
} |
|||
}); |
|||
|
|||
toRemove.forEach(q -> { |
|||
Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, q); |
|||
QueueId queueIdForRemove = queue.getId(); |
|||
if (deviceProfileQueues.containsKey(queueIdForRemove)) { |
|||
Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, q); |
|||
if (foundQueue == null || queue.equals(foundQueue)) { |
|||
foundQueue = queueService.findQueueByTenantIdAndName(tenantId, MAIN); |
|||
} |
|||
QueueId newQueueId = foundQueue.getId(); |
|||
deviceProfileQueues.get(queueIdForRemove).stream() |
|||
.peek(dp -> dp.setDefaultQueueId(newQueueId)) |
|||
.forEach(this::saveDeviceProfile); |
|||
} |
|||
deleteQueue(tenantId, queueIdForRemove); |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
//TODO: remove after implementing TbDeviceProfileService
|
|||
private void saveDeviceProfile(DeviceProfile deviceProfile) { |
|||
DeviceProfile savedDeviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); |
|||
tbClusterService.onDeviceProfileChange(savedDeviceProfile, null); |
|||
tbClusterService.broadcastEntityStateChangeEvent(deviceProfile.getTenantId(), savedDeviceProfile.getId(), ComponentLifecycleEvent.UPDATED); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,34 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.entitiy.queue; |
|||
|
|||
import org.thingsboard.server.common.data.TenantProfile; |
|||
import org.thingsboard.server.common.data.id.QueueId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.queue.Queue; |
|||
|
|||
import java.util.List; |
|||
|
|||
public interface TbQueueService { |
|||
|
|||
Queue saveQueue(Queue queue); |
|||
|
|||
void deleteQueue(TenantId tenantId, QueueId queueId); |
|||
|
|||
void deleteQueueByQueueName(TenantId tenantId, String queueName); |
|||
|
|||
void updateQueuesByTenants(List<TenantId> tenantIds, TenantProfile newTenantProfile, TenantProfile oldTenantProfile); |
|||
} |
|||
@ -0,0 +1,50 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.entitiy.tenant_profile; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.TenantProfile; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.dao.tenant.TenantProfileService; |
|||
import org.thingsboard.server.dao.tenant.TenantService; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
import org.thingsboard.server.service.entitiy.queue.TbQueueService; |
|||
|
|||
import java.util.List; |
|||
|
|||
@Slf4j |
|||
@Service |
|||
@TbCoreComponent |
|||
@AllArgsConstructor |
|||
public class DefaultTbTenantProfileService implements TbTenantProfileService { |
|||
private final TbQueueService tbQueueService; |
|||
private final TenantProfileService tenantProfileService; |
|||
private final TenantService tenantService; |
|||
|
|||
@Override |
|||
public TenantProfile saveTenantProfile(TenantId tenantId, TenantProfile tenantProfile, TenantProfile oldTenantProfile) { |
|||
TenantProfile savedTenantProfile = tenantProfileService.saveTenantProfile(tenantId, tenantProfile); |
|||
|
|||
if (oldTenantProfile != null && savedTenantProfile.isIsolatedTbRuleEngine()) { |
|||
List<TenantId> tenantIds = tenantService.findTenantIdsByTenantProfileId(savedTenantProfile.getId()); |
|||
tbQueueService.updateQueuesByTenants(tenantIds, savedTenantProfile, oldTenantProfile); |
|||
} |
|||
|
|||
return savedTenantProfile; |
|||
} |
|||
} |
|||
@ -0,0 +1,23 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.entitiy.tenant_profile; |
|||
|
|||
import org.thingsboard.server.common.data.TenantProfile; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
|
|||
public interface TbTenantProfileService { |
|||
TenantProfile saveTenantProfile(TenantId tenantId, TenantProfile tenantProfile, TenantProfile oldTenantProfile); |
|||
} |
|||
@ -0,0 +1,48 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install; |
|||
|
|||
import lombok.Data; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; |
|||
import org.springframework.boot.context.properties.ConfigurationProperties; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.context.annotation.Profile; |
|||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
import java.util.List; |
|||
|
|||
@Slf4j |
|||
@Data |
|||
@EnableAutoConfiguration |
|||
@Configuration |
|||
@ConfigurationProperties(prefix = "queue.rule-engine") |
|||
@Profile("install") |
|||
public class TbRuleEngineQueueConfigService { |
|||
|
|||
private String topic; |
|||
private List<TbRuleEngineQueueConfiguration> queues; |
|||
|
|||
@PostConstruct |
|||
public void validate() { |
|||
queues.stream().filter(queue -> queue.getName().equals("Main")).findFirst().orElseThrow(() -> { |
|||
log.error("Main queue is not configured in thingsboard.yml"); |
|||
return new RuntimeException("No \"Main\" queue configured!"); |
|||
}); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,44 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.queue; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.dao.queue.QueueService; |
|||
import org.thingsboard.server.queue.discovery.QueueRoutingInfo; |
|||
import org.thingsboard.server.queue.discovery.QueueRoutingInfoService; |
|||
|
|||
import java.util.List; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Slf4j |
|||
@Service |
|||
@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine'") |
|||
public class DefaultQueueRoutingInfoService implements QueueRoutingInfoService { |
|||
|
|||
private final QueueService queueService; |
|||
|
|||
public DefaultQueueRoutingInfoService(QueueService queueService) { |
|||
this.queueService = queueService; |
|||
} |
|||
|
|||
@Override |
|||
public List<QueueRoutingInfo> getAllQueuesRoutingInfo() { |
|||
return queueService.findAllQueues().stream().map(QueueRoutingInfo::new).collect(Collectors.toList()); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,30 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.sync.vc; |
|||
|
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
|
|||
public class ClearRepositoryGitRequest extends VoidGitRequest { |
|||
|
|||
public ClearRepositoryGitRequest(TenantId tenantId) { |
|||
super(tenantId); |
|||
} |
|||
|
|||
public boolean requiresSettings() { |
|||
return false; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,37 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.sync.vc; |
|||
|
|||
import lombok.Getter; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult; |
|||
import org.thingsboard.server.common.data.sync.vc.request.create.VersionCreateRequest; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
public class CommitGitRequest extends PendingGitRequest<VersionCreationResult> { |
|||
|
|||
@Getter |
|||
private final UUID txId; |
|||
private final VersionCreateRequest request; |
|||
|
|||
public CommitGitRequest(TenantId tenantId, VersionCreateRequest request) { |
|||
super(tenantId); |
|||
this.txId = UUID.randomUUID(); |
|||
this.request = request; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,424 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.sync.vc; |
|||
|
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import com.fasterxml.jackson.databind.SerializationFeature; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import com.google.common.util.concurrent.SettableFuture; |
|||
import com.google.protobuf.ByteString; |
|||
import lombok.SneakyThrows; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.context.annotation.Lazy; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.cluster.TbClusterService; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.ExportableEntity; |
|||
import org.thingsboard.server.common.data.StringUtils; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.EntityIdFactory; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.common.data.sync.ie.EntityExportData; |
|||
import org.thingsboard.server.common.data.sync.vc.EntitiesVersionControlSettings; |
|||
import org.thingsboard.server.common.data.sync.vc.EntityVersion; |
|||
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult; |
|||
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo; |
|||
import org.thingsboard.server.common.data.sync.vc.request.create.VersionCreateRequest; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.CommitRequestMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.EntitiesContentRequestMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.EntityContentRequestMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.GenericRepositoryRequestMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ListEntitiesRequestMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ListVersionsRequestMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.PrepareMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.VersionControlResponseMsg; |
|||
import org.thingsboard.server.queue.TbQueueCallback; |
|||
import org.thingsboard.server.queue.TbQueueMsgMetadata; |
|||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|||
import org.thingsboard.server.queue.util.DataDecodingEncodingService; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.HashMap; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.UUID; |
|||
import java.util.function.Function; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@TbCoreComponent |
|||
@Service |
|||
@Slf4j |
|||
public class DefaultGitVersionControlQueueService implements GitVersionControlQueueService { |
|||
|
|||
private final TbServiceInfoProvider serviceInfoProvider; |
|||
private final TbClusterService clusterService; |
|||
private final DataDecodingEncodingService encodingService; |
|||
private final DefaultEntitiesVersionControlService entitiesVersionControlService; |
|||
|
|||
private final Map<UUID, PendingGitRequest<?>> pendingRequestMap = new HashMap<>(); |
|||
private final ObjectMapper jsonMapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT); |
|||
|
|||
public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService, |
|||
DataDecodingEncodingService encodingService, |
|||
@Lazy DefaultEntitiesVersionControlService entitiesVersionControlService) { |
|||
this.serviceInfoProvider = serviceInfoProvider; |
|||
this.clusterService = clusterService; |
|||
this.encodingService = encodingService; |
|||
this.entitiesVersionControlService = entitiesVersionControlService; |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<CommitGitRequest> prepareCommit(TenantId tenantId, VersionCreateRequest request) { |
|||
SettableFuture<CommitGitRequest> future = SettableFuture.create(); |
|||
|
|||
CommitGitRequest commit = new CommitGitRequest(tenantId, request); |
|||
registerAndSend(commit, builder -> builder.setCommitRequest( |
|||
buildCommitRequest(commit).setPrepareMsg(getCommitPrepareMsg(request)).build() |
|||
).build(), wrap(future, commit)); |
|||
return future; |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Void> addToCommit(CommitGitRequest commit, EntityExportData<ExportableEntity<EntityId>> entityData) { |
|||
SettableFuture<Void> future = SettableFuture.create(); |
|||
|
|||
String path = getRelativePath(entityData.getEntityType(), entityData.getEntity().getId()); |
|||
String entityDataJson; |
|||
try { |
|||
entityDataJson = jsonMapper.writeValueAsString(entityData); |
|||
} catch (IOException e) { |
|||
//TODO: analyze and return meaningful exceptions that we can show to the client;
|
|||
throw new RuntimeException(e); |
|||
} |
|||
|
|||
registerAndSend(commit, builder -> builder.setCommitRequest( |
|||
buildCommitRequest(commit).setAddMsg( |
|||
TransportProtos.AddMsg.newBuilder() |
|||
.setRelativePath(path).setEntityDataJson(entityDataJson).build() |
|||
).build() |
|||
).build(), wrap(future, null)); |
|||
return future; |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Void> deleteAll(CommitGitRequest commit, EntityType entityType) { |
|||
SettableFuture<Void> future = SettableFuture.create(); |
|||
|
|||
String path = getRelativePath(entityType, null); |
|||
|
|||
registerAndSend(commit, builder -> builder.setCommitRequest( |
|||
buildCommitRequest(commit).setDeleteMsg( |
|||
TransportProtos.DeleteMsg.newBuilder().setRelativePath(path).build() |
|||
).build() |
|||
).build(), wrap(commit.getFuture(), null)); |
|||
|
|||
return future; |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<VersionCreationResult> push(CommitGitRequest commit) { |
|||
registerAndSend(commit, builder -> builder.setCommitRequest( |
|||
buildCommitRequest(commit).setPushMsg( |
|||
TransportProtos.PushMsg.newBuilder().build() |
|||
).build() |
|||
).build(), wrap(commit.getFuture())); |
|||
|
|||
return commit.getFuture(); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<PageData<EntityVersion>> listVersions(TenantId tenantId, String branch, PageLink pageLink) { |
|||
return listVersions(tenantId, ListVersionsRequestMsg.newBuilder() |
|||
.setBranchName(branch) |
|||
.setPageSize(pageLink.getPageSize()) |
|||
.setPage(pageLink.getPage()) |
|||
.build()); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<PageData<EntityVersion>> listVersions(TenantId tenantId, String branch, EntityType entityType, PageLink pageLink) { |
|||
return listVersions(tenantId, ListVersionsRequestMsg.newBuilder() |
|||
.setBranchName(branch).setEntityType(entityType.name()) |
|||
.setPageSize(pageLink.getPageSize()) |
|||
.setPage(pageLink.getPage()) |
|||
.build()); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<PageData<EntityVersion>> listVersions(TenantId tenantId, String branch, EntityId entityId, PageLink pageLink) { |
|||
return listVersions(tenantId, ListVersionsRequestMsg.newBuilder() |
|||
.setBranchName(branch) |
|||
.setEntityType(entityId.getEntityType().name()) |
|||
.setEntityIdMSB(entityId.getId().getMostSignificantBits()) |
|||
.setEntityIdLSB(entityId.getId().getLeastSignificantBits()) |
|||
.setPageSize(pageLink.getPageSize()) |
|||
.setPage(pageLink.getPage()) |
|||
.build()); |
|||
} |
|||
|
|||
private ListenableFuture<PageData<EntityVersion>> listVersions(TenantId tenantId, ListVersionsRequestMsg requestMsg) { |
|||
ListVersionsGitRequest request = new ListVersionsGitRequest(tenantId); |
|||
|
|||
registerAndSend(request, builder -> builder.setListVersionRequest(requestMsg).build(), wrap(request.getFuture())); |
|||
|
|||
return request.getFuture(); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<List<VersionedEntityInfo>> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, EntityType entityType) { |
|||
return listEntitiesAtVersion(tenantId, ListEntitiesRequestMsg.newBuilder() |
|||
.setBranchName(branch) |
|||
.setVersionId(versionId) |
|||
.setEntityType(entityType.name()) |
|||
.build()); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<List<VersionedEntityInfo>> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId) { |
|||
return listEntitiesAtVersion(tenantId, ListEntitiesRequestMsg.newBuilder() |
|||
.setBranchName(branch) |
|||
.setVersionId(versionId) |
|||
.build()); |
|||
} |
|||
|
|||
private ListenableFuture<List<VersionedEntityInfo>> listEntitiesAtVersion(TenantId tenantId, TransportProtos.ListEntitiesRequestMsg requestMsg) { |
|||
ListEntitiesGitRequest request = new ListEntitiesGitRequest(tenantId); |
|||
|
|||
registerAndSend(request, builder -> builder.setListEntitiesRequest(requestMsg).build(), wrap(request.getFuture())); |
|||
|
|||
return request.getFuture(); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<List<String>> listBranches(TenantId tenantId) { |
|||
ListBranchesGitRequest request = new ListBranchesGitRequest(tenantId); |
|||
|
|||
registerAndSend(request, builder -> builder.setListBranchesRequest(TransportProtos.ListBranchesRequestMsg.newBuilder().build()).build(), wrap(request.getFuture())); |
|||
|
|||
return request.getFuture(); |
|||
} |
|||
|
|||
@Override |
|||
@SuppressWarnings("rawtypes") |
|||
public ListenableFuture<EntityExportData> getEntity(TenantId tenantId, String versionId, EntityId entityId) { |
|||
EntityContentGitRequest request = new EntityContentGitRequest(tenantId, versionId, entityId); |
|||
registerAndSend(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder() |
|||
.setVersionId(versionId) |
|||
.setEntityType(entityId.getEntityType().name()) |
|||
.setEntityIdMSB(entityId.getId().getMostSignificantBits()) |
|||
.setEntityIdLSB(entityId.getId().getLeastSignificantBits())).build() |
|||
, wrap(request.getFuture())); |
|||
return request.getFuture(); |
|||
} |
|||
|
|||
private <T> void registerAndSend(PendingGitRequest<T> request, |
|||
Function<ToVersionControlServiceMsg.Builder, ToVersionControlServiceMsg> enrichFunction, TbQueueCallback callback) { |
|||
registerAndSend(request, enrichFunction, null, callback); |
|||
} |
|||
|
|||
private <T> void registerAndSend(PendingGitRequest<T> request, |
|||
Function<ToVersionControlServiceMsg.Builder, ToVersionControlServiceMsg> enrichFunction, EntitiesVersionControlSettings settings, TbQueueCallback callback) { |
|||
if (!request.getFuture().isDone()) { |
|||
pendingRequestMap.putIfAbsent(request.getRequestId(), request); |
|||
var requestBody = enrichFunction.apply(newRequestProto(request, settings)); |
|||
log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody); |
|||
clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback); |
|||
} else { |
|||
throw new RuntimeException("Future is already done!"); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
@SuppressWarnings("rawtypes") |
|||
public ListenableFuture<List<EntityExportData>> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit) { |
|||
EntitiesContentGitRequest request = new EntitiesContentGitRequest(tenantId, versionId, entityType); |
|||
|
|||
registerAndSend(request, builder -> builder.setEntitiesContentRequest(EntitiesContentRequestMsg.newBuilder() |
|||
.setVersionId(versionId) |
|||
.setEntityType(entityType.name()) |
|||
.setOffset(offset) |
|||
.setLimit(limit) |
|||
).build() |
|||
, wrap(request.getFuture())); |
|||
|
|||
return request.getFuture(); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Void> initRepository(TenantId tenantId, EntitiesVersionControlSettings settings) { |
|||
VoidGitRequest request = new VoidGitRequest(tenantId); |
|||
|
|||
registerAndSend(request, builder -> builder.setInitRepositoryRequest(GenericRepositoryRequestMsg.newBuilder().build()).build() |
|||
, settings, wrap(request.getFuture())); |
|||
|
|||
return request.getFuture(); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Void> testRepository(TenantId tenantId, EntitiesVersionControlSettings settings) { |
|||
VoidGitRequest request = new VoidGitRequest(tenantId); |
|||
|
|||
registerAndSend(request, builder -> builder |
|||
.setTestRepositoryRequest(GenericRepositoryRequestMsg.newBuilder().build()).build() |
|||
, settings, wrap(request.getFuture())); |
|||
|
|||
return request.getFuture(); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Void> clearRepository(TenantId tenantId) { |
|||
ClearRepositoryGitRequest request = new ClearRepositoryGitRequest(tenantId); |
|||
|
|||
registerAndSend(request, builder -> builder.setClearRepositoryRequest(GenericRepositoryRequestMsg.newBuilder().build()).build() |
|||
, wrap(request.getFuture())); |
|||
|
|||
return request.getFuture(); |
|||
} |
|||
|
|||
@Override |
|||
public void processResponse(VersionControlResponseMsg vcResponseMsg) { |
|||
UUID requestId = new UUID(vcResponseMsg.getRequestIdMSB(), vcResponseMsg.getRequestIdLSB()); |
|||
PendingGitRequest<?> request = pendingRequestMap.get(requestId); |
|||
if (request == null) { |
|||
log.debug("[{}] received stale response: {}", requestId, vcResponseMsg); |
|||
return; |
|||
} else { |
|||
log.debug("[{}] processing response: {}", requestId, vcResponseMsg); |
|||
} |
|||
var future = request.getFuture(); |
|||
if (!StringUtils.isEmpty(vcResponseMsg.getError())) { |
|||
future.setException(new RuntimeException(vcResponseMsg.getError())); |
|||
} else { |
|||
if (vcResponseMsg.hasGenericResponse()) { |
|||
future.set(null); |
|||
} else if (vcResponseMsg.hasCommitResponse()) { |
|||
var commitResponse = vcResponseMsg.getCommitResponse(); |
|||
var commitResult = new VersionCreationResult(); |
|||
commitResult.setVersion(new EntityVersion(commitResponse.getCommitId(), commitResponse.getName())); |
|||
commitResult.setAdded(commitResponse.getAdded()); |
|||
commitResult.setRemoved(commitResponse.getRemoved()); |
|||
commitResult.setModified(commitResponse.getModified()); |
|||
((CommitGitRequest) request).getFuture().set(commitResult); |
|||
} else if (vcResponseMsg.hasListBranchesResponse()) { |
|||
var listBranchesResponse = vcResponseMsg.getListBranchesResponse(); |
|||
((ListBranchesGitRequest) request).getFuture().set(listBranchesResponse.getBranchesList()); |
|||
} else if (vcResponseMsg.hasListEntitiesResponse()) { |
|||
var listEntitiesResponse = vcResponseMsg.getListEntitiesResponse(); |
|||
((ListEntitiesGitRequest) request).getFuture().set( |
|||
listEntitiesResponse.getEntitiesList().stream().map(this::getVersionedEntityInfo).collect(Collectors.toList())); |
|||
} else if (vcResponseMsg.hasListVersionsResponse()) { |
|||
var listVersionsResponse = vcResponseMsg.getListVersionsResponse(); |
|||
((ListVersionsGitRequest) request).getFuture().set(toPageData(listVersionsResponse)); |
|||
} else if (vcResponseMsg.hasEntityContentResponse()) { |
|||
var data = vcResponseMsg.getEntityContentResponse().getData(); |
|||
((EntityContentGitRequest) request).getFuture().set(toData(data)); |
|||
} else if (vcResponseMsg.hasEntitiesContentResponse()) { |
|||
var dataList = vcResponseMsg.getEntitiesContentResponse().getDataList(); |
|||
((EntitiesContentGitRequest) request).getFuture() |
|||
.set(dataList.stream().map(this::toData).collect(Collectors.toList())); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private PageData<EntityVersion> toPageData(TransportProtos.ListVersionsResponseMsg listVersionsResponse) { |
|||
var listVersions = listVersionsResponse.getVersionsList().stream().map(this::getEntityVersion).collect(Collectors.toList()); |
|||
return new PageData<>(listVersions, listVersionsResponse.getTotalPages(), listVersionsResponse.getTotalElements(), listVersionsResponse.getHasNext()); |
|||
} |
|||
|
|||
private EntityVersion getEntityVersion(TransportProtos.EntityVersionProto proto) { |
|||
return new EntityVersion(proto.getId(), proto.getName()); |
|||
} |
|||
|
|||
private VersionedEntityInfo getVersionedEntityInfo(TransportProtos.VersionedEntityInfoProto proto) { |
|||
return new VersionedEntityInfo(EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()))); |
|||
} |
|||
|
|||
@SuppressWarnings("rawtypes") |
|||
@SneakyThrows |
|||
private EntityExportData toData(String data) { |
|||
return jsonMapper.readValue(data, EntityExportData.class); |
|||
} |
|||
|
|||
private static <T> TbQueueCallback wrap(SettableFuture<T> future) { |
|||
return new TbQueueCallback() { |
|||
@Override |
|||
public void onSuccess(TbQueueMsgMetadata metadata) { |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
future.setException(t); |
|||
} |
|||
}; |
|||
} |
|||
|
|||
private static <T> TbQueueCallback wrap(SettableFuture<T> future, T value) { |
|||
return new TbQueueCallback() { |
|||
@Override |
|||
public void onSuccess(TbQueueMsgMetadata metadata) { |
|||
future.set(value); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
future.setException(t); |
|||
} |
|||
}; |
|||
} |
|||
|
|||
private static String getRelativePath(EntityType entityType, EntityId entityId) { |
|||
String path = entityType.name().toLowerCase(); |
|||
if (entityId != null) { |
|||
path += "/" + entityId + ".json"; |
|||
} |
|||
return path; |
|||
} |
|||
|
|||
private static PrepareMsg getCommitPrepareMsg(VersionCreateRequest request) { |
|||
return PrepareMsg.newBuilder().setCommitMsg(request.getVersionName()).setBranchName(request.getBranch()).build(); |
|||
} |
|||
|
|||
private ToVersionControlServiceMsg.Builder newRequestProto(PendingGitRequest<?> request, EntitiesVersionControlSettings settings) { |
|||
var tenantId = request.getTenantId(); |
|||
var requestId = request.getRequestId(); |
|||
var builder = ToVersionControlServiceMsg.newBuilder() |
|||
.setNodeId(serviceInfoProvider.getServiceId()) |
|||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setRequestIdMSB(requestId.getMostSignificantBits()) |
|||
.setRequestIdLSB(requestId.getLeastSignificantBits()); |
|||
EntitiesVersionControlSettings vcSettings = settings; |
|||
if (vcSettings == null && request.requiresSettings()) { |
|||
vcSettings = entitiesVersionControlService.getVersionControlSettings(tenantId); |
|||
} |
|||
if (vcSettings != null) { |
|||
builder.setVcSettings(ByteString.copyFrom(encodingService.encode(vcSettings))); |
|||
} else { |
|||
throw new RuntimeException("No entity version control settings provisioned!"); |
|||
} |
|||
return builder; |
|||
} |
|||
|
|||
private CommitRequestMsg.Builder buildCommitRequest(CommitGitRequest commit) { |
|||
return CommitRequestMsg.newBuilder().setTxId(commit.getTxId().toString()); |
|||
} |
|||
} |
|||
|
|||
@ -0,0 +1,37 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.sync.vc; |
|||
|
|||
import lombok.Getter; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.sync.ie.EntityExportData; |
|||
|
|||
import java.util.List; |
|||
|
|||
@Getter |
|||
public class EntitiesContentGitRequest extends PendingGitRequest<List<EntityExportData>> { |
|||
|
|||
private final String versionId; |
|||
private final EntityType entityType; |
|||
|
|||
public EntitiesContentGitRequest(TenantId tenantId, String versionId, EntityType entityType) { |
|||
super(tenantId); |
|||
this.versionId = versionId; |
|||
this.entityType = entityType; |
|||
} |
|||
} |
|||
@ -0,0 +1,34 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.sync.vc; |
|||
|
|||
import lombok.Getter; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.sync.ie.EntityExportData; |
|||
|
|||
@Getter |
|||
public class EntityContentGitRequest extends PendingGitRequest<EntityExportData> { |
|||
|
|||
private final String versionId; |
|||
private final EntityId entityId; |
|||
|
|||
public EntityContentGitRequest(TenantId tenantId, String versionId, EntityId entityId) { |
|||
super(tenantId); |
|||
this.versionId = versionId; |
|||
this.entityId = entityId; |
|||
} |
|||
} |
|||
@ -0,0 +1,68 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.sync.vc; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.ExportableEntity; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.common.data.sync.ie.EntityExportData; |
|||
import org.thingsboard.server.common.data.sync.vc.EntitiesVersionControlSettings; |
|||
import org.thingsboard.server.common.data.sync.vc.EntityVersion; |
|||
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult; |
|||
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo; |
|||
import org.thingsboard.server.common.data.sync.vc.request.create.VersionCreateRequest; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.VersionControlResponseMsg; |
|||
|
|||
import java.util.List; |
|||
|
|||
public interface GitVersionControlQueueService { |
|||
|
|||
ListenableFuture<CommitGitRequest> prepareCommit(TenantId tenantId, VersionCreateRequest request); |
|||
|
|||
ListenableFuture<Void> addToCommit(CommitGitRequest commit, EntityExportData<ExportableEntity<EntityId>> entityData); |
|||
|
|||
ListenableFuture<Void> deleteAll(CommitGitRequest pendingCommit, EntityType entityType); |
|||
|
|||
ListenableFuture<VersionCreationResult> push(CommitGitRequest commit); |
|||
|
|||
ListenableFuture<PageData<EntityVersion>> listVersions(TenantId tenantId, String branch, PageLink pageLink); |
|||
|
|||
ListenableFuture<PageData<EntityVersion>> listVersions(TenantId tenantId, String branch, EntityType entityType, PageLink pageLink); |
|||
|
|||
ListenableFuture<PageData<EntityVersion>> listVersions(TenantId tenantId, String branch, EntityId entityId, PageLink pageLink); |
|||
|
|||
ListenableFuture<List<VersionedEntityInfo>> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, EntityType entityType); |
|||
|
|||
ListenableFuture<List<VersionedEntityInfo>> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId); |
|||
|
|||
ListenableFuture<List<String>> listBranches(TenantId tenantId); |
|||
|
|||
ListenableFuture<EntityExportData> getEntity(TenantId tenantId, String versionId, EntityId entityId); |
|||
|
|||
ListenableFuture<List<EntityExportData>> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit); |
|||
|
|||
ListenableFuture<Void> initRepository(TenantId tenantId, EntitiesVersionControlSettings settings); |
|||
|
|||
ListenableFuture<Void> testRepository(TenantId tenantId, EntitiesVersionControlSettings settings); |
|||
|
|||
ListenableFuture<Void> clearRepository(TenantId tenantId); |
|||
|
|||
void processResponse(VersionControlResponseMsg vcResponseMsg); |
|||
} |
|||
@ -0,0 +1,29 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.sync.vc; |
|||
|
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo; |
|||
|
|||
import java.util.List; |
|||
|
|||
public class ListBranchesGitRequest extends PendingGitRequest<List<String>> { |
|||
|
|||
public ListBranchesGitRequest(TenantId tenantId) { |
|||
super(tenantId); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,30 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.sync.vc; |
|||
|
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.sync.vc.EntityVersion; |
|||
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo; |
|||
|
|||
import java.util.List; |
|||
|
|||
public class ListEntitiesGitRequest extends PendingGitRequest<List<VersionedEntityInfo>> { |
|||
|
|||
public ListEntitiesGitRequest(TenantId tenantId) { |
|||
super(tenantId); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,32 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.sync.vc; |
|||
|
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.sync.vc.EntityVersion; |
|||
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult; |
|||
import org.thingsboard.server.common.data.sync.vc.request.create.VersionCreateRequest; |
|||
|
|||
import java.util.List; |
|||
|
|||
public class ListVersionsGitRequest extends PendingGitRequest<PageData<EntityVersion>> { |
|||
|
|||
public ListVersionsGitRequest(TenantId tenantId) { |
|||
super(tenantId); |
|||
} |
|||
|
|||
} |
|||
@ -1,311 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.sync.vc; |
|||
|
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import com.fasterxml.jackson.databind.SerializationFeature; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.common.data.AdminSettings; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.ExportableEntity; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.kv.KvEntry; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.common.data.sync.ie.EntityExportData; |
|||
import org.thingsboard.server.common.data.sync.vc.EntitiesVersionControlSettings; |
|||
import org.thingsboard.server.common.data.sync.vc.EntityVersion; |
|||
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult; |
|||
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo; |
|||
import org.thingsboard.server.common.data.sync.vc.request.create.VersionCreateRequest; |
|||
import org.thingsboard.server.dao.DaoUtil; |
|||
import org.thingsboard.server.dao.settings.AdminSettingsService; |
|||
import org.thingsboard.server.dao.tenant.TenantDao; |
|||
import org.thingsboard.server.queue.util.AfterStartUp; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.ConcurrentModificationException; |
|||
import java.util.HashMap; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.concurrent.ConcurrentMap; |
|||
import java.util.concurrent.locks.Lock; |
|||
import java.util.concurrent.locks.ReentrantLock; |
|||
import java.util.function.Consumer; |
|||
import java.util.function.Function; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Slf4j |
|||
@RequiredArgsConstructor |
|||
@Service |
|||
@ConditionalOnProperty(prefix = "vc", value = "git.service", havingValue = "local", matchIfMissing = true) |
|||
public class LocalGitVersionControlService implements GitVersionControlService { |
|||
|
|||
private final GitRepositoryService gitRepositoryService; |
|||
private final TenantDao tenantDao; |
|||
private final AdminSettingsService adminSettingsService; |
|||
|
|||
private final ConcurrentMap<TenantId, Lock> tenantRepoLocks = new ConcurrentHashMap<>(); |
|||
private final Map<TenantId, PendingCommit> pendingCommitMap = new HashMap<>(); |
|||
|
|||
private final ObjectMapper jsonMapper = new ObjectMapper() |
|||
.enable(SerializationFeature.INDENT_OUTPUT); |
|||
|
|||
@AfterStartUp |
|||
public void init() { |
|||
DaoUtil.processInBatches(tenantDao::findTenantsIds, 100, tenantId -> { |
|||
EntitiesVersionControlSettings settings = getVersionControlSettings(tenantId); |
|||
if (settings != null) { |
|||
try { |
|||
gitRepositoryService.initRepository(tenantId, settings); |
|||
} catch (Exception e) { |
|||
log.warn("Failed to init repository for tenant {}", tenantId, e); |
|||
} |
|||
} |
|||
}); |
|||
} |
|||
|
|||
@Override |
|||
public void testRepository(TenantId tenantId, EntitiesVersionControlSettings settings) { |
|||
var lock = getRepoLock(tenantId); |
|||
lock.lock(); |
|||
try { |
|||
gitRepositoryService.testRepository(tenantId, settings); |
|||
} catch (Exception e) { |
|||
//TODO: analyze and return meaningful exceptions that we can show to the client;
|
|||
throw new RuntimeException(e); |
|||
} finally { |
|||
lock.unlock(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void initRepository(TenantId tenantId, EntitiesVersionControlSettings settings) { |
|||
var lock = getRepoLock(tenantId); |
|||
lock.lock(); |
|||
try { |
|||
gitRepositoryService.initRepository(tenantId, settings); |
|||
} catch (Exception e) { |
|||
//TODO: analyze and return meaningful exceptions that we can show to the client;
|
|||
throw new RuntimeException(e); |
|||
} finally { |
|||
lock.unlock(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void clearRepository(TenantId tenantId) { |
|||
var lock = getRepoLock(tenantId); |
|||
lock.lock(); |
|||
try { |
|||
gitRepositoryService.clearRepository(tenantId); |
|||
} catch (Exception e) { |
|||
//TODO: analyze and return meaningful exceptions that we can show to the client;
|
|||
throw new RuntimeException(e); |
|||
} finally { |
|||
lock.unlock(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public PendingCommit prepareCommit(TenantId tenantId, VersionCreateRequest request) { |
|||
var lock = getRepoLock(tenantId); |
|||
lock.lock(); |
|||
try { |
|||
var pendingCommit = new PendingCommit(tenantId, request); |
|||
PendingCommit old = pendingCommitMap.put(tenantId, pendingCommit); |
|||
if (old != null) { |
|||
gitRepositoryService.abort(old); |
|||
} |
|||
try { |
|||
gitRepositoryService.prepareCommit(pendingCommit); |
|||
} catch (Exception e) { |
|||
pendingCommitMap.remove(tenantId); |
|||
gitRepositoryService.cleanUp(pendingCommit); |
|||
throw e; |
|||
} |
|||
return pendingCommit; |
|||
} finally { |
|||
lock.unlock(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void deleteAll(PendingCommit commit, EntityType entityType) { |
|||
doInsideLock(commit, c -> { |
|||
try { |
|||
gitRepositoryService.deleteFolderContent(commit, getRelativePath(entityType, null)); |
|||
} catch (IOException e) { |
|||
//TODO: analyze and return meaningful exceptions that we can show to the client;
|
|||
throw new RuntimeException(e); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
@Override |
|||
public void addToCommit(PendingCommit commit, EntityExportData<ExportableEntity<EntityId>> entityData) { |
|||
doInsideLock(commit, c -> { |
|||
String entityDataJson; |
|||
try { |
|||
entityDataJson = jsonMapper.writeValueAsString(entityData); |
|||
gitRepositoryService.add(c, getRelativePath(entityData.getEntityType(), |
|||
entityData.getEntity().getId().toString()), entityDataJson); |
|||
} catch (IOException e) { |
|||
//TODO: analyze and return meaningful exceptions that we can show to the client;
|
|||
throw new RuntimeException(e); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
@Override |
|||
public VersionCreationResult push(PendingCommit commit) { |
|||
VersionCreationResult result = executeInsideLock(commit, gitRepositoryService::push); |
|||
pendingCommitMap.remove(commit.getTenantId()); |
|||
return result; |
|||
} |
|||
|
|||
@Override |
|||
public PageData<EntityVersion> listVersions(TenantId tenantId, String branch, PageLink pageLink) { |
|||
return listVersions(tenantId, branch, (String) null, pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<EntityVersion> listVersions(TenantId tenantId, String branch, EntityType entityType, PageLink pageLink) { |
|||
return listVersions(tenantId, branch, getRelativePath(entityType, null), pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<EntityVersion> listVersions(TenantId tenantId, String branch, EntityId entityId, PageLink pageLink) { |
|||
return listVersions(tenantId, branch, getRelativePath(entityId.getEntityType(), entityId.getId().toString()), pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public List<VersionedEntityInfo> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, EntityType entityType) { |
|||
try { |
|||
return gitRepositoryService.listEntitiesAtVersion(tenantId, branch, versionId, entityType != null ? getRelativePath(entityType, null) : null); |
|||
} catch (Exception e) { |
|||
//TODO: analyze and return meaningful exceptions that we can show to the client;
|
|||
throw new RuntimeException(e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public List<VersionedEntityInfo> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId) { |
|||
return listEntitiesAtVersion(tenantId, branch, versionId, null); |
|||
} |
|||
|
|||
@Override |
|||
public List<String> listBranches(TenantId tenantId) { |
|||
return gitRepositoryService.listBranches(tenantId); |
|||
} |
|||
|
|||
@Override |
|||
public List<EntityExportData<?>> getEntities(TenantId tenantId, String branch, String versionId, EntityType entityType, int offset, int limit) { |
|||
return listEntitiesAtVersion(tenantId, branch, versionId, entityType).stream() |
|||
.skip(offset).limit(limit) |
|||
.map(entityInfo -> getEntity(tenantId, versionId, entityInfo.getExternalId())) |
|||
.collect(Collectors.toList()); |
|||
} |
|||
|
|||
@Override |
|||
public EntityExportData<?> getEntity(TenantId tenantId, String versionId, EntityId entityId) { |
|||
try { |
|||
String entityDataJson = gitRepositoryService.getFileContentAtCommit(tenantId, |
|||
getRelativePath(entityId.getEntityType(), entityId.getId().toString()), versionId); |
|||
return jsonMapper.readValue(entityDataJson, EntityExportData.class); |
|||
} catch (Exception e) { |
|||
//TODO: analyze and return meaningful exceptions that we can show to the client;
|
|||
throw new RuntimeException(e); |
|||
} |
|||
} |
|||
|
|||
private EntitiesVersionControlSettings getVersionControlSettings(TenantId tenantId) { |
|||
AdminSettings adminSettings = adminSettingsService.findAdminSettingsByKey(tenantId, EntitiesVersionControlService.SETTINGS_KEY); |
|||
if (adminSettings != null) { |
|||
try { |
|||
return JacksonUtil.convertValue(adminSettings.getJsonValue(), EntitiesVersionControlSettings.class); |
|||
} catch (Exception e) { |
|||
throw new RuntimeException("Failed to load version control settings!", e); |
|||
} |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
private PageData<EntityVersion> listVersions(TenantId tenantId, String branch, String path, PageLink pageLink) { |
|||
try { |
|||
return gitRepositoryService.listVersions(tenantId, branch, path, pageLink); |
|||
} catch (Exception e) { |
|||
//TODO: analyze and return meaningful exceptions that we can show to the client;
|
|||
throw new RuntimeException(e); |
|||
} |
|||
} |
|||
|
|||
private void doInsideLock(PendingCommit commit, Consumer<PendingCommit> r) { |
|||
var lock = getRepoLock(commit.getTenantId()); |
|||
lock.lock(); |
|||
try { |
|||
checkCommit(commit); |
|||
r.accept(commit); |
|||
} catch (Exception e) { |
|||
pendingCommitMap.remove(commit.getTenantId()); |
|||
gitRepositoryService.cleanUp(commit); |
|||
throw e; |
|||
} finally { |
|||
lock.unlock(); |
|||
} |
|||
} |
|||
|
|||
private <T> T executeInsideLock(PendingCommit commit, Function<PendingCommit, T> c) { |
|||
var lock = getRepoLock(commit.getTenantId()); |
|||
lock.lock(); |
|||
try { |
|||
checkCommit(commit); |
|||
return c.apply(commit); |
|||
} catch (Exception e) { |
|||
pendingCommitMap.remove(commit.getTenantId()); |
|||
gitRepositoryService.cleanUp(commit); |
|||
throw e; |
|||
} finally { |
|||
lock.unlock(); |
|||
} |
|||
} |
|||
|
|||
private void checkCommit(PendingCommit commit) { |
|||
PendingCommit existing = pendingCommitMap.get(commit.getTenantId()); |
|||
if (existing == null || !existing.getTxId().equals(commit.getTxId())) { |
|||
throw new ConcurrentModificationException(); |
|||
} |
|||
} |
|||
|
|||
private String getRelativePath(EntityType entityType, String entityId) { |
|||
String path = entityType.name().toLowerCase(); |
|||
if (entityId != null) { |
|||
path += "/" + entityId + ".json"; |
|||
} |
|||
return path; |
|||
} |
|||
|
|||
private Lock getRepoLock(TenantId tenantId) { |
|||
return tenantRepoLocks.computeIfAbsent(tenantId, t -> new ReentrantLock()); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,42 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.sync.vc; |
|||
|
|||
import com.google.common.util.concurrent.SettableFuture; |
|||
import lombok.Getter; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
@Getter |
|||
public class PendingGitRequest<T> { |
|||
|
|||
private final long createdTime; |
|||
private final UUID requestId; |
|||
private final TenantId tenantId; |
|||
private final SettableFuture<T> future; |
|||
|
|||
public PendingGitRequest(TenantId tenantId) { |
|||
this.createdTime = System.currentTimeMillis(); |
|||
this.requestId = UUID.randomUUID(); |
|||
this.tenantId = tenantId; |
|||
this.future = SettableFuture.create(); |
|||
} |
|||
|
|||
public boolean requiresSettings() { |
|||
return true; |
|||
} |
|||
} |
|||
@ -0,0 +1,29 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.sync.vc; |
|||
|
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.sync.vc.EntityVersion; |
|||
|
|||
import java.util.List; |
|||
|
|||
public class VoidGitRequest extends PendingGitRequest<Void> { |
|||
|
|||
public VoidGitRequest(TenantId tenantId) { |
|||
super(tenantId); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,45 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.queue; |
|||
|
|||
import org.thingsboard.server.common.data.id.QueueId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.common.data.queue.Queue; |
|||
|
|||
import java.util.List; |
|||
|
|||
public interface QueueService { |
|||
|
|||
Queue saveQueue(Queue queue); |
|||
|
|||
void deleteQueue(TenantId tenantId, QueueId queueId); |
|||
|
|||
List<Queue> findQueuesByTenantId(TenantId tenantId); |
|||
|
|||
PageData<Queue> findQueuesByTenantId(TenantId tenantId, PageLink pageLink); |
|||
|
|||
List<Queue> findAllQueues(); |
|||
|
|||
Queue findQueueById(TenantId tenantId, QueueId queueId); |
|||
|
|||
Queue findQueueByTenantIdAndName(TenantId tenantId, String name); |
|||
|
|||
Queue findQueueByTenantIdAndNameInternal(TenantId tenantId, String queueName); |
|||
|
|||
void deleteQueuesByTenantId(TenantId tenantId); |
|||
} |
|||
@ -0,0 +1,41 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.id; |
|||
|
|||
import com.fasterxml.jackson.annotation.JsonCreator; |
|||
import com.fasterxml.jackson.annotation.JsonProperty; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
public class QueueId extends UUIDBased implements EntityId { |
|||
|
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
@JsonCreator |
|||
public QueueId(@JsonProperty("id") UUID id) { |
|||
super(id); |
|||
} |
|||
|
|||
public static QueueId fromString(String queueId) { |
|||
return new QueueId(UUID.fromString(queueId)); |
|||
} |
|||
|
|||
@Override |
|||
public EntityType getEntityType() { |
|||
return EntityType.QUEUE; |
|||
} |
|||
} |
|||
@ -0,0 +1,27 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.queue; |
|||
|
|||
import lombok.Data; |
|||
|
|||
@Data |
|||
public class ProcessingStrategy { |
|||
private ProcessingStrategyType type; |
|||
private int retries; |
|||
private double failurePercentage; |
|||
private long pauseBetweenRetries; |
|||
private long maxPauseBetweenRetries; |
|||
} |
|||
@ -0,0 +1,36 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
|
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.queue; |
|||
|
|||
public enum ProcessingStrategyType { |
|||
SKIP_ALL_FAILURES, SKIP_ALL_FAILURES_AND_TIMED_OUT, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT |
|||
} |
|||
@ -0,0 +1,61 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.queue; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.HasName; |
|||
import org.thingsboard.server.common.data.HasTenantId; |
|||
import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; |
|||
import org.thingsboard.server.common.data.id.QueueId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; |
|||
|
|||
@Data |
|||
public class Queue extends SearchTextBasedWithAdditionalInfo<QueueId> implements HasName, HasTenantId { |
|||
private TenantId tenantId; |
|||
private String name; |
|||
private String topic; |
|||
private int pollInterval; |
|||
private int partitions; |
|||
private boolean consumerPerPartition; |
|||
private long packProcessingTimeout; |
|||
private SubmitStrategy submitStrategy; |
|||
private ProcessingStrategy processingStrategy; |
|||
|
|||
public Queue() { |
|||
} |
|||
|
|||
public Queue(QueueId id) { |
|||
super(id); |
|||
} |
|||
|
|||
public Queue(TenantId tenantId, TenantProfileQueueConfiguration queueConfiguration) { |
|||
this.tenantId = tenantId; |
|||
this.name = queueConfiguration.getName(); |
|||
this.topic = queueConfiguration.getTopic(); |
|||
this.pollInterval = queueConfiguration.getPollInterval(); |
|||
this.partitions = queueConfiguration.getPartitions(); |
|||
this.consumerPerPartition = queueConfiguration.isConsumerPerPartition(); |
|||
this.packProcessingTimeout = queueConfiguration.getPackProcessingTimeout(); |
|||
this.submitStrategy = queueConfiguration.getSubmitStrategy(); |
|||
this.processingStrategy = queueConfiguration.getProcessingStrategy(); |
|||
} |
|||
|
|||
@Override |
|||
public String getSearchText() { |
|||
return getName(); |
|||
} |
|||
} |
|||
@ -0,0 +1,24 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.queue; |
|||
|
|||
import lombok.Data; |
|||
|
|||
@Data |
|||
public class SubmitStrategy { |
|||
private SubmitStrategyType type; |
|||
private int batchSize; |
|||
} |
|||
@ -0,0 +1,20 @@ |
|||
/** |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.queue; |
|||
|
|||
public enum SubmitStrategyType { |
|||
BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL |
|||
} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue