@ -31,7 +31,10 @@ import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg ;
import org.thingsboard.server.common.msg.queue.RuleNodeException ;
import org.thingsboard.server.common.msg.queue.RuleNodeInfo ;
import org.thingsboard.server.common.msg.queue.ServiceType ;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo ;
import org.thingsboard.server.common.stats.TbApiUsageReportClient ;
import org.thingsboard.server.gen.transport.TransportProtos ;
/ * *
* @author Andrew Shvayka
@ -57,29 +60,37 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
@Override
public void start ( TbActorCtx context ) throws Exception {
//TODO: do not start the node if singleton
tbNode = initComponent ( ruleNode ) ;
if ( tbNode ! = null ) {
state = ComponentLifecycleState . ACTIVE ;
if ( isMyNode ( ) ) {
tbNode = initComponent ( ruleNode ) ;
if ( tbNode ! = null ) {
state = ComponentLifecycleState . ACTIVE ;
}
}
}
@Override
public void onUpdate ( TbActorCtx context ) throws Exception {
RuleNode newRuleNode = systemContext . getRuleChainService ( ) . findRuleNodeById ( tenantId , entityId ) ;
this . info = new RuleNodeInfo ( entityId , ruleChainName , newRuleNode ! = null ? newRuleNode . getName ( ) : "Unknown" ) ;
boolean restartRequired = state ! = ComponentLifecycleState . ACTIVE | |
! ( ruleNode . getType ( ) . equals ( newRuleNode . getType ( ) ) & & ruleNode . getConfiguration ( ) . equals ( newRuleNode . getConfiguration ( ) ) ) ;
this . ruleNode = newRuleNode ;
this . defaultCtx . updateSelf ( newRuleNode ) ;
if ( restartRequired ) {
if ( isMyNode ( ) ) {
RuleNode newRuleNode = systemContext . getRuleChainService ( ) . findRuleNodeById ( tenantId , entityId ) ;
this . info = new RuleNodeInfo ( entityId , ruleChainName , newRuleNode ! = null ? newRuleNode . getName ( ) : "Unknown" ) ;
boolean restartRequired = state ! = ComponentLifecycleState . ACTIVE | |
! ( ruleNode . getType ( ) . equals ( newRuleNode . getType ( ) ) & & ruleNode . getConfiguration ( ) . equals ( newRuleNode . getConfiguration ( ) ) ) ;
this . ruleNode = newRuleNode ;
this . defaultCtx . updateSelf ( newRuleNode ) ;
if ( restartRequired ) {
if ( tbNode ! = null ) {
tbNode . destroy ( ) ;
}
try {
start ( context ) ;
} catch ( Exception e ) {
throw new TbRuleNodeUpdateException ( "Failed to update rule node" , e ) ;
}
}
} else {
if ( tbNode ! = null ) {
tbNode . destroy ( ) ;
}
try {
start ( context ) ;
} catch ( Exception e ) {
throw new TbRuleNodeUpdateException ( "Failed to update rule node" , e ) ;
tbNode = null ;
}
}
}
@ -93,15 +104,20 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
}
@Override
public void onPartitionChangeMsg ( PartitionChangeMsg msg ) {
//TODO: start the node
public void onPartitionChangeMsg ( PartitionChangeMsg msg ) throws Exception {
if ( tbNode ! = null ) {
tbNode . onPartitionChangeMsg ( defaultCtx , msg ) ;
if ( ! isMyNode ( ) ) {
tbNode . destroy ( ) ;
tbNode = null ;
} else {
tbNode . onPartitionChangeMsg ( defaultCtx , msg ) ;
}
} else if ( isMyNode ( ) ) {
start ( null ) ;
}
}
public void onRuleToSelfMsg ( RuleNodeToSelfMsg msg ) throws Exception {
// TODO: check that the rule node is singleton and use putToQueue
checkComponentStateActive ( msg . getMsg ( ) ) ;
TbMsg tbMsg = msg . getMsg ( ) ;
int ruleNodeCount = tbMsg . getAndIncrementRuleNodeCounter ( ) ;
@ -122,23 +138,27 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
}
void onRuleChainToRuleNodeMsg ( RuleChainToRuleNodeMsg msg ) throws Exception {
msg . getMsg ( ) . getCallback ( ) . onProcessingStart ( info ) ;
checkComponentStateActive ( msg . getMsg ( ) ) ;
TbMsg tbMsg = msg . getMsg ( ) ;
int ruleNodeCount = tbMsg . getAndIncrementRuleNodeCounter ( ) ;
int maxRuleNodeExecutionsPerMessage = getTenantProfileConfiguration ( ) . getMaxRuleNodeExecsPerMessage ( ) ;
if ( maxRuleNodeExecutionsPerMessage = = 0 | | ruleNodeCount < maxRuleNodeExecutionsPerMessage ) {
apiUsageClient . report ( tenantId , tbMsg . getCustomerId ( ) , ApiUsageRecordKey . RE_EXEC_COUNT ) ;
if ( ruleNode . isDebugMode ( ) ) {
systemContext . persistDebugInput ( tenantId , entityId , msg . getMsg ( ) , msg . getFromRelationType ( ) ) ;
}
try {
tbNode . onMsg ( msg . getCtx ( ) , msg . getMsg ( ) ) ;
} catch ( Exception e ) {
msg . getCtx ( ) . tellFailure ( msg . getMsg ( ) , e ) ;
}
if ( ! isMyNode ( ) ) {
putToQueue ( msg . getMsg ( ) ) ;
} else {
tbMsg . getCallback ( ) . onFailure ( new RuleNodeException ( "Message is processed by more then " + maxRuleNodeExecutionsPerMessage + " rule nodes!" , ruleChainName , ruleNode ) ) ;
msg . getMsg ( ) . getCallback ( ) . onProcessingStart ( info ) ;
checkComponentStateActive ( msg . getMsg ( ) ) ;
TbMsg tbMsg = msg . getMsg ( ) ;
int ruleNodeCount = tbMsg . getAndIncrementRuleNodeCounter ( ) ;
int maxRuleNodeExecutionsPerMessage = getTenantProfileConfiguration ( ) . getMaxRuleNodeExecsPerMessage ( ) ;
if ( maxRuleNodeExecutionsPerMessage = = 0 | | ruleNodeCount < maxRuleNodeExecutionsPerMessage ) {
apiUsageClient . report ( tenantId , tbMsg . getCustomerId ( ) , ApiUsageRecordKey . RE_EXEC_COUNT ) ;
if ( ruleNode . isDebugMode ( ) ) {
systemContext . persistDebugInput ( tenantId , entityId , msg . getMsg ( ) , msg . getFromRelationType ( ) ) ;
}
try {
tbNode . onMsg ( msg . getCtx ( ) , msg . getMsg ( ) ) ;
} catch ( Exception e ) {
msg . getCtx ( ) . tellFailure ( msg . getMsg ( ) , e ) ;
}
} else {
tbMsg . getCallback ( ) . onFailure ( new RuleNodeException ( "Message is processed by more then " + maxRuleNodeExecutionsPerMessage + " rule nodes!" , ruleChainName , ruleNode ) ) ;
}
}
}
@ -161,4 +181,23 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
protected RuleNodeException getInactiveException ( ) {
return new RuleNodeException ( "Rule Node is not active! Failed to initialize." , ruleChainName , ruleNode ) ;
}
private boolean isMyNode ( ) {
return systemContext . getDiscoveryService ( ) . isMonolith ( )
| | ! ruleNode . isSingletonMode ( )
// || !ruleNode.getName().equals("singleton")
| | defaultCtx . isLocalEntity ( ruleNode . getId ( ) ) ;
}
private void putToQueue ( TbMsg source ) {
TbMsg tbMsg = TbMsg . newMsg ( source , source . getQueueName ( ) , source . getRuleChainId ( ) , entityId ) ;
TopicPartitionInfo tpi = systemContext . resolve ( ServiceType . TB_RULE_ENGINE , tbMsg . getQueueName ( ) , tenantId , entityId ) ;
TransportProtos . ToRuleEngineMsg toQueueMsg = TransportProtos . ToRuleEngineMsg . newBuilder ( )
. setTenantIdMSB ( tenantId . getId ( ) . getMostSignificantBits ( ) )
. setTenantIdLSB ( tenantId . getId ( ) . getLeastSignificantBits ( ) )
. setTbMsg ( TbMsg . toByteString ( tbMsg ) )
. build ( ) ;
systemContext . getClusterService ( ) . pushMsgToRuleEngine ( tpi , tbMsg . getId ( ) , toQueueMsg , null ) ;
defaultCtx . ack ( source ) ;
}
}