|
|
|
@ -15,16 +15,15 @@ |
|
|
|
*/ |
|
|
|
package org.thingsboard.server.actors.tenant; |
|
|
|
|
|
|
|
import akka.actor.ActorInitializationException; |
|
|
|
import akka.actor.ActorRef; |
|
|
|
import akka.actor.LocalActorRef; |
|
|
|
import akka.actor.OneForOneStrategy; |
|
|
|
import akka.actor.Props; |
|
|
|
import akka.actor.SupervisorStrategy; |
|
|
|
import akka.actor.Terminated; |
|
|
|
import com.google.common.collect.BiMap; |
|
|
|
import com.google.common.collect.HashBiMap; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.thingsboard.server.actors.ActorSystemContext; |
|
|
|
import org.thingsboard.server.actors.TbActor; |
|
|
|
import org.thingsboard.server.actors.TbActorCtx; |
|
|
|
import org.thingsboard.server.actors.TbActorId; |
|
|
|
import org.thingsboard.server.actors.TbActorNotRegisteredException; |
|
|
|
import org.thingsboard.server.actors.TbActorRef; |
|
|
|
import org.thingsboard.server.actors.TbEntityActorId; |
|
|
|
import org.thingsboard.server.actors.TbEntityTypeActorIdPredicate; |
|
|
|
import org.thingsboard.server.actors.device.DeviceActorCreator; |
|
|
|
import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor; |
|
|
|
import org.thingsboard.server.actors.service.ContextBasedCreator; |
|
|
|
@ -32,6 +31,7 @@ import org.thingsboard.server.actors.service.DefaultActorService; |
|
|
|
import org.thingsboard.server.common.data.EntityType; |
|
|
|
import org.thingsboard.server.common.data.Tenant; |
|
|
|
import org.thingsboard.server.common.data.id.DeviceId; |
|
|
|
import org.thingsboard.server.common.data.id.EntityId; |
|
|
|
import org.thingsboard.server.common.data.id.RuleChainId; |
|
|
|
import org.thingsboard.server.common.data.id.TenantId; |
|
|
|
import org.thingsboard.server.common.data.rule.RuleChain; |
|
|
|
@ -45,32 +45,25 @@ import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; |
|
|
|
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; |
|
|
|
import org.thingsboard.server.common.msg.queue.RuleEngineException; |
|
|
|
import org.thingsboard.server.common.msg.queue.ServiceType; |
|
|
|
import scala.concurrent.duration.Duration; |
|
|
|
|
|
|
|
import java.util.List; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
@Slf4j |
|
|
|
public class TenantActor extends RuleChainManagerActor { |
|
|
|
|
|
|
|
private final BiMap<DeviceId, ActorRef> deviceActors; |
|
|
|
private boolean isRuleEngineForCurrentTenant; |
|
|
|
private boolean isCore; |
|
|
|
|
|
|
|
private TenantActor(ActorSystemContext systemContext, TenantId tenantId) { |
|
|
|
super(systemContext, tenantId); |
|
|
|
this.deviceActors = HashBiMap.create(); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public SupervisorStrategy supervisorStrategy() { |
|
|
|
return strategy; |
|
|
|
} |
|
|
|
|
|
|
|
boolean cantFindTenant = false; |
|
|
|
|
|
|
|
@Override |
|
|
|
public void preStart() { |
|
|
|
public void init(TbActorCtx ctx) { |
|
|
|
super.init(ctx); |
|
|
|
log.info("[{}] Starting tenant actor.", tenantId); |
|
|
|
try { |
|
|
|
Tenant tenant = systemContext.getTenantService().findTenantById(tenantId); |
|
|
|
@ -104,12 +97,12 @@ public class TenantActor extends RuleChainManagerActor { |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void postStop() { |
|
|
|
public void destroy() { |
|
|
|
log.info("[{}] Stopping tenant actor.", tenantId); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
protected boolean process(TbActorMsg msg) { |
|
|
|
protected boolean doProcess(TbActorMsg msg) { |
|
|
|
if (cantFindTenant) { |
|
|
|
log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg); |
|
|
|
if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) { |
|
|
|
@ -126,13 +119,13 @@ public class TenantActor extends RuleChainManagerActor { |
|
|
|
//To Rule Chain Actors
|
|
|
|
broadcast(msg); |
|
|
|
} else if (ServiceType.TB_CORE.equals(serviceType)) { |
|
|
|
//To Device Actors
|
|
|
|
List<DeviceId> repartitionedDevices = |
|
|
|
deviceActors.keySet().stream().filter(deviceId -> !isMyPartition(deviceId)).collect(Collectors.toList()); |
|
|
|
for (DeviceId deviceId : repartitionedDevices) { |
|
|
|
ActorRef deviceActor = deviceActors.remove(deviceId); |
|
|
|
context().stop(deviceActor); |
|
|
|
} |
|
|
|
List<TbActorId> deviceActorIds = ctx.filterChildren(new TbEntityTypeActorIdPredicate(EntityType.DEVICE) { |
|
|
|
@Override |
|
|
|
protected boolean testEntityId(EntityId entityId) { |
|
|
|
return super.testEntityId(entityId) && !isMyPartition(entityId); |
|
|
|
} |
|
|
|
}); |
|
|
|
deviceActorIds.forEach(id -> ctx.stop(id)); |
|
|
|
} |
|
|
|
break; |
|
|
|
case COMPONENT_LIFE_CYCLE_MSG: |
|
|
|
@ -158,8 +151,8 @@ public class TenantActor extends RuleChainManagerActor { |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
private boolean isMyPartition(DeviceId deviceId) { |
|
|
|
return systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId).isMyPartition(); |
|
|
|
private boolean isMyPartition(EntityId entityId) { |
|
|
|
return systemContext.resolve(ServiceType.TB_CORE, tenantId, entityId).isMyPartition(); |
|
|
|
} |
|
|
|
|
|
|
|
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) { |
|
|
|
@ -170,16 +163,15 @@ public class TenantActor extends RuleChainManagerActor { |
|
|
|
TbMsg tbMsg = msg.getTbMsg(); |
|
|
|
if (tbMsg.getRuleChainId() == null) { |
|
|
|
if (getRootChainActor() != null) { |
|
|
|
getRootChainActor().tell(msg, self()); |
|
|
|
getRootChainActor().tell(msg); |
|
|
|
} else { |
|
|
|
tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!")); |
|
|
|
log.info("[{}] No Root Chain: {}", tenantId, msg); |
|
|
|
} |
|
|
|
} else { |
|
|
|
ActorRef ruleChainActor = get(tbMsg.getRuleChainId()); |
|
|
|
if (ruleChainActor != null) { |
|
|
|
ruleChainActor.tell(msg, self()); |
|
|
|
} else { |
|
|
|
try { |
|
|
|
ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg); |
|
|
|
} catch (TbActorNotRegisteredException ex) { |
|
|
|
log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId()); |
|
|
|
//TODO: 3.1 Log it to dead letters queue;
|
|
|
|
tbMsg.getCallback().onSuccess(); |
|
|
|
@ -188,61 +180,39 @@ public class TenantActor extends RuleChainManagerActor { |
|
|
|
} |
|
|
|
|
|
|
|
private void onRuleChainMsg(RuleChainAwareMsg msg) { |
|
|
|
getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self()); |
|
|
|
getOrCreateActor(msg.getRuleChainId()).tell(msg); |
|
|
|
} |
|
|
|
|
|
|
|
private void onToDeviceActorMsg(DeviceAwareMsg msg) { |
|
|
|
if (!isCore) { |
|
|
|
log.warn("RECEIVED INVALID MESSAGE: {}", msg); |
|
|
|
} |
|
|
|
getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender()); |
|
|
|
getOrCreateDeviceActor(msg.getDeviceId()).tell(msg); |
|
|
|
} |
|
|
|
|
|
|
|
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { |
|
|
|
if (isRuleEngineForCurrentTenant) { |
|
|
|
ActorRef target = getEntityActorRef(msg.getEntityId()); |
|
|
|
TbActorRef target = getEntityActorRef(msg.getEntityId()); |
|
|
|
if (target != null) { |
|
|
|
if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) { |
|
|
|
RuleChain ruleChain = systemContext.getRuleChainService(). |
|
|
|
findRuleChainById(tenantId, new RuleChainId(msg.getEntityId().getId())); |
|
|
|
visit(ruleChain, target); |
|
|
|
} |
|
|
|
target.tell(msg, ActorRef.noSender()); |
|
|
|
target.tell(msg); |
|
|
|
} else { |
|
|
|
log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private ActorRef getOrCreateDeviceActor(DeviceId deviceId) { |
|
|
|
return deviceActors.computeIfAbsent(deviceId, k -> { |
|
|
|
log.debug("[{}][{}] Creating device actor.", tenantId, deviceId); |
|
|
|
ActorRef deviceActor = context().actorOf(Props.create(new DeviceActorCreator(systemContext, tenantId, deviceId)) |
|
|
|
.withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME) |
|
|
|
, deviceId.toString()); |
|
|
|
context().watch(deviceActor); |
|
|
|
log.debug("[{}][{}] Created device actor: {}.", tenantId, deviceId, deviceActor); |
|
|
|
return deviceActor; |
|
|
|
}); |
|
|
|
private TbActorRef getOrCreateDeviceActor(DeviceId deviceId) { |
|
|
|
return ctx.getOrCreateChildActor(new TbEntityActorId(deviceId), |
|
|
|
() -> DefaultActorService.DEVICE_DISPATCHER_NAME, |
|
|
|
() -> new DeviceActorCreator(systemContext, tenantId, deviceId)); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
protected void processTermination(Terminated message) { |
|
|
|
ActorRef terminated = message.actor(); |
|
|
|
if (terminated instanceof LocalActorRef) { |
|
|
|
boolean removed = deviceActors.inverse().remove(terminated) != null; |
|
|
|
if (removed) { |
|
|
|
log.debug("[{}] Removed actor:", terminated); |
|
|
|
} else { |
|
|
|
log.debug("Removed actor was not found in the device map!"); |
|
|
|
} |
|
|
|
} else { |
|
|
|
throw new IllegalStateException("Remote actors are not supported!"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public static class ActorCreator extends ContextBasedCreator<TenantActor> { |
|
|
|
private static final long serialVersionUID = 1L; |
|
|
|
public static class ActorCreator extends ContextBasedCreator { |
|
|
|
|
|
|
|
private final TenantId tenantId; |
|
|
|
|
|
|
|
@ -252,18 +222,14 @@ public class TenantActor extends RuleChainManagerActor { |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public TenantActor create() { |
|
|
|
return new TenantActor(context, tenantId); |
|
|
|
public TbActorId createActorId() { |
|
|
|
return new TbEntityActorId(tenantId); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> { |
|
|
|
log.warn("[{}] Unknown failure", tenantId, t); |
|
|
|
if (t instanceof ActorInitializationException) { |
|
|
|
return SupervisorStrategy.stop(); |
|
|
|
} else { |
|
|
|
return SupervisorStrategy.resume(); |
|
|
|
@Override |
|
|
|
public TbActor createActor() { |
|
|
|
return new TenantActor(context, tenantId); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|