@ -43,7 +43,7 @@ import java.util.UUID;
@Slf4j
public final class TbMsg implements Serializable {
private final QueueId queueId ;
private final String queueName ;
private final UUID id ;
private final long ts ;
private final String type ;
@ -67,12 +67,12 @@ public final class TbMsg implements Serializable {
return ctx . getAndIncrementRuleNodeCounter ( ) ;
}
public static TbMsg newMsg ( QueueId queueId , String type , EntityId originator , TbMsgMetaData metaData , String data , RuleChainId ruleChainId , RuleNodeId ruleNodeId ) {
return newMsg ( queueId , type , originator , null , metaData , data , ruleChainId , ruleNodeId ) ;
public static TbMsg newMsg ( String queueName , String type , EntityId originator , TbMsgMetaData metaData , String data , RuleChainId ruleChainId , RuleNodeId ruleNodeId ) {
return newMsg ( queueName , type , originator , null , metaData , data , ruleChainId , ruleNodeId ) ;
}
public static TbMsg newMsg ( QueueId queueId , String type , EntityId originator , CustomerId customerId , TbMsgMetaData metaData , String data , RuleChainId ruleChainId , RuleNodeId ruleNodeId ) {
return new TbMsg ( queueId , UUID . randomUUID ( ) , System . currentTimeMillis ( ) , type , originator , customerId ,
public static TbMsg newMsg ( String queueName , String type , EntityId originator , CustomerId customerId , TbMsgMetaData metaData , String data , RuleChainId ruleChainId , RuleNodeId ruleNodeId ) {
return new TbMsg ( queueName , UUID . randomUUID ( ) , System . currentTimeMillis ( ) , type , originator , customerId ,
metaData . copy ( ) , TbMsgDataType . JSON , data , ruleChainId , ruleNodeId , null , TbMsgCallback . EMPTY ) ;
}
@ -87,12 +87,12 @@ public final class TbMsg implements Serializable {
// REALLY NEW MSG
public static TbMsg newMsg ( QueueId queueId , String type , EntityId originator , TbMsgMetaData metaData , String data ) {
return newMsg ( queueId , type , originator , null , metaData , data ) ;
public static TbMsg newMsg ( String queueName , String type , EntityId originator , TbMsgMetaData metaData , String data ) {
return newMsg ( queueName , type , originator , null , metaData , data ) ;
}
public static TbMsg newMsg ( QueueId queueId , String type , EntityId originator , CustomerId customerId , TbMsgMetaData metaData , String data ) {
return new TbMsg ( queueId , UUID . randomUUID ( ) , System . currentTimeMillis ( ) , type , originator , customerId ,
public static TbMsg newMsg ( String queueName , String type , EntityId originator , CustomerId customerId , TbMsgMetaData metaData , String data ) {
return new TbMsg ( queueName , UUID . randomUUID ( ) , System . currentTimeMillis ( ) , type , originator , customerId ,
metaData . copy ( ) , TbMsgDataType . JSON , data , null , null , null , TbMsgCallback . EMPTY ) ;
}
@ -118,40 +118,40 @@ public final class TbMsg implements Serializable {
}
public static TbMsg transformMsg ( TbMsg tbMsg , String type , EntityId originator , TbMsgMetaData metaData , String data ) {
return new TbMsg ( tbMsg . queueId , tbMsg . id , tbMsg . ts , type , originator , tbMsg . customerId , metaData . copy ( ) , tbMsg . dataType ,
return new TbMsg ( tbMsg . queueName , tbMsg . id , tbMsg . ts , type , originator , tbMsg . customerId , metaData . copy ( ) , tbMsg . dataType ,
data , tbMsg . ruleChainId , tbMsg . ruleNodeId , tbMsg . ctx . copy ( ) , tbMsg . callback ) ;
}
public static TbMsg transformMsg ( TbMsg tbMsg , CustomerId customerId ) {
return new TbMsg ( tbMsg . queueId , tbMsg . id , tbMsg . ts , tbMsg . type , tbMsg . originator , customerId , tbMsg . metaData , tbMsg . dataType ,
return new TbMsg ( tbMsg . queueName , tbMsg . id , tbMsg . ts , tbMsg . type , tbMsg . originator , customerId , tbMsg . metaData , tbMsg . dataType ,
tbMsg . data , tbMsg . ruleChainId , tbMsg . ruleNodeId , tbMsg . ctx . copy ( ) , tbMsg . getCallback ( ) ) ;
}
public static TbMsg transformMsg ( TbMsg tbMsg , RuleChainId ruleChainId ) {
return new TbMsg ( tbMsg . queueId , tbMsg . id , tbMsg . ts , tbMsg . type , tbMsg . originator , tbMsg . customerId , tbMsg . metaData , tbMsg . dataType ,
return new TbMsg ( tbMsg . queueName , tbMsg . id , tbMsg . ts , tbMsg . type , tbMsg . originator , tbMsg . customerId , tbMsg . metaData , tbMsg . dataType ,
tbMsg . data , ruleChainId , null , tbMsg . ctx . copy ( ) , tbMsg . getCallback ( ) ) ;
}
public static TbMsg transformMsg ( TbMsg tbMsg , QueueId queueId ) {
return new TbMsg ( queueId , tbMsg . id , tbMsg . ts , tbMsg . type , tbMsg . originator , tbMsg . customerId , tbMsg . metaData , tbMsg . dataType ,
public static TbMsg transformMsg ( TbMsg tbMsg , String queueName ) {
return new TbMsg ( queueName , tbMsg . id , tbMsg . ts , tbMsg . type , tbMsg . originator , tbMsg . customerId , tbMsg . metaData , tbMsg . dataType ,
tbMsg . data , tbMsg . getRuleChainId ( ) , null , tbMsg . ctx . copy ( ) , tbMsg . getCallback ( ) ) ;
}
public static TbMsg transformMsg ( TbMsg tbMsg , RuleChainId ruleChainId , QueueId queueId ) {
return new TbMsg ( queueId , tbMsg . id , tbMsg . ts , tbMsg . type , tbMsg . originator , tbMsg . customerId , tbMsg . metaData , tbMsg . dataType ,
public static TbMsg transformMsg ( TbMsg tbMsg , RuleChainId ruleChainId , String queueName ) {
return new TbMsg ( queueName , tbMsg . id , tbMsg . ts , tbMsg . type , tbMsg . originator , tbMsg . customerId , tbMsg . metaData , tbMsg . dataType ,
tbMsg . data , ruleChainId , null , tbMsg . ctx . copy ( ) , tbMsg . getCallback ( ) ) ;
}
//used for enqueueForTellNext
public static TbMsg newMsg ( TbMsg tbMsg , QueueId queueId , RuleChainId ruleChainId , RuleNodeId ruleNodeId ) {
return new TbMsg ( queueId , UUID . randomUUID ( ) , tbMsg . getTs ( ) , tbMsg . getType ( ) , tbMsg . getOriginator ( ) , tbMsg . customerId , tbMsg . getMetaData ( ) . copy ( ) ,
public static TbMsg newMsg ( TbMsg tbMsg , String queueName , RuleChainId ruleChainId , RuleNodeId ruleNodeId ) {
return new TbMsg ( queueName , UUID . randomUUID ( ) , tbMsg . getTs ( ) , tbMsg . getType ( ) , tbMsg . getOriginator ( ) , tbMsg . customerId , tbMsg . getMetaData ( ) . copy ( ) ,
tbMsg . getDataType ( ) , tbMsg . getData ( ) , ruleChainId , ruleNodeId , tbMsg . ctx . copy ( ) , TbMsgCallback . EMPTY ) ;
}
private TbMsg ( QueueId queueId , UUID id , long ts , String type , EntityId originator , CustomerId customerId , TbMsgMetaData metaData , TbMsgDataType dataType , String data ,
private TbMsg ( String queueName , UUID id , long ts , String type , EntityId originator , CustomerId customerId , TbMsgMetaData metaData , TbMsgDataType dataType , String data ,
RuleChainId ruleChainId , RuleNodeId ruleNodeId , TbMsgProcessingCtx ctx , TbMsgCallback callback ) {
this . id = id ;
this . queueId = queueId ;
this . queueName = queueName ;
if ( ts > 0 ) {
this . ts = ts ;
} else {
@ -220,7 +220,7 @@ public final class TbMsg implements Serializable {
return builder . build ( ) . toByteArray ( ) ;
}
public static TbMsg fromBytes ( QueueId queueId , byte [ ] data , TbMsgCallback callback ) {
public static TbMsg fromBytes ( String queueName , byte [ ] data , TbMsgCallback callback ) {
try {
MsgProtos . TbMsgProto proto = MsgProtos . TbMsgProto . parseFrom ( data ) ;
TbMsgMetaData metaData = new TbMsgMetaData ( proto . getMetaData ( ) . getDataMap ( ) ) ;
@ -247,7 +247,7 @@ public final class TbMsg implements Serializable {
}
TbMsgDataType dataType = TbMsgDataType . values ( ) [ proto . getDataType ( ) ] ;
return new TbMsg ( queueId , UUID . fromString ( proto . getId ( ) ) , proto . getTs ( ) , proto . getType ( ) , entityId , customerId ,
return new TbMsg ( queueName , UUID . fromString ( proto . getId ( ) ) , proto . getTs ( ) , proto . getType ( ) , entityId , customerId ,
metaData , dataType , proto . getData ( ) , ruleChainId , ruleNodeId , ctx , callback ) ;
} catch ( InvalidProtocolBufferException e ) {
throw new IllegalStateException ( "Could not parse protobuf for TbMsg" , e ) ;
@ -259,12 +259,12 @@ public final class TbMsg implements Serializable {
}
public TbMsg copyWithRuleChainId ( RuleChainId ruleChainId , UUID msgId ) {
return new TbMsg ( this . queueId , msgId , this . ts , this . type , this . originator , this . customerId ,
return new TbMsg ( this . queueName , msgId , this . ts , this . type , this . originator , this . customerId ,
this . metaData , this . dataType , this . data , ruleChainId , null , this . ctx , callback ) ;
}
public TbMsg copyWithRuleNodeId ( RuleChainId ruleChainId , RuleNodeId ruleNodeId , UUID msgId ) {
return new TbMsg ( this . queueId , msgId , this . ts , this . type , this . originator , this . customerId ,
return new TbMsg ( this . queueName , msgId , this . ts , this . type , this . originator , this . customerId ,
this . metaData , this . dataType , this . data , ruleChainId , ruleNodeId , this . ctx , callback ) ;
}