58 changed files with 1209 additions and 55 deletions
@ -0,0 +1,75 @@ |
|||
<!-- |
|||
|
|||
Copyright © 2016-2020 The Thingsboard Authors |
|||
|
|||
Licensed under the Apache License, Version 2.0 (the "License"); |
|||
you may not use this file except in compliance with the License. |
|||
You may obtain a copy of the License at |
|||
|
|||
http://www.apache.org/licenses/LICENSE-2.0 |
|||
|
|||
Unless required by applicable law or agreed to in writing, software |
|||
distributed under the License is distributed on an "AS IS" BASIS, |
|||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
See the License for the specific language governing permissions and |
|||
limitations under the License. |
|||
|
|||
--> |
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
|||
<modelVersion>4.0.0</modelVersion> |
|||
<parent> |
|||
<groupId>org.thingsboard</groupId> |
|||
<version>3.0.1-SNAPSHOT</version> |
|||
<artifactId>common</artifactId> |
|||
</parent> |
|||
<groupId>org.thingsboard.common</groupId> |
|||
<artifactId>actor</artifactId> |
|||
<packaging>jar</packaging> |
|||
|
|||
<name>Thingsboard Actor system</name> |
|||
<url>https://thingsboard.io</url> |
|||
|
|||
<properties> |
|||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> |
|||
<main.dir>${basedir}/../..</main.dir> |
|||
</properties> |
|||
|
|||
<dependencies> |
|||
<dependency> |
|||
<groupId>org.thingsboard.common</groupId> |
|||
<artifactId>util</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.thingsboard.common</groupId> |
|||
<artifactId>message</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.slf4j</groupId> |
|||
<artifactId>slf4j-api</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.slf4j</groupId> |
|||
<artifactId>log4j-over-slf4j</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>ch.qos.logback</groupId> |
|||
<artifactId>logback-core</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>ch.qos.logback</groupId> |
|||
<artifactId>logback-classic</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>junit</groupId> |
|||
<artifactId>junit</artifactId> |
|||
<scope>test</scope> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.mockito</groupId> |
|||
<artifactId>mockito-all</artifactId> |
|||
<scope>test</scope> |
|||
</dependency> |
|||
</dependencies> |
|||
|
|||
</project> |
|||
@ -0,0 +1,133 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Data; |
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.concurrent.ConcurrentMap; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
import java.util.concurrent.locks.Lock; |
|||
import java.util.concurrent.locks.ReentrantLock; |
|||
|
|||
@Slf4j |
|||
@Data |
|||
public class DefaultTbActorSystem implements TbActorSystem { |
|||
|
|||
private final ConcurrentMap<String, Dispatcher> dispatchers = new ConcurrentHashMap<>(); |
|||
private final ConcurrentMap<TbActorId, TbActorMailbox> actors = new ConcurrentHashMap<>(); |
|||
private final ConcurrentMap<TbActorId, ReentrantLock> actorCreationLocks = new ConcurrentHashMap<>(); |
|||
@Getter |
|||
private final TbActorSystemSettings settings; |
|||
@Getter |
|||
private final ScheduledExecutorService scheduler; |
|||
|
|||
public DefaultTbActorSystem(TbActorSystemSettings settings) { |
|||
this.settings = settings; |
|||
this.scheduler = Executors.newScheduledThreadPool(settings.getSchedulerPoolSize(), ThingsBoardThreadFactory.forName("actor-system-scheduler")); |
|||
} |
|||
|
|||
@Override |
|||
public void createDispatcher(String dispatcherId, ExecutorService executor) { |
|||
Dispatcher current = dispatchers.putIfAbsent(dispatcherId, new Dispatcher(dispatcherId, executor)); |
|||
if (current != null) { |
|||
throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is already registered!"); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void destroyDispatcher(String dispatcherId) { |
|||
Dispatcher dispatcher = dispatchers.remove(dispatcherId); |
|||
if (dispatcher != null) { |
|||
dispatcher.getExecutor().shutdownNow(); |
|||
} else { |
|||
throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!"); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId createRootActor(String dispatcherId, TbActorCreator creator) { |
|||
return createActor(dispatcherId, creator, null); |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent) { |
|||
return createActor(dispatcherId, creator, parent); |
|||
} |
|||
|
|||
private TbActorId createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) { |
|||
Dispatcher dispatcher = dispatchers.get(dispatcherId); |
|||
if (dispatcher == null) { |
|||
log.warn("Dispatcher with id [{}] is not registered!", dispatcherId); |
|||
throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!"); |
|||
} |
|||
|
|||
TbActorId actorId = creator.createActorId(); |
|||
TbActorMailbox actorMailbox = actors.get(actorId); |
|||
if (actorMailbox != null) { |
|||
log.debug("Actor with id [{}] is already registered!", actorId); |
|||
} else { |
|||
Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock()); |
|||
actorCreationLock.lock(); |
|||
try { |
|||
actorMailbox = actors.get(actorId); |
|||
if (actorMailbox == null) { |
|||
log.debug("Creating actor with id [{}]!", actorId); |
|||
TbActor actor = creator.createActor(); |
|||
TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parent, actor, dispatcher); |
|||
actors.put(actorId, mailbox); |
|||
mailbox.initActor(); |
|||
} else { |
|||
log.debug("Actor with id [{}] is already registered!", actorId); |
|||
} |
|||
} finally { |
|||
actorCreationLock.unlock(); |
|||
actorCreationLocks.remove(actorId); |
|||
} |
|||
} |
|||
return actorId; |
|||
} |
|||
|
|||
@Override |
|||
public void tell(TbActorId target, TbActorMsg actorMsg) { |
|||
TbActorMailbox mailbox = actors.get(target); |
|||
if (mailbox == null) { |
|||
throw new TbActorNotRegisteredException(target, "Actor with id [" + target + "] is not registered!"); |
|||
} |
|||
mailbox.enqueue(actorMsg); |
|||
} |
|||
|
|||
@Override |
|||
public void stop(TbActorId actorId) { |
|||
TbActorMailbox mailbox = actors.remove(actorId); |
|||
if (mailbox != null) { |
|||
mailbox.destroy(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void stop() { |
|||
dispatchers.values().forEach(dispatcher -> dispatcher.getExecutor().shutdownNow()); |
|||
actors.clear(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,28 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Data; |
|||
|
|||
import java.util.concurrent.ExecutorService; |
|||
|
|||
@Data |
|||
class Dispatcher { |
|||
|
|||
private final String dispatcherId; |
|||
private final ExecutorService executor; |
|||
|
|||
} |
|||
@ -0,0 +1,45 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.ToString; |
|||
|
|||
@ToString |
|||
public class InitFailureStrategy { |
|||
|
|||
@Getter |
|||
private boolean stop; |
|||
@Getter |
|||
private long retryDelay; |
|||
|
|||
private InitFailureStrategy(boolean stop, long retryDelay) { |
|||
this.stop = stop; |
|||
this.retryDelay = retryDelay; |
|||
} |
|||
|
|||
public static InitFailureStrategy retryImmediately() { |
|||
return new InitFailureStrategy(false, 0); |
|||
} |
|||
|
|||
public static InitFailureStrategy retryWithDelay(long ms) { |
|||
return new InitFailureStrategy(false, ms); |
|||
} |
|||
|
|||
public static InitFailureStrategy stop() { |
|||
return new InitFailureStrategy(true, 0); |
|||
} |
|||
} |
|||
@ -0,0 +1,38 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.ToString; |
|||
|
|||
@ToString |
|||
public class ProcessFailureStrategy { |
|||
|
|||
@Getter |
|||
private boolean stop; |
|||
|
|||
private ProcessFailureStrategy(boolean stop) { |
|||
this.stop = stop; |
|||
} |
|||
|
|||
public static ProcessFailureStrategy stop() { |
|||
return new ProcessFailureStrategy(true); |
|||
} |
|||
|
|||
public static ProcessFailureStrategy resume() { |
|||
return new ProcessFailureStrategy(false); |
|||
} |
|||
} |
|||
@ -0,0 +1,39 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
public interface TbActor { |
|||
|
|||
void init(); |
|||
|
|||
boolean process(TbActorCtx ctx, TbActorMsg msg); |
|||
|
|||
void destroy(); |
|||
|
|||
default InitFailureStrategy onInitFailure(int attempt, Throwable t) { |
|||
return InitFailureStrategy.retryWithDelay(5000); |
|||
} |
|||
|
|||
default ProcessFailureStrategy onProcessFailure(Throwable t) { |
|||
if (t instanceof Error) { |
|||
return ProcessFailureStrategy.stop(); |
|||
} else { |
|||
return ProcessFailureStrategy.resume(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,24 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
public interface TbActorCreator { |
|||
|
|||
TbActorId createActorId(); |
|||
|
|||
TbActor createActor(); |
|||
|
|||
} |
|||
@ -0,0 +1,28 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
public interface TbActorCtx { |
|||
|
|||
TbActorId getSelf(); |
|||
|
|||
TbActorId getParent(); |
|||
|
|||
void tell(TbActorId target, TbActorMsg actorMsg); |
|||
|
|||
} |
|||
@ -0,0 +1,47 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
|
|||
import java.util.Objects; |
|||
|
|||
public class TbActorId { |
|||
|
|||
private final EntityId entityId; |
|||
|
|||
public TbActorId(EntityId entityId) { |
|||
this.entityId = entityId; |
|||
} |
|||
|
|||
@Override |
|||
public String toString() { |
|||
return entityId.toString(); |
|||
} |
|||
|
|||
@Override |
|||
public boolean equals(Object o) { |
|||
if (this == o) return true; |
|||
if (o == null || getClass() != o.getClass()) return false; |
|||
TbActorId actorId = (TbActorId) o; |
|||
return entityId.equals(actorId.entityId); |
|||
} |
|||
|
|||
@Override |
|||
public int hashCode() { |
|||
return Objects.hash(entityId); |
|||
} |
|||
} |
|||
@ -0,0 +1,144 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Data; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
import java.util.concurrent.ConcurrentLinkedQueue; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicBoolean; |
|||
|
|||
@Slf4j |
|||
@Data |
|||
public final class TbActorMailbox implements TbActorCtx { |
|||
private static final boolean FREE = false; |
|||
private static final boolean BUSY = true; |
|||
|
|||
private static final boolean NOT_READY = false; |
|||
private static final boolean READY = true; |
|||
|
|||
private final TbActorSystem system; |
|||
private final TbActorSystemSettings settings; |
|||
private final TbActorId selfId; |
|||
private final TbActorId parentId; |
|||
private final TbActor actor; |
|||
private final Dispatcher dispatcher; |
|||
private final ConcurrentLinkedQueue<TbActorMsg> msgs = new ConcurrentLinkedQueue<>(); |
|||
private final AtomicBoolean busy = new AtomicBoolean(FREE); |
|||
private final AtomicBoolean ready = new AtomicBoolean(NOT_READY); |
|||
private final AtomicBoolean destroyInProgress = new AtomicBoolean(); |
|||
|
|||
public void initActor() { |
|||
dispatcher.getExecutor().execute(() -> tryInit(1)); |
|||
} |
|||
|
|||
private void tryInit(int attempt) { |
|||
try { |
|||
log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt); |
|||
if (!destroyInProgress.get()) { |
|||
actor.init(); |
|||
if (!destroyInProgress.get()) { |
|||
ready.set(READY); |
|||
tryProcessQueue(false); |
|||
} |
|||
} |
|||
} catch (Throwable t) { |
|||
log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t); |
|||
int attemptIdx = attempt + 1; |
|||
InitFailureStrategy strategy = actor.onInitFailure(attempt, t); |
|||
if (strategy.isStop() || (settings.getMaxActorInitAttempts() > 0 && attemptIdx > settings.getMaxActorInitAttempts())) { |
|||
log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t); |
|||
system.stop(selfId); |
|||
} else if (strategy.getRetryDelay() > 0) { |
|||
log.info("[{}] Failed to init actor, attempt {}, going to retry in attempts in {}ms", selfId, attempt, strategy.getRetryDelay(), t); |
|||
system.getScheduler().schedule(() -> dispatcher.getExecutor().execute(() -> tryInit(attemptIdx)), strategy.getRetryDelay(), TimeUnit.MILLISECONDS); |
|||
} else { |
|||
log.info("[{}] Failed to init actor, attempt {}, going to retry immediately", selfId, attempt, t); |
|||
dispatcher.getExecutor().execute(() -> tryInit(attemptIdx)); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void enqueue(TbActorMsg msg) { |
|||
msgs.add(msg); |
|||
tryProcessQueue(true); |
|||
} |
|||
|
|||
private void tryProcessQueue(boolean newMsg) { |
|||
if (ready.get() == READY && (newMsg || !msgs.isEmpty()) && busy.compareAndSet(FREE, BUSY)) { |
|||
dispatcher.getExecutor().execute(this::processMailbox); |
|||
} else { |
|||
log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg); |
|||
} |
|||
} |
|||
|
|||
private void processMailbox() { |
|||
boolean noMoreElements = false; |
|||
for (int i = 0; i < settings.getActorThroughput(); i++) { |
|||
TbActorMsg msg = msgs.poll(); |
|||
if (msg != null) { |
|||
try { |
|||
log.debug("[{}] Going to process message: {}", selfId, msg); |
|||
actor.process(this, msg); |
|||
} catch (Throwable t) { |
|||
log.debug("[{}] Failed to process message: {}", selfId, msg, t); |
|||
ProcessFailureStrategy strategy = actor.onProcessFailure(t); |
|||
if (strategy.isStop()) { |
|||
system.stop(selfId); |
|||
} |
|||
} |
|||
} else { |
|||
noMoreElements = true; |
|||
break; |
|||
} |
|||
} |
|||
if (noMoreElements) { |
|||
busy.set(FREE); |
|||
dispatcher.getExecutor().execute(() -> tryProcessQueue(false)); |
|||
} else { |
|||
dispatcher.getExecutor().execute(this::processMailbox); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId getSelf() { |
|||
return selfId; |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId getParent() { |
|||
return parentId; |
|||
} |
|||
|
|||
@Override |
|||
public void tell(TbActorId target, TbActorMsg actorMsg) { |
|||
system.tell(target, actorMsg); |
|||
} |
|||
|
|||
public void destroy() { |
|||
destroyInProgress.set(true); |
|||
dispatcher.getExecutor().execute(() -> { |
|||
try { |
|||
ready.set(NOT_READY); |
|||
actor.destroy(); |
|||
} catch (Throwable t) { |
|||
log.warn("[{}] Failed to destroy actor: {}", selfId, t); |
|||
} |
|||
}); |
|||
} |
|||
} |
|||
@ -0,0 +1,29 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Getter; |
|||
|
|||
public class TbActorNotRegisteredException extends RuntimeException { |
|||
|
|||
@Getter |
|||
private TbActorId target; |
|||
|
|||
public TbActorNotRegisteredException(TbActorId target, String message) { |
|||
super(message); |
|||
this.target = target; |
|||
} |
|||
} |
|||
@ -0,0 +1,41 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
|
|||
public interface TbActorSystem { |
|||
|
|||
ScheduledExecutorService getScheduler(); |
|||
|
|||
void createDispatcher(String dispatcherId, ExecutorService executor); |
|||
|
|||
void destroyDispatcher(String dispatcherId); |
|||
|
|||
TbActorId createRootActor(String dispatcherId, TbActorCreator creator); |
|||
|
|||
TbActorId createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent); |
|||
|
|||
void tell(TbActorId target, TbActorMsg actorMsg); |
|||
|
|||
void stop(TbActorId actorId); |
|||
|
|||
void stop(); |
|||
|
|||
} |
|||
@ -0,0 +1,27 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Data; |
|||
|
|||
@Data |
|||
public class TbActorSystemSettings { |
|||
|
|||
private final int actorThroughput; |
|||
private final int schedulerPoolSize; |
|||
private final int maxActorInitAttempts; |
|||
|
|||
} |
|||
@ -0,0 +1,178 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.After; |
|||
import org.junit.Assert; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.junit.runner.RunWith; |
|||
import org.mockito.runners.MockitoJUnitRunner; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Random; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.CountDownLatch; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
import java.util.concurrent.atomic.AtomicLong; |
|||
|
|||
@Slf4j |
|||
@RunWith(MockitoJUnitRunner.class) |
|||
public class ActorSystemTest { |
|||
|
|||
public static final String ROOT_DISPATCHER = "root-dispatcher"; |
|||
private static final int _1M = 1024 * 1024; |
|||
|
|||
private TbActorSystem actorSystem; |
|||
private ExecutorService submitPool; |
|||
|
|||
@Before |
|||
public void initActorSystem() { |
|||
int cores = Runtime.getRuntime().availableProcessors(); |
|||
int parallelism = Math.max(1, cores / 2); |
|||
TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42); |
|||
actorSystem = new DefaultTbActorSystem(settings); |
|||
submitPool = Executors.newWorkStealingPool(parallelism); |
|||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); |
|||
} |
|||
|
|||
@After |
|||
public void shutdownActorSystem() { |
|||
actorSystem.stop(); |
|||
submitPool.shutdownNow(); |
|||
} |
|||
|
|||
@Test |
|||
public void test10actorsAnd1MMessages() throws InterruptedException { |
|||
testActorsAndMessages(10, _1M); |
|||
} |
|||
|
|||
@Test |
|||
public void test1MActorsAnd10Messages() throws InterruptedException { |
|||
testActorsAndMessages(_1M, 10); |
|||
} |
|||
|
|||
@Test |
|||
public void test1KActorsAnd1KMessages() throws InterruptedException { |
|||
testActorsAndMessages(1000, 1000); |
|||
} |
|||
|
|||
@Test |
|||
public void testNoMessagesAfterDestroy() throws InterruptedException { |
|||
ActorTestCtx testCtx1 = getActorTestCtx(1); |
|||
ActorTestCtx testCtx2 = getActorTestCtx(1); |
|||
|
|||
TbActorId actorId1 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator( |
|||
new TbActorId(new DeviceId(UUID.randomUUID())), testCtx1)); |
|||
TbActorId actorId2 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator( |
|||
new TbActorId(new DeviceId(UUID.randomUUID())), testCtx2)); |
|||
|
|||
actorSystem.tell(actorId1, new IntTbActorMsg(42)); |
|||
actorSystem.tell(actorId2, new IntTbActorMsg(42)); |
|||
actorSystem.stop(actorId1); |
|||
|
|||
Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); |
|||
Assert.assertFalse(testCtx1.getLatch().await(2, TimeUnit.SECONDS)); |
|||
} |
|||
|
|||
@Test |
|||
public void testOneActorCreated() throws InterruptedException { |
|||
ActorTestCtx testCtx1 = getActorTestCtx(1); |
|||
ActorTestCtx testCtx2 = getActorTestCtx(1); |
|||
TbActorId actorId = new TbActorId(new DeviceId(UUID.randomUUID())); |
|||
submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx1))); |
|||
submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx2))); |
|||
|
|||
Thread.sleep(1000); |
|||
actorSystem.tell(actorId, new IntTbActorMsg(42)); |
|||
|
|||
Assert.assertTrue(testCtx1.getLatch().await(3, TimeUnit.SECONDS)); |
|||
Assert.assertFalse(testCtx2.getLatch().await(3, TimeUnit.SECONDS)); |
|||
} |
|||
|
|||
@Test |
|||
public void testActorCreatorCalledOnce() throws InterruptedException { |
|||
ActorTestCtx testCtx = getActorTestCtx(1); |
|||
TbActorId actorId = new TbActorId(new DeviceId(UUID.randomUUID())); |
|||
for(int i =0; i < 1000; i++) { |
|||
submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx))); |
|||
} |
|||
Thread.sleep(1000); |
|||
actorSystem.tell(actorId, new IntTbActorMsg(42)); |
|||
|
|||
Assert.assertTrue(testCtx.getLatch().await(1, TimeUnit.SECONDS)); |
|||
//One for creation and one for message
|
|||
Assert.assertEquals(2, testCtx.getInvocationCount().get()); |
|||
} |
|||
|
|||
|
|||
public void testActorsAndMessages(int actorsCount, int msgNumber) throws InterruptedException { |
|||
Random random = new Random(); |
|||
int[] randomIntegers = new int[msgNumber]; |
|||
long sumTmp = 0; |
|||
for (int i = 0; i < msgNumber; i++) { |
|||
int tmp = random.nextInt(); |
|||
randomIntegers[i] = tmp; |
|||
sumTmp += tmp; |
|||
} |
|||
long expected = sumTmp; |
|||
|
|||
List<ActorTestCtx> testCtxes = new ArrayList<>(); |
|||
|
|||
List<TbActorId> actorIds = new ArrayList<>(); |
|||
for (int actorIdx = 0; actorIdx < actorsCount; actorIdx++) { |
|||
ActorTestCtx testCtx = getActorTestCtx(msgNumber); |
|||
|
|||
actorIds.add(actorSystem.createRootActor(ROOT_DISPATCHER, new TestRootActor.TestRootActorCreator( |
|||
new TbActorId(new DeviceId(UUID.randomUUID())), testCtx))); |
|||
testCtxes.add(testCtx); |
|||
} |
|||
|
|||
long start = System.nanoTime(); |
|||
|
|||
for (int i = 0; i < msgNumber; i++) { |
|||
int tmp = randomIntegers[i]; |
|||
submitPool.execute(() -> actorIds.forEach(actorId -> actorSystem.tell(actorId, new IntTbActorMsg(tmp)))); |
|||
} |
|||
log.info("Submitted all messages"); |
|||
|
|||
testCtxes.forEach(ctx -> { |
|||
try { |
|||
Assert.assertTrue(ctx.getLatch().await(1, TimeUnit.MINUTES)); |
|||
Assert.assertEquals(expected, ctx.getActual().get()); |
|||
Assert.assertEquals(msgNumber, ctx.getInvocationCount().get()); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
}); |
|||
|
|||
long duration = System.nanoTime() - start; |
|||
log.info("Time spend: {}ns ({} ms)", duration, TimeUnit.NANOSECONDS.toMillis(duration)); |
|||
} |
|||
|
|||
private ActorTestCtx getActorTestCtx(int i) { |
|||
CountDownLatch countDownLatch = new CountDownLatch(1); |
|||
AtomicLong actual = new AtomicLong(); |
|||
AtomicInteger invocations = new AtomicInteger(); |
|||
return new ActorTestCtx(countDownLatch, invocations, i, actual); |
|||
} |
|||
} |
|||
@ -0,0 +1,31 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Data; |
|||
|
|||
import java.util.concurrent.CountDownLatch; |
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
import java.util.concurrent.atomic.AtomicLong; |
|||
|
|||
@Data |
|||
public class ActorTestCtx { |
|||
|
|||
private final CountDownLatch latch; |
|||
private final AtomicInteger invocationCount; |
|||
private final int expectedInvocationCount; |
|||
private final AtomicLong actual; |
|||
} |
|||
@ -0,0 +1,35 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Getter; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
public class IntTbActorMsg implements TbActorMsg { |
|||
|
|||
@Getter |
|||
private final int value; |
|||
|
|||
public IntTbActorMsg(int value) { |
|||
this.value = value; |
|||
} |
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.QUEUE_TO_RULE_ENGINE_MSG; |
|||
} |
|||
} |
|||
@ -0,0 +1,53 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
@Slf4j |
|||
public class SlowCreateActor extends TestRootActor { |
|||
|
|||
public SlowCreateActor(TbActorId actorId, ActorTestCtx testCtx) { |
|||
super(actorId, testCtx); |
|||
try { |
|||
Thread.sleep(500); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
testCtx.getInvocationCount().incrementAndGet(); |
|||
} |
|||
|
|||
public static class SlowCreateActorCreator implements TbActorCreator { |
|||
|
|||
private final TbActorId actorId; |
|||
private final ActorTestCtx testCtx; |
|||
|
|||
public SlowCreateActorCreator(TbActorId actorId, ActorTestCtx testCtx) { |
|||
this.actorId = actorId; |
|||
this.testCtx = testCtx; |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId createActorId() { |
|||
return actorId; |
|||
} |
|||
|
|||
@Override |
|||
public TbActor createActor() { |
|||
return new SlowCreateActor(actorId, testCtx); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,57 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
@Slf4j |
|||
public class SlowInitActor extends TestRootActor { |
|||
|
|||
public SlowInitActor(TbActorId actorId, ActorTestCtx testCtx) { |
|||
super(actorId, testCtx); |
|||
} |
|||
|
|||
@Override |
|||
public void init() { |
|||
try { |
|||
Thread.sleep(500); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
super.init(); |
|||
} |
|||
|
|||
public static class SlowInitActorCreator implements TbActorCreator { |
|||
|
|||
private final TbActorId actorId; |
|||
private final ActorTestCtx testCtx; |
|||
|
|||
public SlowInitActorCreator(TbActorId actorId, ActorTestCtx testCtx) { |
|||
this.actorId = actorId; |
|||
this.testCtx = testCtx; |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId createActorId() { |
|||
return actorId; |
|||
} |
|||
|
|||
@Override |
|||
public TbActor createActor() { |
|||
return new SlowInitActor(actorId, testCtx); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,84 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
|
|||
@Slf4j |
|||
public class TestRootActor implements TbActor { |
|||
|
|||
@Getter |
|||
private final TbActorId actorId; |
|||
@Getter |
|||
private final ActorTestCtx testCtx; |
|||
|
|||
private boolean initialized; |
|||
private long sum; |
|||
private int count; |
|||
|
|||
public TestRootActor(TbActorId actorId, ActorTestCtx testCtx) { |
|||
this.actorId = actorId; |
|||
this.testCtx = testCtx; |
|||
} |
|||
|
|||
@Override |
|||
public void init() { |
|||
initialized = true; |
|||
} |
|||
|
|||
@Override |
|||
public boolean process(TbActorCtx ctx, TbActorMsg msg) { |
|||
if (initialized) { |
|||
int value = ((IntTbActorMsg) msg).getValue(); |
|||
sum += value; |
|||
count += 1; |
|||
if (count == testCtx.getExpectedInvocationCount()) { |
|||
testCtx.getActual().set(sum); |
|||
testCtx.getInvocationCount().addAndGet(count); |
|||
testCtx.getLatch().countDown(); |
|||
} |
|||
} |
|||
return true; |
|||
} |
|||
|
|||
@Override |
|||
public void destroy() { |
|||
|
|||
} |
|||
|
|||
public static class TestRootActorCreator implements TbActorCreator { |
|||
|
|||
private final TbActorId actorId; |
|||
private final ActorTestCtx testCtx; |
|||
|
|||
public TestRootActorCreator(TbActorId actorId, ActorTestCtx testCtx) { |
|||
this.actorId = actorId; |
|||
this.testCtx = testCtx; |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId createActorId() { |
|||
return actorId; |
|||
} |
|||
|
|||
@Override |
|||
public TbActor createActor() { |
|||
return new TestRootActor(actorId, testCtx); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,14 @@ |
|||
<?xml version="1.0" encoding="UTF-8" ?> |
|||
|
|||
<configuration> |
|||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender"> |
|||
<encoder> |
|||
<pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern> |
|||
</encoder> |
|||
</appender> |
|||
|
|||
<root level="INFO"> |
|||
<appender-ref ref="console"/> |
|||
</root> |
|||
|
|||
</configuration> |
|||
Loading…
Reference in new issue