|
|
|
@ -36,6 +36,7 @@ import org.springframework.stereotype.Component; |
|
|
|
import org.thingsboard.rule.engine.api.MailService; |
|
|
|
import org.thingsboard.rule.engine.api.RuleChainTransactionService; |
|
|
|
import org.thingsboard.server.actors.service.ActorService; |
|
|
|
import org.thingsboard.server.actors.tenant.DebugTbRateLimits; |
|
|
|
import org.thingsboard.server.common.data.DataConstants; |
|
|
|
import org.thingsboard.server.common.data.Event; |
|
|
|
import org.thingsboard.server.common.data.id.EntityId; |
|
|
|
@ -43,6 +44,7 @@ import org.thingsboard.server.common.data.id.TenantId; |
|
|
|
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
|
|
|
import org.thingsboard.server.common.msg.TbMsg; |
|
|
|
import org.thingsboard.server.common.msg.cluster.ServerAddress; |
|
|
|
import org.thingsboard.server.common.msg.tools.TbRateLimits; |
|
|
|
import org.thingsboard.server.common.transport.auth.DeviceAuthService; |
|
|
|
import org.thingsboard.server.dao.alarm.AlarmService; |
|
|
|
import org.thingsboard.server.dao.asset.AssetService; |
|
|
|
@ -84,6 +86,8 @@ import java.io.IOException; |
|
|
|
import java.io.PrintWriter; |
|
|
|
import java.io.StringWriter; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.ConcurrentMap; |
|
|
|
|
|
|
|
@Slf4j |
|
|
|
@Component |
|
|
|
@ -92,6 +96,12 @@ public class ActorSystemContext { |
|
|
|
|
|
|
|
protected final ObjectMapper mapper = new ObjectMapper(); |
|
|
|
|
|
|
|
private final ConcurrentMap<TenantId, DebugTbRateLimits> debugPerTenantLimits = new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
public ConcurrentMap<TenantId, DebugTbRateLimits> getDebugPerTenantLimits() { |
|
|
|
return debugPerTenantLimits; |
|
|
|
} |
|
|
|
|
|
|
|
@Getter |
|
|
|
@Setter |
|
|
|
private ActorService actorService; |
|
|
|
@ -291,6 +301,14 @@ public class ActorSystemContext { |
|
|
|
@Getter |
|
|
|
private long sessionReportTimeout; |
|
|
|
|
|
|
|
@Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.enabled}") |
|
|
|
@Getter |
|
|
|
private boolean debugPerTenantEnabled; |
|
|
|
|
|
|
|
@Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.configuration}") |
|
|
|
@Getter |
|
|
|
private String debugPerTenantLimitsConfiguration; |
|
|
|
|
|
|
|
@Getter |
|
|
|
@Setter |
|
|
|
private ActorSystem actorSystem; |
|
|
|
@ -318,8 +336,6 @@ public class ActorSystemContext { |
|
|
|
@Getter |
|
|
|
private CassandraBufferedRateExecutor cassandraBufferedRateExecutor; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public ActorSystemContext() { |
|
|
|
config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load()); |
|
|
|
} |
|
|
|
@ -392,46 +408,97 @@ public class ActorSystemContext { |
|
|
|
} |
|
|
|
|
|
|
|
private void persistDebugAsync(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, String relationType, Throwable error) { |
|
|
|
try { |
|
|
|
Event event = new Event(); |
|
|
|
event.setTenantId(tenantId); |
|
|
|
event.setEntityId(entityId); |
|
|
|
event.setType(DataConstants.DEBUG_RULE_NODE); |
|
|
|
|
|
|
|
String metadata = mapper.writeValueAsString(tbMsg.getMetaData().getData()); |
|
|
|
|
|
|
|
ObjectNode node = mapper.createObjectNode() |
|
|
|
.put("type", type) |
|
|
|
.put("server", getServerAddress()) |
|
|
|
.put("entityId", tbMsg.getOriginator().getId().toString()) |
|
|
|
.put("entityName", tbMsg.getOriginator().getEntityType().name()) |
|
|
|
.put("msgId", tbMsg.getId().toString()) |
|
|
|
.put("msgType", tbMsg.getType()) |
|
|
|
.put("dataType", tbMsg.getDataType().name()) |
|
|
|
.put("relationType", relationType) |
|
|
|
.put("data", tbMsg.getData()) |
|
|
|
.put("metadata", metadata); |
|
|
|
|
|
|
|
if (error != null) { |
|
|
|
node = node.put("error", toString(error)); |
|
|
|
if (checkLimits(tenantId, tbMsg, error)) { |
|
|
|
try { |
|
|
|
Event event = new Event(); |
|
|
|
event.setTenantId(tenantId); |
|
|
|
event.setEntityId(entityId); |
|
|
|
event.setType(DataConstants.DEBUG_RULE_NODE); |
|
|
|
|
|
|
|
String metadata = mapper.writeValueAsString(tbMsg.getMetaData().getData()); |
|
|
|
|
|
|
|
ObjectNode node = mapper.createObjectNode() |
|
|
|
.put("type", type) |
|
|
|
.put("server", getServerAddress()) |
|
|
|
.put("entityId", tbMsg.getOriginator().getId().toString()) |
|
|
|
.put("entityName", tbMsg.getOriginator().getEntityType().name()) |
|
|
|
.put("msgId", tbMsg.getId().toString()) |
|
|
|
.put("msgType", tbMsg.getType()) |
|
|
|
.put("dataType", tbMsg.getDataType().name()) |
|
|
|
.put("relationType", relationType) |
|
|
|
.put("data", tbMsg.getData()) |
|
|
|
.put("metadata", metadata); |
|
|
|
|
|
|
|
if (error != null) { |
|
|
|
node = node.put("error", toString(error)); |
|
|
|
} |
|
|
|
|
|
|
|
event.setBody(node); |
|
|
|
ListenableFuture<Event> future = eventService.saveAsync(event); |
|
|
|
Futures.addCallback(future, new FutureCallback<Event>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(@Nullable Event event) { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailure(Throwable th) { |
|
|
|
log.error("Could not save debug Event for Node", th); |
|
|
|
} |
|
|
|
}); |
|
|
|
} catch (IOException ex) { |
|
|
|
log.warn("Failed to persist rule node debug message", ex); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
event.setBody(node); |
|
|
|
ListenableFuture<Event> future = eventService.saveAsync(event); |
|
|
|
Futures.addCallback(future, new FutureCallback<Event>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(@Nullable Event event) { |
|
|
|
private boolean checkLimits(TenantId tenantId, TbMsg tbMsg, Throwable error) { |
|
|
|
if (debugPerTenantEnabled) { |
|
|
|
DebugTbRateLimits debugTbRateLimits = debugPerTenantLimits.computeIfAbsent(tenantId, id -> |
|
|
|
new DebugTbRateLimits(new TbRateLimits(debugPerTenantLimitsConfiguration), false)); |
|
|
|
|
|
|
|
if (!debugTbRateLimits.getTbRateLimits().tryConsume()) { |
|
|
|
if (!debugTbRateLimits.isRuleChainEventSaved()) { |
|
|
|
persistRuleChainDebugModeEvent(tenantId, tbMsg.getRuleChainId(), error); |
|
|
|
debugTbRateLimits.setRuleChainEventSaved(true); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailure(Throwable th) { |
|
|
|
log.error("Could not save debug Event for Node", th); |
|
|
|
if (log.isTraceEnabled()) { |
|
|
|
log.trace("[{}] Tenant level debug mode rate limit detected: {}", tenantId, tbMsg); |
|
|
|
} |
|
|
|
}); |
|
|
|
} catch (IOException ex) { |
|
|
|
log.warn("Failed to persist rule node debug message", ex); |
|
|
|
return false; |
|
|
|
} |
|
|
|
} |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
private void persistRuleChainDebugModeEvent(TenantId tenantId, EntityId entityId, Throwable error) { |
|
|
|
Event event = new Event(); |
|
|
|
event.setTenantId(tenantId); |
|
|
|
event.setEntityId(entityId); |
|
|
|
event.setType(DataConstants.DEBUG_RULE_CHAIN); |
|
|
|
|
|
|
|
ObjectNode node = mapper.createObjectNode() |
|
|
|
//todo: what fields are needed here?
|
|
|
|
.put("server", getServerAddress()) |
|
|
|
.put("message", "Reached debug mode rate limit!"); |
|
|
|
|
|
|
|
if (error != null) { |
|
|
|
node = node.put("error", toString(error)); |
|
|
|
} |
|
|
|
|
|
|
|
event.setBody(node); |
|
|
|
ListenableFuture<Event> future = eventService.saveAsync(event); |
|
|
|
Futures.addCallback(future, new FutureCallback<Event>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(@Nullable Event event) { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailure(Throwable th) { |
|
|
|
log.error("Could not save debug Event for Rule Chain", th); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
public static Exception toException(Throwable error) { |
|
|
|
|