Daria Shevchenko
3 days ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with
20 additions and
8 deletions
-
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
-
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java
-
common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java
-
common/proto/src/main/proto/queue.proto
-
common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java
|
|
|
@ -466,9 +466,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore |
|
|
|
} |
|
|
|
|
|
|
|
private void forwardToCoreRpcService(FromDeviceRPCResponseProto proto, TbCallback callback) { |
|
|
|
RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null; |
|
|
|
RpcError error = proto.getError() >= 0 ? RpcError.values()[proto.getError()] : null; |
|
|
|
FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()) |
|
|
|
, proto.getResponse(), error); |
|
|
|
, proto.hasResponse() ? proto.getResponse() : null, error); |
|
|
|
tbCoreDeviceRpcService.processRpcResponseFromRuleEngine(response); |
|
|
|
callback.onSuccess(); |
|
|
|
} |
|
|
|
|
|
|
|
@ -180,9 +180,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractPartitionBasedCo |
|
|
|
callback.onSuccess(); |
|
|
|
} else if (nfMsg.hasFromDeviceRpcResponse()) { |
|
|
|
TransportProtos.FromDeviceRPCResponseProto proto = nfMsg.getFromDeviceRpcResponse(); |
|
|
|
RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null; |
|
|
|
RpcError error = proto.getError() >= 0 ? RpcError.values()[proto.getError()] : null; |
|
|
|
FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()) |
|
|
|
, proto.getResponse(), error); |
|
|
|
, proto.hasResponse() ? proto.getResponse() : null, error); |
|
|
|
tbDeviceRpcService.processRpcResponseFromDevice(response); |
|
|
|
callback.onSuccess(); |
|
|
|
} else if (nfMsg.getQueueUpdateMsgsCount() > 0) { |
|
|
|
|
|
|
|
@ -583,10 +583,11 @@ public class ProtoUtils { |
|
|
|
} |
|
|
|
|
|
|
|
private static ToDeviceActorNotificationMsg fromProto(TransportProtos.FromDeviceRpcResponseActorMsgProto proto) { |
|
|
|
TransportProtos.FromDeviceRPCResponseProto rpcResponse = proto.getRpcResponse(); |
|
|
|
FromDeviceRpcResponse fromDeviceRpcResponse = new FromDeviceRpcResponse( |
|
|
|
new UUID(proto.getRpcResponse().getRequestIdMSB(), proto.getRpcResponse().getRequestIdLSB()), |
|
|
|
proto.getRpcResponse().getResponse(), |
|
|
|
proto.getRpcResponse().getError() >= 0 ? RpcError.values()[proto.getRpcResponse().getError()] : null); |
|
|
|
new UUID(rpcResponse.getRequestIdMSB(), rpcResponse.getRequestIdLSB()), |
|
|
|
rpcResponse.hasResponse() ? rpcResponse.getResponse() : null, |
|
|
|
rpcResponse.getError() >= 0 ? RpcError.values()[rpcResponse.getError()] : null); |
|
|
|
return new FromDeviceRpcResponseActorMsg( |
|
|
|
proto.getRequestId(), |
|
|
|
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), |
|
|
|
|
|
|
|
@ -1238,7 +1238,7 @@ message LocalSubscriptionServiceMsgProto { |
|
|
|
message FromDeviceRPCResponseProto { |
|
|
|
int64 requestIdMSB = 1; |
|
|
|
int64 requestIdLSB = 2; |
|
|
|
string response = 3; |
|
|
|
optional string response = 3; |
|
|
|
int32 error = 4; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -226,6 +226,17 @@ class ProtoUtilsTest { |
|
|
|
assertThat(ProtoUtils.fromProto(serializedMsg)).as("deserialized").isEqualTo(msg); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
void protoFromDeviceRpcResponseOnewaySerialization() { |
|
|
|
// Oneway RPC success: response and error are both null. Relies on the proto
|
|
|
|
// 'optional string response' presence bit so the receiver round-trips null
|
|
|
|
// rather than seeing the proto3 default "".
|
|
|
|
FromDeviceRpcResponseActorMsg msg = new FromDeviceRpcResponseActorMsg(23, tenantId, deviceId, new FromDeviceRpcResponse(id, null, null)); |
|
|
|
TransportProtos.ToDeviceActorNotificationMsgProto serializedMsg = ProtoUtils.toProto(msg); |
|
|
|
Assertions.assertNotNull(serializedMsg); |
|
|
|
assertThat(ProtoUtils.fromProto(serializedMsg)).as("deserialized").isEqualTo(msg); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
void protoRemoveRpcActorSerialization() { |
|
|
|
RemoveRpcActorMsg msg = new RemoveRpcActorMsg(tenantId, deviceId, id); |
|
|
|
|