|
|
|
@ -46,8 +46,11 @@ import java.util.concurrent.atomic.AtomicInteger; |
|
|
|
@Service |
|
|
|
public class RemoteJsInvokeService extends AbstractJsInvokeService { |
|
|
|
|
|
|
|
@Value("${queue.js.max_requests_timeout}") |
|
|
|
private long maxRequestsTimeout; |
|
|
|
@Value("${queue.js.max_eval_requests_timeout}") |
|
|
|
private long maxEvalRequestsTimeout; |
|
|
|
|
|
|
|
@Value("${queue.js.max_invoke_requests_timeout}") |
|
|
|
private long maxInvokeRequestsTimeout; |
|
|
|
|
|
|
|
@Getter |
|
|
|
@Value("${js.remote.max_errors}") |
|
|
|
@ -87,7 +90,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
|
|
|
|
|
|
|
@PostConstruct |
|
|
|
public void init() { |
|
|
|
super.init(maxRequestsTimeout); |
|
|
|
super.init(maxInvokeRequestsTimeout); |
|
|
|
requestTemplate.init(); |
|
|
|
} |
|
|
|
|
|
|
|
@ -113,8 +116,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
|
|
|
|
|
|
|
log.trace("Post compile request for scriptId [{}]", scriptId); |
|
|
|
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); |
|
|
|
if (maxRequestsTimeout > 0) { |
|
|
|
future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); |
|
|
|
if (maxEvalRequestsTimeout > 0) { |
|
|
|
future = Futures.withTimeout(future, maxEvalRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); |
|
|
|
} |
|
|
|
queuePushedMsgs.incrementAndGet(); |
|
|
|
Futures.addCallback(future, new FutureCallback<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>>() { |
|
|
|
@ -155,7 +158,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
|
|
|
.setScriptIdMSB(scriptId.getMostSignificantBits()) |
|
|
|
.setScriptIdLSB(scriptId.getLeastSignificantBits()) |
|
|
|
.setFunctionName(functionName) |
|
|
|
.setTimeout((int) maxRequestsTimeout) |
|
|
|
.setTimeout((int) maxInvokeRequestsTimeout) |
|
|
|
.setScriptBody(scriptIdToBodysMap.get(scriptId)); |
|
|
|
|
|
|
|
for (Object arg : args) { |
|
|
|
@ -167,8 +170,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
|
|
|
.build(); |
|
|
|
|
|
|
|
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); |
|
|
|
if (maxRequestsTimeout > 0) { |
|
|
|
future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); |
|
|
|
if (maxInvokeRequestsTimeout > 0) { |
|
|
|
future = Futures.withTimeout(future, maxInvokeRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); |
|
|
|
} |
|
|
|
queuePushedMsgs.incrementAndGet(); |
|
|
|
Futures.addCallback(future, new FutureCallback<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>>() { |
|
|
|
@ -210,8 +213,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
|
|
|
.build(); |
|
|
|
|
|
|
|
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); |
|
|
|
if (maxRequestsTimeout > 0) { |
|
|
|
future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); |
|
|
|
if (maxInvokeRequestsTimeout > 0) { |
|
|
|
future = Futures.withTimeout(future, maxInvokeRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); |
|
|
|
} |
|
|
|
JsInvokeProtos.RemoteJsResponse response = future.get().getValue(); |
|
|
|
|
|
|
|
|