261 changed files with 5049 additions and 2144 deletions
@ -0,0 +1,37 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.RequiredArgsConstructor; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
|
|||
import java.util.function.Predicate; |
|||
|
|||
@RequiredArgsConstructor |
|||
public class TbEntityTypeActorIdPredicate implements Predicate<TbActorId> { |
|||
|
|||
private final EntityType entityType; |
|||
|
|||
@Override |
|||
public boolean test(TbActorId actorId) { |
|||
return actorId instanceof TbEntityActorId && testEntityId(((TbEntityActorId) actorId).getEntityId()); |
|||
} |
|||
|
|||
protected boolean testEntityId(EntityId entityId) { |
|||
return entityId.getEntityType().equals(entityType); |
|||
} |
|||
} |
|||
@ -0,0 +1,323 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.alarm.Alarm; |
|||
import org.thingsboard.server.common.data.audit.ActionType; |
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.common.data.edge.EdgeEvent; |
|||
import org.thingsboard.server.common.data.edge.EdgeEventType; |
|||
import org.thingsboard.server.common.data.id.AlarmId; |
|||
import org.thingsboard.server.common.data.id.DashboardId; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.EntityIdFactory; |
|||
import org.thingsboard.server.common.data.id.IdBased; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.TextPageData; |
|||
import org.thingsboard.server.common.data.page.TextPageLink; |
|||
import org.thingsboard.server.common.data.page.TimePageData; |
|||
import org.thingsboard.server.common.data.page.TimePageLink; |
|||
import org.thingsboard.server.common.data.relation.EntityRelation; |
|||
import org.thingsboard.server.common.data.relation.RelationTypeGroup; |
|||
import org.thingsboard.server.common.data.rule.RuleChainMetaData; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.dao.alarm.AlarmService; |
|||
import org.thingsboard.server.dao.edge.EdgeEventService; |
|||
import org.thingsboard.server.dao.edge.EdgeService; |
|||
import org.thingsboard.server.dao.relation.RelationService; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
import org.thingsboard.server.service.executors.DbCallbackExecutorService; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
import javax.annotation.PreDestroy; |
|||
import java.io.IOException; |
|||
import java.util.ArrayList; |
|||
import java.util.Collections; |
|||
import java.util.HashSet; |
|||
import java.util.List; |
|||
import java.util.Set; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Service |
|||
@TbCoreComponent |
|||
@Slf4j |
|||
public class DefaultEdgeNotificationService implements EdgeNotificationService { |
|||
|
|||
private static final ObjectMapper mapper = new ObjectMapper(); |
|||
|
|||
@Autowired |
|||
private EdgeService edgeService; |
|||
|
|||
@Autowired |
|||
private RuleChainService ruleChainService; |
|||
|
|||
@Autowired |
|||
private AlarmService alarmService; |
|||
|
|||
@Autowired |
|||
private RelationService relationService; |
|||
|
|||
@Autowired |
|||
private EdgeEventService edgeEventService; |
|||
|
|||
@Autowired |
|||
private DbCallbackExecutorService dbCallbackExecutorService; |
|||
|
|||
private ExecutorService tsCallBackExecutor; |
|||
|
|||
@PostConstruct |
|||
public void initExecutor() { |
|||
tsCallBackExecutor = Executors.newSingleThreadExecutor(); |
|||
} |
|||
|
|||
@PreDestroy |
|||
public void shutdownExecutor() { |
|||
if (tsCallBackExecutor != null) { |
|||
tsCallBackExecutor.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public TimePageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) { |
|||
return edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException { |
|||
edge.setRootRuleChainId(ruleChainId); |
|||
Edge savedEdge = edgeService.saveEdge(edge); |
|||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, ActionType.UPDATED, ruleChainId, null); |
|||
return savedEdge; |
|||
} |
|||
|
|||
private void saveEdgeEvent(TenantId tenantId, |
|||
EdgeId edgeId, |
|||
EdgeEventType edgeEventType, |
|||
ActionType edgeEventAction, |
|||
EntityId entityId, |
|||
JsonNode entityBody) { |
|||
log.debug("Pushing edge event to edge queue. tenantId [{}], edgeId [{}], edgeEventType [{}], edgeEventAction[{}], entityId [{}], entityBody [{}]", |
|||
tenantId, edgeId, edgeEventType, edgeEventAction, entityId, entityBody); |
|||
|
|||
EdgeEvent edgeEvent = new EdgeEvent(); |
|||
edgeEvent.setEdgeId(edgeId); |
|||
edgeEvent.setTenantId(tenantId); |
|||
edgeEvent.setEdgeEventType(edgeEventType); |
|||
edgeEvent.setEdgeEventAction(edgeEventAction.name()); |
|||
if (entityId != null) { |
|||
edgeEvent.setEntityId(entityId.getId()); |
|||
} |
|||
edgeEvent.setEntityBody(entityBody); |
|||
edgeEventService.saveAsync(edgeEvent); |
|||
} |
|||
|
|||
@Override |
|||
public void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback) { |
|||
try { |
|||
TenantId tenantId = new TenantId(new UUID(edgeNotificationMsg.getTenantIdMSB(), edgeNotificationMsg.getTenantIdLSB())); |
|||
EdgeEventType edgeEventType = EdgeEventType.valueOf(edgeNotificationMsg.getEdgeEventType()); |
|||
switch (edgeEventType) { |
|||
// TODO: voba - handle edge updates
|
|||
// case EDGE:
|
|||
case ASSET: |
|||
case DEVICE: |
|||
case ENTITY_VIEW: |
|||
case DASHBOARD: |
|||
case RULE_CHAIN: |
|||
processEntities(tenantId, edgeNotificationMsg); |
|||
break; |
|||
case ALARM: |
|||
processAlarm(tenantId, edgeNotificationMsg); |
|||
break; |
|||
case RELATION: |
|||
processRelation(tenantId, edgeNotificationMsg); |
|||
break; |
|||
default: |
|||
log.debug("Edge event type [{}] is not designed to be pushed to edge", edgeEventType); |
|||
} |
|||
} catch (Exception e) { |
|||
callback.onFailure(e); |
|||
log.error("Can't push to edge updates, edgeNotificationMsg [{}]", edgeNotificationMsg, e); |
|||
} finally { |
|||
callback.onSuccess(); |
|||
} |
|||
} |
|||
|
|||
private void processEntities(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { |
|||
ActionType edgeEventActionType = ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction()); |
|||
EdgeEventType edgeEventType = EdgeEventType.valueOf(edgeNotificationMsg.getEdgeEventType()); |
|||
EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(edgeEventType, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); |
|||
switch (edgeEventActionType) { |
|||
// TODO: voba - ADDED is not required for CE version ?
|
|||
// case ADDED:
|
|||
case UPDATED: |
|||
ListenableFuture<List<EdgeId>> edgeIdsFuture = findRelatedEdgeIdsByEntityId(tenantId, entityId); |
|||
Futures.transform(edgeIdsFuture, edgeIds -> { |
|||
if (edgeIds != null && !edgeIds.isEmpty()) { |
|||
for (EdgeId edgeId : edgeIds) { |
|||
try { |
|||
saveEdgeEvent(tenantId, edgeId, edgeEventType, edgeEventActionType, entityId, null); |
|||
if (edgeEventType.equals(EdgeEventType.RULE_CHAIN)) { |
|||
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, new RuleChainId(entityId.getId())); |
|||
saveEdgeEvent(tenantId, edgeId, EdgeEventType.RULE_CHAIN_METADATA, edgeEventActionType, ruleChainMetaData.getRuleChainId(), null); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("[{}] Failed to push event to edge, edgeId [{}], edgeEventType [{}], edgeEventActionType [{}], entityId [{}]", |
|||
tenantId, edgeId, edgeEventType, edgeEventActionType, entityId, e); |
|||
} |
|||
} |
|||
} |
|||
return null; |
|||
}, dbCallbackExecutorService); |
|||
break; |
|||
case DELETED: |
|||
TextPageData<Edge> edgesByTenantId = edgeService.findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE)); |
|||
if (edgesByTenantId != null && edgesByTenantId.getData() != null && !edgesByTenantId.getData().isEmpty()) { |
|||
for (Edge edge : edgesByTenantId.getData()) { |
|||
saveEdgeEvent(tenantId, edge.getId(), edgeEventType, edgeEventActionType, entityId, null); |
|||
} |
|||
} |
|||
break; |
|||
case ASSIGNED_TO_EDGE: |
|||
case UNASSIGNED_FROM_EDGE: |
|||
EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); |
|||
saveEdgeEvent(tenantId, edgeId, edgeEventType, edgeEventActionType, entityId, null); |
|||
break; |
|||
case RELATIONS_DELETED: |
|||
// TODO: voba - add support for relations deleted
|
|||
break; |
|||
} |
|||
} |
|||
|
|||
private void processAlarm(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { |
|||
AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); |
|||
ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); |
|||
Futures.transform(alarmFuture, alarm -> { |
|||
if (alarm != null) { |
|||
EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType()); |
|||
if (edgeEventType != null) { |
|||
ListenableFuture<List<EdgeId>> relatedEdgeIdsByEntityIdFuture = findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator()); |
|||
Futures.transform(relatedEdgeIdsByEntityIdFuture, relatedEdgeIdsByEntityId -> { |
|||
if (relatedEdgeIdsByEntityId != null) { |
|||
for (EdgeId edgeId : relatedEdgeIdsByEntityId) { |
|||
saveEdgeEvent(tenantId, |
|||
edgeId, |
|||
EdgeEventType.ALARM, |
|||
ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction()), |
|||
alarmId, |
|||
null); |
|||
} |
|||
} |
|||
return null; |
|||
}, dbCallbackExecutorService); |
|||
} |
|||
} |
|||
return null; |
|||
}, dbCallbackExecutorService); |
|||
} |
|||
|
|||
private void processRelation(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { |
|||
EntityRelation entityRelation = mapper.convertValue(edgeNotificationMsg.getEntityBody(), EntityRelation.class); |
|||
List<ListenableFuture<List<EdgeId>>> futures = new ArrayList<>(); |
|||
futures.add(findRelatedEdgeIdsByEntityId(tenantId, entityRelation.getTo())); |
|||
futures.add(findRelatedEdgeIdsByEntityId(tenantId, entityRelation.getFrom())); |
|||
ListenableFuture<List<List<EdgeId>>> combinedFuture = Futures.allAsList(futures); |
|||
Futures.transform(combinedFuture, listOfListsEdgeIds -> { |
|||
Set<EdgeId> uniqueEdgeIds = new HashSet<>(); |
|||
if (listOfListsEdgeIds != null && !listOfListsEdgeIds.isEmpty()) { |
|||
for (List<EdgeId> listOfListsEdgeId : listOfListsEdgeIds) { |
|||
if (listOfListsEdgeId != null) { |
|||
uniqueEdgeIds.addAll(listOfListsEdgeId); |
|||
} |
|||
} |
|||
} |
|||
if (!uniqueEdgeIds.isEmpty()) { |
|||
for (EdgeId edgeId : uniqueEdgeIds) { |
|||
saveEdgeEvent(tenantId, |
|||
edgeId, |
|||
EdgeEventType.RELATION, |
|||
ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction()), |
|||
null, |
|||
mapper.valueToTree(entityRelation)); |
|||
} |
|||
} |
|||
return null; |
|||
}, dbCallbackExecutorService); |
|||
} |
|||
|
|||
private ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) { |
|||
switch (entityId.getEntityType()) { |
|||
case DEVICE: |
|||
case ASSET: |
|||
case ENTITY_VIEW: |
|||
ListenableFuture<List<EntityRelation>> originatorEdgeRelationsFuture = relationService.findByToAndTypeAsync(tenantId, entityId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); |
|||
return Futures.transform(originatorEdgeRelationsFuture, originatorEdgeRelations -> { |
|||
if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0) { |
|||
return Collections.singletonList(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId())); |
|||
} else { |
|||
return Collections.emptyList(); |
|||
} |
|||
}, dbCallbackExecutorService); |
|||
case DASHBOARD: |
|||
return convertToEdgeIds(edgeService.findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId()))); |
|||
case RULE_CHAIN: |
|||
return convertToEdgeIds(edgeService.findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId()))); |
|||
default: |
|||
return Futures.immediateFuture(Collections.emptyList()); |
|||
} |
|||
} |
|||
|
|||
private ListenableFuture<List<EdgeId>> convertToEdgeIds(ListenableFuture<List<Edge>> future) { |
|||
return Futures.transform(future, edges -> { |
|||
if (edges != null && !edges.isEmpty()) { |
|||
return edges.stream().map(IdBased::getId).collect(Collectors.toList()); |
|||
} else { |
|||
return Collections.emptyList(); |
|||
} |
|||
}, dbCallbackExecutorService); |
|||
} |
|||
|
|||
private EdgeEventType getEdgeQueueTypeByEntityType(EntityType entityType) { |
|||
switch (entityType) { |
|||
case DEVICE: |
|||
return EdgeEventType.DEVICE; |
|||
case ASSET: |
|||
return EdgeEventType.ASSET; |
|||
case ENTITY_VIEW: |
|||
return EdgeEventType.ENTITY_VIEW; |
|||
default: |
|||
log.info("Unsupported entity type: [{}]", entityType); |
|||
return null; |
|||
} |
|||
} |
|||
} |
|||
|
|||
|
|||
@ -0,0 +1,38 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge; |
|||
|
|||
import org.thingsboard.server.common.data.Event; |
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.common.data.edge.EdgeEvent; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.TimePageData; |
|||
import org.thingsboard.server.common.data.page.TimePageLink; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
|
|||
import java.io.IOException; |
|||
|
|||
public interface EdgeNotificationService { |
|||
|
|||
TimePageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink); |
|||
|
|||
Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException; |
|||
|
|||
void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback); |
|||
} |
|||
@ -0,0 +1,56 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge.rpc.constructor; |
|||
|
|||
import com.google.gson.JsonElement; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.data.audit.ActionType; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.transport.adaptor.JsonConverter; |
|||
import org.thingsboard.server.gen.edge.EntityDataProto; |
|||
|
|||
@Component |
|||
@Slf4j |
|||
public class EntityDataMsgConstructor { |
|||
|
|||
public EntityDataProto constructEntityDataMsg(EntityId entityId, ActionType actionType, JsonElement entityData) { |
|||
EntityDataProto.Builder builder = EntityDataProto.newBuilder() |
|||
.setEntityIdMSB(entityId.getId().getMostSignificantBits()) |
|||
.setEntityIdLSB(entityId.getId().getLeastSignificantBits()) |
|||
.setEntityType(entityId.getEntityType().name()); |
|||
switch (actionType) { |
|||
case TIMESERIES_UPDATED: |
|||
try { |
|||
builder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(entityData)); |
|||
} catch (Exception e) { |
|||
log.warn("Can't convert to telemetry proto, entityData [{}]", entityData, e); |
|||
} |
|||
break; |
|||
case ATTRIBUTES_UPDATED: |
|||
try { |
|||
builder.setPostAttributesMsg(JsonConverter.convertToAttributesProto(entityData)); |
|||
} catch (Exception e) { |
|||
log.warn("Can't convert to attributes proto, entityData [{}]", entityData, e); |
|||
} |
|||
break; |
|||
// TODO: voba - add support for attribute delete
|
|||
// case ATTRIBUTES_DELETED:
|
|||
} |
|||
return builder.build(); |
|||
} |
|||
|
|||
} |
|||
@ -1,139 +0,0 @@ |
|||
# |
|||
# Copyright © 2016-2020 The Thingsboard Authors |
|||
# |
|||
# Licensed under the Apache License, Version 2.0 (the "License"); |
|||
# you may not use this file except in compliance with the License. |
|||
# You may obtain a copy of the License at |
|||
# |
|||
# http://www.apache.org/licenses/LICENSE-2.0 |
|||
# |
|||
# Unless required by applicable law or agreed to in writing, software |
|||
# distributed under the License is distributed on an "AS IS" BASIS, |
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
# See the License for the specific language governing permissions and |
|||
# limitations under the License. |
|||
# |
|||
|
|||
|
|||
akka { |
|||
# JVM shutdown, System.exit(-1), in case of a fatal error, |
|||
# such as OutOfMemoryError |
|||
jvm-exit-on-fatal-error = off |
|||
loglevel = "INFO" |
|||
loggers = ["akka.event.slf4j.Slf4jLogger"] |
|||
} |
|||
|
|||
# This dispatcher is used for app |
|||
app-dispatcher { |
|||
type = Dispatcher |
|||
executor = "fork-join-executor" |
|||
fork-join-executor { |
|||
# Min number of threads to cap factor-based parallelism number to |
|||
parallelism-min = 1 |
|||
# Max number of threads to cap factor-based parallelism number to |
|||
parallelism-max = 1 |
|||
|
|||
# The parallelism factor is used to determine thread pool size using the |
|||
# following formula: ceil(available processors * factor). Resulting size |
|||
# is then bounded by the parallelism-min and parallelism-max values. |
|||
parallelism-factor = 1.0 |
|||
} |
|||
# How long time the dispatcher will wait for new actors until it shuts down |
|||
shutdown-timeout = 1s |
|||
|
|||
# Throughput defines the number of messages that are processed in a batch |
|||
# before the thread is returned to the pool. Set to 1 for as fair as possible. |
|||
throughput = 5 |
|||
} |
|||
|
|||
# This dispatcher is used for rpc actors |
|||
rpc-dispatcher { |
|||
type = Dispatcher |
|||
executor = "fork-join-executor" |
|||
fork-join-executor { |
|||
# Min number of threads to cap factor-based parallelism number to |
|||
parallelism-min = 2 |
|||
# Max number of threads to cap factor-based parallelism number to |
|||
parallelism-max = 8 |
|||
|
|||
# The parallelism factor is used to determine thread pool size using the |
|||
# following formula: ceil(available processors * factor). Resulting size |
|||
# is then bounded by the parallelism-min and parallelism-max values. |
|||
parallelism-factor = 0.5 |
|||
} |
|||
# How long time the dispatcher will wait for new actors until it shuts down |
|||
shutdown-timeout = 1s |
|||
|
|||
# Throughput defines the number of messages that are processed in a batch |
|||
# before the thread is returned to the pool. Set to 1 for as fair as possible. |
|||
throughput = 5 |
|||
} |
|||
|
|||
# This dispatcher is used for auth |
|||
core-dispatcher { |
|||
type = Dispatcher |
|||
executor = "fork-join-executor" |
|||
fork-join-executor { |
|||
# Min number of threads to cap factor-based parallelism number to |
|||
parallelism-min = 2 |
|||
# Max number of threads to cap factor-based parallelism number to |
|||
parallelism-max = 12 |
|||
|
|||
# The parallelism factor is used to determine thread pool size using the |
|||
# following formula: ceil(available processors * factor). Resulting size |
|||
# is then bounded by the parallelism-min and parallelism-max values. |
|||
parallelism-factor = 0.25 |
|||
} |
|||
# How long time the dispatcher will wait for new actors until it shuts down |
|||
shutdown-timeout = 1s |
|||
|
|||
# Throughput defines the number of messages that are processed in a batch |
|||
# before the thread is returned to the pool. Set to 1 for as fair as possible. |
|||
throughput = 5 |
|||
} |
|||
|
|||
# This dispatcher is used for system rule chains and rule node actors |
|||
system-rule-dispatcher { |
|||
type = Dispatcher |
|||
executor = "fork-join-executor" |
|||
fork-join-executor { |
|||
# Min number of threads to cap factor-based parallelism number to |
|||
parallelism-min = 2 |
|||
# Max number of threads to cap factor-based parallelism number to |
|||
parallelism-max = 12 |
|||
|
|||
# The parallelism factor is used to determine thread pool size using the |
|||
# following formula: ceil(available processors * factor). Resulting size |
|||
# is then bounded by the parallelism-min and parallelism-max values. |
|||
parallelism-factor = 0.25 |
|||
} |
|||
# How long time the dispatcher will wait for new actors until it shuts down |
|||
shutdown-timeout = 1s |
|||
|
|||
# Throughput defines the number of messages that are processed in a batch |
|||
# before the thread is returned to the pool. Set to 1 for as fair as possible. |
|||
throughput = 5 |
|||
} |
|||
|
|||
# This dispatcher is used for tenant rule chains and rule node actors |
|||
rule-dispatcher { |
|||
type = Dispatcher |
|||
executor = "fork-join-executor" |
|||
fork-join-executor { |
|||
# Min number of threads to cap factor-based parallelism number to |
|||
parallelism-min = 2 |
|||
# Max number of threads to cap factor-based parallelism number to |
|||
parallelism-max = 12 |
|||
|
|||
# The parallelism factor is used to determine thread pool size using the |
|||
# following formula: ceil(available processors * factor). Resulting size |
|||
# is then bounded by the parallelism-min and parallelism-max values. |
|||
parallelism-factor = 0.25 |
|||
} |
|||
# How long time the dispatcher will wait for new actors until it shuts down |
|||
shutdown-timeout = 1s |
|||
|
|||
# Throughput defines the number of messages that are processed in a batch |
|||
# before the thread is returned to the pool. Set to 1 for as fair as possible. |
|||
throughput = 5 |
|||
} |
|||
@ -0,0 +1,75 @@ |
|||
<!-- |
|||
|
|||
Copyright © 2016-2020 The Thingsboard Authors |
|||
|
|||
Licensed under the Apache License, Version 2.0 (the "License"); |
|||
you may not use this file except in compliance with the License. |
|||
You may obtain a copy of the License at |
|||
|
|||
http://www.apache.org/licenses/LICENSE-2.0 |
|||
|
|||
Unless required by applicable law or agreed to in writing, software |
|||
distributed under the License is distributed on an "AS IS" BASIS, |
|||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
See the License for the specific language governing permissions and |
|||
limitations under the License. |
|||
|
|||
--> |
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
|||
<modelVersion>4.0.0</modelVersion> |
|||
<parent> |
|||
<groupId>org.thingsboard</groupId> |
|||
<version>2.5.3-SNAPSHOT</version> |
|||
<artifactId>common</artifactId> |
|||
</parent> |
|||
<groupId>org.thingsboard.common</groupId> |
|||
<artifactId>actor</artifactId> |
|||
<packaging>jar</packaging> |
|||
|
|||
<name>Thingsboard Actor system</name> |
|||
<url>https://thingsboard.io</url> |
|||
|
|||
<properties> |
|||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> |
|||
<main.dir>${basedir}/../..</main.dir> |
|||
</properties> |
|||
|
|||
<dependencies> |
|||
<dependency> |
|||
<groupId>org.thingsboard.common</groupId> |
|||
<artifactId>util</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.thingsboard.common</groupId> |
|||
<artifactId>message</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.slf4j</groupId> |
|||
<artifactId>slf4j-api</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.slf4j</groupId> |
|||
<artifactId>log4j-over-slf4j</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>ch.qos.logback</groupId> |
|||
<artifactId>logback-core</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>ch.qos.logback</groupId> |
|||
<artifactId>logback-classic</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>junit</groupId> |
|||
<artifactId>junit</artifactId> |
|||
<scope>test</scope> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.mockito</groupId> |
|||
<artifactId>mockito-all</artifactId> |
|||
<scope>test</scope> |
|||
</dependency> |
|||
</dependencies> |
|||
|
|||
</project> |
|||
@ -0,0 +1,34 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Getter; |
|||
|
|||
public abstract class AbstractTbActor implements TbActor { |
|||
|
|||
@Getter |
|||
protected TbActorCtx ctx; |
|||
|
|||
@Override |
|||
public void init(TbActorCtx ctx) throws TbActorException { |
|||
this.ctx = ctx; |
|||
} |
|||
|
|||
@Override |
|||
public TbActorRef getActorRef() { |
|||
return ctx; |
|||
} |
|||
} |
|||
@ -0,0 +1,215 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Data; |
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
import java.util.Set; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.concurrent.ConcurrentMap; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.locks.Lock; |
|||
import java.util.concurrent.locks.ReentrantLock; |
|||
import java.util.function.Predicate; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Slf4j |
|||
@Data |
|||
public class DefaultTbActorSystem implements TbActorSystem { |
|||
|
|||
private final ConcurrentMap<String, Dispatcher> dispatchers = new ConcurrentHashMap<>(); |
|||
private final ConcurrentMap<TbActorId, TbActorMailbox> actors = new ConcurrentHashMap<>(); |
|||
private final ConcurrentMap<TbActorId, ReentrantLock> actorCreationLocks = new ConcurrentHashMap<>(); |
|||
private final ConcurrentMap<TbActorId, Set<TbActorId>> parentChildMap = new ConcurrentHashMap<>(); |
|||
|
|||
@Getter |
|||
private final TbActorSystemSettings settings; |
|||
@Getter |
|||
private final ScheduledExecutorService scheduler; |
|||
|
|||
public DefaultTbActorSystem(TbActorSystemSettings settings) { |
|||
this.settings = settings; |
|||
this.scheduler = Executors.newScheduledThreadPool(settings.getSchedulerPoolSize(), ThingsBoardThreadFactory.forName("actor-system-scheduler")); |
|||
} |
|||
|
|||
@Override |
|||
public void createDispatcher(String dispatcherId, ExecutorService executor) { |
|||
Dispatcher current = dispatchers.putIfAbsent(dispatcherId, new Dispatcher(dispatcherId, executor)); |
|||
if (current != null) { |
|||
throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is already registered!"); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void destroyDispatcher(String dispatcherId) { |
|||
Dispatcher dispatcher = dispatchers.remove(dispatcherId); |
|||
if (dispatcher != null) { |
|||
dispatcher.getExecutor().shutdownNow(); |
|||
} else { |
|||
throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!"); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public TbActorRef getActor(TbActorId actorId) { |
|||
return actors.get(actorId); |
|||
} |
|||
|
|||
@Override |
|||
public TbActorRef createRootActor(String dispatcherId, TbActorCreator creator) { |
|||
return createActor(dispatcherId, creator, null); |
|||
} |
|||
|
|||
@Override |
|||
public TbActorRef createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent) { |
|||
return createActor(dispatcherId, creator, parent); |
|||
} |
|||
|
|||
private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) { |
|||
Dispatcher dispatcher = dispatchers.get(dispatcherId); |
|||
if (dispatcher == null) { |
|||
log.warn("Dispatcher with id [{}] is not registered!", dispatcherId); |
|||
throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!"); |
|||
} |
|||
|
|||
TbActorId actorId = creator.createActorId(); |
|||
TbActorMailbox actorMailbox = actors.get(actorId); |
|||
if (actorMailbox != null) { |
|||
log.debug("Actor with id [{}] is already registered!", actorId); |
|||
} else { |
|||
Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock()); |
|||
actorCreationLock.lock(); |
|||
try { |
|||
actorMailbox = actors.get(actorId); |
|||
if (actorMailbox == null) { |
|||
log.debug("Creating actor with id [{}]!", actorId); |
|||
TbActor actor = creator.createActor(); |
|||
TbActorRef parentRef = null; |
|||
if (parent != null) { |
|||
parentRef = getActor(parent); |
|||
if (parentRef == null) { |
|||
throw new TbActorNotRegisteredException(parent, "Parent Actor with id [" + parent + "] is not registered!"); |
|||
} |
|||
} |
|||
TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher); |
|||
actors.put(actorId, mailbox); |
|||
mailbox.initActor(); |
|||
actorMailbox = mailbox; |
|||
if (parent != null) { |
|||
parentChildMap.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()).add(actorId); |
|||
} |
|||
} else { |
|||
log.debug("Actor with id [{}] is already registered!", actorId); |
|||
} |
|||
} finally { |
|||
actorCreationLock.unlock(); |
|||
actorCreationLocks.remove(actorId); |
|||
} |
|||
} |
|||
return actorMailbox; |
|||
} |
|||
|
|||
@Override |
|||
public void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg) { |
|||
tell(target, actorMsg, true); |
|||
} |
|||
|
|||
@Override |
|||
public void tell(TbActorId target, TbActorMsg actorMsg) { |
|||
tell(target, actorMsg, false); |
|||
} |
|||
|
|||
private void tell(TbActorId target, TbActorMsg actorMsg, boolean highPriority) { |
|||
TbActorMailbox mailbox = actors.get(target); |
|||
if (mailbox == null) { |
|||
throw new TbActorNotRegisteredException(target, "Actor with id [" + target + "] is not registered!"); |
|||
} |
|||
if (highPriority) { |
|||
mailbox.tellWithHighPriority(actorMsg); |
|||
} else { |
|||
mailbox.tell(actorMsg); |
|||
} |
|||
} |
|||
|
|||
|
|||
@Override |
|||
public void broadcastToChildren(TbActorId parent, TbActorMsg msg) { |
|||
broadcastToChildren(parent, id -> true, msg); |
|||
} |
|||
|
|||
@Override |
|||
public void broadcastToChildren(TbActorId parent, Predicate<TbActorId> childFilter, TbActorMsg msg) { |
|||
Set<TbActorId> children = parentChildMap.get(parent); |
|||
if (children != null) { |
|||
children.stream().filter(childFilter).forEach(id -> tell(id, msg)); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public List<TbActorId> filterChildren(TbActorId parent, Predicate<TbActorId> childFilter) { |
|||
Set<TbActorId> children = parentChildMap.get(parent); |
|||
if (children != null) { |
|||
return children.stream().filter(childFilter).collect(Collectors.toList()); |
|||
} else { |
|||
return Collections.emptyList(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void stop(TbActorRef actorRef) { |
|||
stop(actorRef.getActorId()); |
|||
} |
|||
|
|||
@Override |
|||
public void stop(TbActorId actorId) { |
|||
Set<TbActorId> children = parentChildMap.remove(actorId); |
|||
if (children != null) { |
|||
for (TbActorId child : children) { |
|||
stop(child); |
|||
} |
|||
} |
|||
TbActorMailbox mailbox = actors.remove(actorId); |
|||
if (mailbox != null) { |
|||
mailbox.destroy(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void stop() { |
|||
dispatchers.values().forEach(dispatcher -> { |
|||
dispatcher.getExecutor().shutdown(); |
|||
try { |
|||
dispatcher.getExecutor().awaitTermination(3, TimeUnit.SECONDS); |
|||
} catch (InterruptedException e) { |
|||
log.warn("[{}] Failed to stop dispatcher", dispatcher.getDispatcherId(), e); |
|||
} |
|||
}); |
|||
if (scheduler != null) { |
|||
scheduler.shutdownNow(); |
|||
} |
|||
actors.clear(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,45 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.ToString; |
|||
|
|||
@ToString |
|||
public class InitFailureStrategy { |
|||
|
|||
@Getter |
|||
private boolean stop; |
|||
@Getter |
|||
private long retryDelay; |
|||
|
|||
private InitFailureStrategy(boolean stop, long retryDelay) { |
|||
this.stop = stop; |
|||
this.retryDelay = retryDelay; |
|||
} |
|||
|
|||
public static InitFailureStrategy retryImmediately() { |
|||
return new InitFailureStrategy(false, 0); |
|||
} |
|||
|
|||
public static InitFailureStrategy retryWithDelay(long ms) { |
|||
return new InitFailureStrategy(false, ms); |
|||
} |
|||
|
|||
public static InitFailureStrategy stop() { |
|||
return new InitFailureStrategy(true, 0); |
|||
} |
|||
} |
|||
@ -0,0 +1,38 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.ToString; |
|||
|
|||
@ToString |
|||
public class ProcessFailureStrategy { |
|||
|
|||
@Getter |
|||
private boolean stop; |
|||
|
|||
private ProcessFailureStrategy(boolean stop) { |
|||
this.stop = stop; |
|||
} |
|||
|
|||
public static ProcessFailureStrategy stop() { |
|||
return new ProcessFailureStrategy(true); |
|||
} |
|||
|
|||
public static ProcessFailureStrategy resume() { |
|||
return new ProcessFailureStrategy(false); |
|||
} |
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
public interface TbActor { |
|||
|
|||
boolean process(TbActorMsg msg); |
|||
|
|||
TbActorRef getActorRef(); |
|||
|
|||
default void init(TbActorCtx ctx) throws TbActorException { |
|||
} |
|||
|
|||
default void destroy() throws TbActorException { |
|||
} |
|||
|
|||
default InitFailureStrategy onInitFailure(int attempt, Throwable t) { |
|||
return InitFailureStrategy.retryWithDelay(5000 * attempt); |
|||
} |
|||
|
|||
default ProcessFailureStrategy onProcessFailure(Throwable t) { |
|||
if (t instanceof Error) { |
|||
return ProcessFailureStrategy.stop(); |
|||
} else { |
|||
return ProcessFailureStrategy.resume(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,24 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
public interface TbActorCreator { |
|||
|
|||
TbActorId createActorId(); |
|||
|
|||
TbActor createActor(); |
|||
|
|||
} |
|||
@ -0,0 +1,41 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
import java.util.List; |
|||
import java.util.function.Predicate; |
|||
import java.util.function.Supplier; |
|||
|
|||
public interface TbActorCtx extends TbActorRef { |
|||
|
|||
TbActorId getSelf(); |
|||
|
|||
TbActorRef getParentRef(); |
|||
|
|||
void tell(TbActorId target, TbActorMsg msg); |
|||
|
|||
void stop(TbActorId target); |
|||
|
|||
TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier<String> dispatcher, Supplier<TbActorCreator> creator); |
|||
|
|||
void broadcastToChildren(TbActorMsg msg); |
|||
|
|||
void broadcastToChildren(TbActorMsg msg, Predicate<TbActorId> childFilter); |
|||
|
|||
List<TbActorId> filterChildren(Predicate<TbActorId> childFilter); |
|||
} |
|||
@ -0,0 +1,23 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
public class TbActorException extends Exception { |
|||
|
|||
public TbActorException(String message, Throwable cause) { |
|||
super(message, cause); |
|||
} |
|||
} |
|||
@ -0,0 +1,20 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
public interface TbActorId { |
|||
|
|||
} |
|||
@ -0,0 +1,209 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Data; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
import java.util.List; |
|||
import java.util.concurrent.ConcurrentLinkedQueue; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicBoolean; |
|||
import java.util.function.Predicate; |
|||
import java.util.function.Supplier; |
|||
|
|||
@Slf4j |
|||
@Data |
|||
public final class TbActorMailbox implements TbActorCtx { |
|||
private static final boolean HIGH_PRIORITY = true; |
|||
private static final boolean NORMAL_PRIORITY = false; |
|||
|
|||
private static final boolean FREE = false; |
|||
private static final boolean BUSY = true; |
|||
|
|||
private static final boolean NOT_READY = false; |
|||
private static final boolean READY = true; |
|||
|
|||
private final TbActorSystem system; |
|||
private final TbActorSystemSettings settings; |
|||
private final TbActorId selfId; |
|||
private final TbActorRef parentRef; |
|||
private final TbActor actor; |
|||
private final Dispatcher dispatcher; |
|||
private final ConcurrentLinkedQueue<TbActorMsg> highPriorityMsgs = new ConcurrentLinkedQueue<>(); |
|||
private final ConcurrentLinkedQueue<TbActorMsg> normalPriorityMsgs = new ConcurrentLinkedQueue<>(); |
|||
private final AtomicBoolean busy = new AtomicBoolean(FREE); |
|||
private final AtomicBoolean ready = new AtomicBoolean(NOT_READY); |
|||
private final AtomicBoolean destroyInProgress = new AtomicBoolean(); |
|||
|
|||
public void initActor() { |
|||
dispatcher.getExecutor().execute(() -> tryInit(1)); |
|||
} |
|||
|
|||
private void tryInit(int attempt) { |
|||
try { |
|||
log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt); |
|||
if (!destroyInProgress.get()) { |
|||
actor.init(this); |
|||
if (!destroyInProgress.get()) { |
|||
ready.set(READY); |
|||
tryProcessQueue(false); |
|||
} |
|||
} |
|||
} catch (Throwable t) { |
|||
log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t); |
|||
int attemptIdx = attempt + 1; |
|||
InitFailureStrategy strategy = actor.onInitFailure(attempt, t); |
|||
if (strategy.isStop() || (settings.getMaxActorInitAttempts() > 0 && attemptIdx > settings.getMaxActorInitAttempts())) { |
|||
log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t); |
|||
system.stop(selfId); |
|||
} else if (strategy.getRetryDelay() > 0) { |
|||
log.info("[{}] Failed to init actor, attempt {}, going to retry in attempts in {}ms", selfId, attempt, strategy.getRetryDelay()); |
|||
log.debug("[{}] Error", selfId, t); |
|||
system.getScheduler().schedule(() -> dispatcher.getExecutor().execute(() -> tryInit(attemptIdx)), strategy.getRetryDelay(), TimeUnit.MILLISECONDS); |
|||
} else { |
|||
log.info("[{}] Failed to init actor, attempt {}, going to retry immediately", selfId, attempt); |
|||
log.debug("[{}] Error", selfId, t); |
|||
dispatcher.getExecutor().execute(() -> tryInit(attemptIdx)); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void enqueue(TbActorMsg msg, boolean highPriority) { |
|||
if (highPriority) { |
|||
highPriorityMsgs.add(msg); |
|||
} else { |
|||
normalPriorityMsgs.add(msg); |
|||
} |
|||
tryProcessQueue(true); |
|||
} |
|||
|
|||
private void tryProcessQueue(boolean newMsg) { |
|||
if (ready.get() == READY) { |
|||
if (newMsg || !highPriorityMsgs.isEmpty() || !normalPriorityMsgs.isEmpty()) { |
|||
if (busy.compareAndSet(FREE, BUSY)) { |
|||
dispatcher.getExecutor().execute(this::processMailbox); |
|||
} else { |
|||
log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg); |
|||
} |
|||
} else { |
|||
log.trace("[{}] MessageBox is empty, new msg: {}", selfId, newMsg); |
|||
} |
|||
} else { |
|||
log.trace("[{}] MessageBox is not ready, new msg: {}", selfId, newMsg); |
|||
} |
|||
} |
|||
|
|||
private void processMailbox() { |
|||
boolean noMoreElements = false; |
|||
for (int i = 0; i < settings.getActorThroughput(); i++) { |
|||
TbActorMsg msg = highPriorityMsgs.poll(); |
|||
if (msg == null) { |
|||
msg = normalPriorityMsgs.poll(); |
|||
} |
|||
if (msg != null) { |
|||
try { |
|||
log.debug("[{}] Going to process message: {}", selfId, msg); |
|||
actor.process(msg); |
|||
} catch (Throwable t) { |
|||
log.debug("[{}] Failed to process message: {}", selfId, msg, t); |
|||
ProcessFailureStrategy strategy = actor.onProcessFailure(t); |
|||
if (strategy.isStop()) { |
|||
system.stop(selfId); |
|||
} |
|||
} |
|||
} else { |
|||
noMoreElements = true; |
|||
break; |
|||
} |
|||
} |
|||
if (noMoreElements) { |
|||
busy.set(FREE); |
|||
dispatcher.getExecutor().execute(() -> tryProcessQueue(false)); |
|||
} else { |
|||
dispatcher.getExecutor().execute(this::processMailbox); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId getSelf() { |
|||
return selfId; |
|||
} |
|||
|
|||
@Override |
|||
public void tell(TbActorId target, TbActorMsg actorMsg) { |
|||
system.tell(target, actorMsg); |
|||
} |
|||
|
|||
@Override |
|||
public void broadcastToChildren(TbActorMsg msg) { |
|||
system.broadcastToChildren(selfId, msg); |
|||
} |
|||
|
|||
@Override |
|||
public void broadcastToChildren(TbActorMsg msg, Predicate<TbActorId> childFilter) { |
|||
system.broadcastToChildren(selfId, childFilter, msg); |
|||
} |
|||
|
|||
@Override |
|||
public List<TbActorId> filterChildren(Predicate<TbActorId> childFilter) { |
|||
return system.filterChildren(selfId, childFilter); |
|||
} |
|||
|
|||
@Override |
|||
public void stop(TbActorId target) { |
|||
system.stop(target); |
|||
} |
|||
|
|||
@Override |
|||
public TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier<String> dispatcher, Supplier<TbActorCreator> creator) { |
|||
TbActorRef actorRef = system.getActor(actorId); |
|||
if (actorRef == null) { |
|||
return system.createChildActor(dispatcher.get(), creator.get(), selfId); |
|||
} else { |
|||
return actorRef; |
|||
} |
|||
} |
|||
|
|||
public void destroy() { |
|||
destroyInProgress.set(true); |
|||
dispatcher.getExecutor().execute(() -> { |
|||
try { |
|||
ready.set(NOT_READY); |
|||
actor.destroy(); |
|||
} catch (Throwable t) { |
|||
log.warn("[{}] Failed to destroy actor: {}", selfId, t); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId getActorId() { |
|||
return selfId; |
|||
} |
|||
|
|||
@Override |
|||
public void tell(TbActorMsg actorMsg) { |
|||
enqueue(actorMsg, NORMAL_PRIORITY); |
|||
} |
|||
|
|||
@Override |
|||
public void tellWithHighPriority(TbActorMsg actorMsg) { |
|||
enqueue(actorMsg, HIGH_PRIORITY); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,29 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Getter; |
|||
|
|||
public class TbActorNotRegisteredException extends RuntimeException { |
|||
|
|||
@Getter |
|||
private TbActorId target; |
|||
|
|||
public TbActorNotRegisteredException(TbActorId target, String message) { |
|||
super(message); |
|||
this.target = target; |
|||
} |
|||
} |
|||
@ -0,0 +1,28 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
public interface TbActorRef { |
|||
|
|||
TbActorId getActorId(); |
|||
|
|||
void tell(TbActorMsg actorMsg); |
|||
|
|||
void tellWithHighPriority(TbActorMsg actorMsg); |
|||
|
|||
} |
|||
@ -0,0 +1,54 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
import java.util.List; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
import java.util.function.Predicate; |
|||
|
|||
public interface TbActorSystem { |
|||
|
|||
ScheduledExecutorService getScheduler(); |
|||
|
|||
void createDispatcher(String dispatcherId, ExecutorService executor); |
|||
|
|||
void destroyDispatcher(String dispatcherId); |
|||
|
|||
TbActorRef getActor(TbActorId actorId); |
|||
|
|||
TbActorRef createRootActor(String dispatcherId, TbActorCreator creator); |
|||
|
|||
TbActorRef createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent); |
|||
|
|||
void tell(TbActorId target, TbActorMsg actorMsg); |
|||
|
|||
void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg); |
|||
|
|||
void stop(TbActorRef actorRef); |
|||
|
|||
void stop(TbActorId actorId); |
|||
|
|||
void stop(); |
|||
|
|||
void broadcastToChildren(TbActorId parent, TbActorMsg msg); |
|||
|
|||
void broadcastToChildren(TbActorId parent, Predicate<TbActorId> childFilter, TbActorMsg msg); |
|||
|
|||
List<TbActorId> filterChildren(TbActorId parent, Predicate<TbActorId> childFilter); |
|||
} |
|||
@ -0,0 +1,27 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Data; |
|||
|
|||
@Data |
|||
public class TbActorSystemSettings { |
|||
|
|||
private final int actorThroughput; |
|||
private final int schedulerPoolSize; |
|||
private final int maxActorInitAttempts; |
|||
|
|||
} |
|||
@ -0,0 +1,49 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Getter; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
|
|||
import java.util.Objects; |
|||
|
|||
public class TbEntityActorId implements TbActorId { |
|||
|
|||
@Getter |
|||
private final EntityId entityId; |
|||
|
|||
public TbEntityActorId(EntityId entityId) { |
|||
this.entityId = entityId; |
|||
} |
|||
|
|||
@Override |
|||
public String toString() { |
|||
return entityId.getEntityType() + "|" + entityId.getId(); |
|||
} |
|||
|
|||
@Override |
|||
public boolean equals(Object o) { |
|||
if (this == o) return true; |
|||
if (o == null || getClass() != o.getClass()) return false; |
|||
TbEntityActorId that = (TbEntityActorId) o; |
|||
return entityId.equals(that.entityId); |
|||
} |
|||
|
|||
@Override |
|||
public int hashCode() { |
|||
return Objects.hash(entityId); |
|||
} |
|||
} |
|||
@ -0,0 +1,224 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.After; |
|||
import org.junit.Assert; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.junit.runner.RunWith; |
|||
import org.mockito.runners.MockitoJUnitRunner; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Random; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.CountDownLatch; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
import java.util.concurrent.atomic.AtomicLong; |
|||
|
|||
@Slf4j |
|||
@RunWith(MockitoJUnitRunner.class) |
|||
public class ActorSystemTest { |
|||
|
|||
public static final String ROOT_DISPATCHER = "root-dispatcher"; |
|||
private static final int _100K = 100 * 1024; |
|||
|
|||
private volatile TbActorSystem actorSystem; |
|||
private volatile ExecutorService submitPool; |
|||
private int parallelism; |
|||
|
|||
@Before |
|||
public void initActorSystem() { |
|||
int cores = Runtime.getRuntime().availableProcessors(); |
|||
parallelism = Math.max(2, cores / 2); |
|||
TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42); |
|||
actorSystem = new DefaultTbActorSystem(settings); |
|||
submitPool = Executors.newWorkStealingPool(parallelism); |
|||
} |
|||
|
|||
@After |
|||
public void shutdownActorSystem() { |
|||
actorSystem.stop(); |
|||
submitPool.shutdownNow(); |
|||
} |
|||
|
|||
@Test |
|||
public void test1actorsAnd100KMessages() throws InterruptedException { |
|||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); |
|||
testActorsAndMessages(1, _100K, 1); |
|||
} |
|||
|
|||
@Test |
|||
public void test10actorsAnd100KMessages() throws InterruptedException { |
|||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); |
|||
testActorsAndMessages(10, _100K, 1); |
|||
} |
|||
|
|||
@Test |
|||
public void test100KActorsAnd1Messages5timesSingleThread() throws InterruptedException { |
|||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newSingleThreadExecutor()); |
|||
testActorsAndMessages(_100K, 1, 5); |
|||
} |
|||
|
|||
@Test |
|||
public void test100KActorsAnd1Messages5times() throws InterruptedException { |
|||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); |
|||
testActorsAndMessages(_100K, 1, 5); |
|||
} |
|||
|
|||
@Test |
|||
public void test100KActorsAnd10Messages() throws InterruptedException { |
|||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); |
|||
testActorsAndMessages(_100K, 10, 1); |
|||
} |
|||
|
|||
@Test |
|||
public void test1KActorsAnd1KMessages() throws InterruptedException { |
|||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); |
|||
testActorsAndMessages(1000, 1000, 10); |
|||
} |
|||
|
|||
@Test |
|||
public void testNoMessagesAfterDestroy() throws InterruptedException { |
|||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); |
|||
ActorTestCtx testCtx1 = getActorTestCtx(1); |
|||
ActorTestCtx testCtx2 = getActorTestCtx(1); |
|||
|
|||
TbActorRef actorId1 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator( |
|||
new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx1)); |
|||
TbActorRef actorId2 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator( |
|||
new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx2)); |
|||
|
|||
actorId1.tell(new IntTbActorMsg(42)); |
|||
actorId2.tell(new IntTbActorMsg(42)); |
|||
actorSystem.stop(actorId1); |
|||
|
|||
Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); |
|||
Assert.assertFalse(testCtx1.getLatch().await(1, TimeUnit.SECONDS)); |
|||
} |
|||
|
|||
@Test |
|||
public void testOneActorCreated() throws InterruptedException { |
|||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); |
|||
ActorTestCtx testCtx1 = getActorTestCtx(1); |
|||
ActorTestCtx testCtx2 = getActorTestCtx(1); |
|||
TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); |
|||
submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx1))); |
|||
submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx2))); |
|||
|
|||
Thread.sleep(1000); |
|||
actorSystem.tell(actorId, new IntTbActorMsg(42)); |
|||
|
|||
Assert.assertTrue(testCtx1.getLatch().await(1, TimeUnit.SECONDS)); |
|||
Assert.assertFalse(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); |
|||
} |
|||
|
|||
@Test |
|||
public void testActorCreatorCalledOnce() throws InterruptedException { |
|||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); |
|||
ActorTestCtx testCtx = getActorTestCtx(1); |
|||
TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); |
|||
for (int i = 0; i < 1000; i++) { |
|||
submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx))); |
|||
} |
|||
Thread.sleep(1000); |
|||
actorSystem.tell(actorId, new IntTbActorMsg(42)); |
|||
|
|||
Assert.assertTrue(testCtx.getLatch().await(1, TimeUnit.SECONDS)); |
|||
//One for creation and one for message
|
|||
Assert.assertEquals(2, testCtx.getInvocationCount().get()); |
|||
} |
|||
|
|||
@Test |
|||
public void testFailedInit() throws InterruptedException { |
|||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); |
|||
ActorTestCtx testCtx1 = getActorTestCtx(1); |
|||
ActorTestCtx testCtx2 = getActorTestCtx(1); |
|||
|
|||
TbActorRef actorId1 = actorSystem.createRootActor(ROOT_DISPATCHER, new FailedToInitActor.FailedToInitActorCreator( |
|||
new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx1, 1, 3000)); |
|||
TbActorRef actorId2 = actorSystem.createRootActor(ROOT_DISPATCHER, new FailedToInitActor.FailedToInitActorCreator( |
|||
new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx2, 2, 1)); |
|||
|
|||
actorId1.tell(new IntTbActorMsg(42)); |
|||
actorId2.tell(new IntTbActorMsg(42)); |
|||
|
|||
Assert.assertFalse(testCtx1.getLatch().await(2, TimeUnit.SECONDS)); |
|||
Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); |
|||
Assert.assertTrue(testCtx1.getLatch().await(3, TimeUnit.SECONDS)); |
|||
} |
|||
|
|||
|
|||
public void testActorsAndMessages(int actorsCount, int msgNumber, int times) throws InterruptedException { |
|||
Random random = new Random(); |
|||
int[] randomIntegers = new int[msgNumber]; |
|||
long sumTmp = 0; |
|||
for (int i = 0; i < msgNumber; i++) { |
|||
int tmp = random.nextInt(); |
|||
randomIntegers[i] = tmp; |
|||
sumTmp += tmp; |
|||
} |
|||
long expected = sumTmp; |
|||
|
|||
List<ActorTestCtx> testCtxes = new ArrayList<>(); |
|||
|
|||
List<TbActorRef> actorRefs = new ArrayList<>(); |
|||
for (int actorIdx = 0; actorIdx < actorsCount; actorIdx++) { |
|||
ActorTestCtx testCtx = getActorTestCtx(msgNumber); |
|||
actorRefs.add(actorSystem.createRootActor(ROOT_DISPATCHER, new TestRootActor.TestRootActorCreator( |
|||
new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx))); |
|||
testCtxes.add(testCtx); |
|||
} |
|||
|
|||
for (int t = 0; t < times; t++) { |
|||
long start = System.nanoTime(); |
|||
for (int i = 0; i < msgNumber; i++) { |
|||
int tmp = randomIntegers[i]; |
|||
submitPool.execute(() -> actorRefs.forEach(actorId -> actorId.tell(new IntTbActorMsg(tmp)))); |
|||
} |
|||
log.info("Submitted all messages"); |
|||
testCtxes.forEach(ctx -> { |
|||
try { |
|||
boolean success = ctx.getLatch().await(1, TimeUnit.MINUTES); |
|||
if (!success) { |
|||
log.warn("Failed: {}, {}", ctx.getActual().get(), ctx.getInvocationCount().get()); |
|||
} |
|||
Assert.assertTrue(success); |
|||
Assert.assertEquals(expected, ctx.getActual().get()); |
|||
Assert.assertEquals(msgNumber, ctx.getInvocationCount().get()); |
|||
ctx.clear(); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
}); |
|||
long duration = System.nanoTime() - start; |
|||
log.info("Time spend: {}ns ({} ms)", duration, TimeUnit.NANOSECONDS.toMillis(duration)); |
|||
} |
|||
} |
|||
|
|||
private ActorTestCtx getActorTestCtx(int i) { |
|||
CountDownLatch countDownLatch = new CountDownLatch(1); |
|||
AtomicLong actual = new AtomicLong(); |
|||
AtomicInteger invocations = new AtomicInteger(); |
|||
return new ActorTestCtx(countDownLatch, invocations, i, actual); |
|||
} |
|||
} |
|||
@ -0,0 +1,39 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Data; |
|||
|
|||
import java.util.concurrent.CountDownLatch; |
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
import java.util.concurrent.atomic.AtomicLong; |
|||
|
|||
@Data |
|||
@AllArgsConstructor |
|||
public class ActorTestCtx { |
|||
|
|||
private volatile CountDownLatch latch; |
|||
private final AtomicInteger invocationCount; |
|||
private final int expectedInvocationCount; |
|||
private final AtomicLong actual; |
|||
|
|||
public void clear() { |
|||
latch = new CountDownLatch(1); |
|||
invocationCount.set(0); |
|||
actual.set(0L); |
|||
} |
|||
} |
|||
@ -0,0 +1,72 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
@Slf4j |
|||
public class FailedToInitActor extends TestRootActor { |
|||
|
|||
int retryAttempts; |
|||
int retryDelay; |
|||
int attempts = 0; |
|||
|
|||
public FailedToInitActor(TbActorId actorId, ActorTestCtx testCtx, int retryAttempts, int retryDelay) { |
|||
super(actorId, testCtx); |
|||
this.retryAttempts = retryAttempts; |
|||
this.retryDelay = retryDelay; |
|||
} |
|||
|
|||
@Override |
|||
public void init(TbActorCtx ctx) throws TbActorException { |
|||
if (attempts < retryAttempts) { |
|||
attempts++; |
|||
throw new TbActorException("Test attempt", new RuntimeException()); |
|||
} else { |
|||
super.init(ctx); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public InitFailureStrategy onInitFailure(int attempt, Throwable t) { |
|||
return InitFailureStrategy.retryWithDelay(retryDelay); |
|||
} |
|||
|
|||
public static class FailedToInitActorCreator implements TbActorCreator { |
|||
|
|||
private final TbActorId actorId; |
|||
private final ActorTestCtx testCtx; |
|||
private final int retryAttempts; |
|||
private final int retryDelay; |
|||
|
|||
public FailedToInitActorCreator(TbActorId actorId, ActorTestCtx testCtx, int retryAttempts, int retryDelay) { |
|||
this.actorId = actorId; |
|||
this.testCtx = testCtx; |
|||
this.retryAttempts = retryAttempts; |
|||
this.retryDelay = retryDelay; |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId createActorId() { |
|||
return actorId; |
|||
} |
|||
|
|||
@Override |
|||
public TbActor createActor() { |
|||
return new FailedToInitActor(actorId, testCtx, retryAttempts, retryDelay); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,35 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Getter; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
public class IntTbActorMsg implements TbActorMsg { |
|||
|
|||
@Getter |
|||
private final int value; |
|||
|
|||
public IntTbActorMsg(int value) { |
|||
this.value = value; |
|||
} |
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.QUEUE_TO_RULE_ENGINE_MSG; |
|||
} |
|||
} |
|||
@ -0,0 +1,53 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
@Slf4j |
|||
public class SlowCreateActor extends TestRootActor { |
|||
|
|||
public SlowCreateActor(TbActorId actorId, ActorTestCtx testCtx) { |
|||
super(actorId, testCtx); |
|||
try { |
|||
Thread.sleep(500); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
testCtx.getInvocationCount().incrementAndGet(); |
|||
} |
|||
|
|||
public static class SlowCreateActorCreator implements TbActorCreator { |
|||
|
|||
private final TbActorId actorId; |
|||
private final ActorTestCtx testCtx; |
|||
|
|||
public SlowCreateActorCreator(TbActorId actorId, ActorTestCtx testCtx) { |
|||
this.actorId = actorId; |
|||
this.testCtx = testCtx; |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId createActorId() { |
|||
return actorId; |
|||
} |
|||
|
|||
@Override |
|||
public TbActor createActor() { |
|||
return new SlowCreateActor(actorId, testCtx); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,57 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
@Slf4j |
|||
public class SlowInitActor extends TestRootActor { |
|||
|
|||
public SlowInitActor(TbActorId actorId, ActorTestCtx testCtx) { |
|||
super(actorId, testCtx); |
|||
} |
|||
|
|||
@Override |
|||
public void init(TbActorCtx ctx) throws TbActorException { |
|||
try { |
|||
Thread.sleep(500); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
super.init(ctx); |
|||
} |
|||
|
|||
public static class SlowInitActorCreator implements TbActorCreator { |
|||
|
|||
private final TbActorId actorId; |
|||
private final ActorTestCtx testCtx; |
|||
|
|||
public SlowInitActorCreator(TbActorId actorId, ActorTestCtx testCtx) { |
|||
this.actorId = actorId; |
|||
this.testCtx = testCtx; |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId createActorId() { |
|||
return actorId; |
|||
} |
|||
|
|||
@Override |
|||
public TbActor createActor() { |
|||
return new SlowInitActor(actorId, testCtx); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,87 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
@Slf4j |
|||
public class TestRootActor extends AbstractTbActor { |
|||
|
|||
@Getter |
|||
private final TbActorId actorId; |
|||
@Getter |
|||
private final ActorTestCtx testCtx; |
|||
|
|||
private boolean initialized; |
|||
private long sum; |
|||
private int count; |
|||
|
|||
public TestRootActor(TbActorId actorId, ActorTestCtx testCtx) { |
|||
this.actorId = actorId; |
|||
this.testCtx = testCtx; |
|||
} |
|||
|
|||
@Override |
|||
public void init(TbActorCtx ctx) throws TbActorException { |
|||
super.init(ctx); |
|||
initialized = true; |
|||
} |
|||
|
|||
@Override |
|||
public boolean process(TbActorMsg msg) { |
|||
if (initialized) { |
|||
int value = ((IntTbActorMsg) msg).getValue(); |
|||
sum += value; |
|||
count += 1; |
|||
if (count == testCtx.getExpectedInvocationCount()) { |
|||
testCtx.getActual().set(sum); |
|||
testCtx.getInvocationCount().addAndGet(count); |
|||
sum = 0; |
|||
count = 0; |
|||
testCtx.getLatch().countDown(); |
|||
} |
|||
} |
|||
return true; |
|||
} |
|||
|
|||
@Override |
|||
public void destroy() { |
|||
|
|||
} |
|||
|
|||
public static class TestRootActorCreator implements TbActorCreator { |
|||
|
|||
private final TbActorId actorId; |
|||
private final ActorTestCtx testCtx; |
|||
|
|||
public TestRootActorCreator(TbActorId actorId, ActorTestCtx testCtx) { |
|||
this.actorId = actorId; |
|||
this.testCtx = testCtx; |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId createActorId() { |
|||
return actorId; |
|||
} |
|||
|
|||
@Override |
|||
public TbActor createActor() { |
|||
return new TestRootActor(actorId, testCtx); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,14 @@ |
|||
<?xml version="1.0" encoding="UTF-8" ?> |
|||
|
|||
<configuration> |
|||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender"> |
|||
<encoder> |
|||
<pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern> |
|||
</encoder> |
|||
</appender> |
|||
|
|||
<root level="INFO"> |
|||
<appender-ref ref="console"/> |
|||
</root> |
|||
|
|||
</configuration> |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue