Browse Source

Merge pull request #8127 from volodymyr-babak/remove-event-deduplicator

[3.5] Remove EventDeduplicationExecutor
pull/8138/head
Andrew Shvayka 3 years ago
committed by GitHub
parent
commit
589abd708c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 85
      application/src/main/java/org/thingsboard/server/utils/EventDeduplicationExecutor.java
  2. 173
      application/src/test/java/org/thingsboard/server/util/EventDeduplicationExecutorTest.java

85
application/src/main/java/org/thingsboard/server/utils/EventDeduplicationExecutor.java

@ -1,85 +0,0 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
/**
* This class deduplicate executions of the specified function.
* Useful in cluster mode, when you get event about partition change multiple times.
* Assuming that the function execution is expensive, we should execute it immediately when first time event occurs and
* later, once the processing of first event is done, process last pending task.
*
* @param <P> parameters of the function
*/
@Slf4j
public class EventDeduplicationExecutor<P> {
private final String name;
private final ExecutorService executor;
private final Consumer<P> function;
private P pendingTask;
private boolean busy;
public EventDeduplicationExecutor(String name, ExecutorService executor, Consumer<P> function) {
this.name = name;
this.executor = executor;
this.function = function;
}
public void submit(P params) {
log.info("[{}] Going to submit: {}", name, params);
synchronized (EventDeduplicationExecutor.this) {
if (!busy) {
busy = true;
pendingTask = null;
try {
log.info("[{}] Submitting task: {}", name, params);
executor.submit(() -> {
try {
log.info("[{}] Executing task: {}", name, params);
function.accept(params);
} catch (Throwable e) {
log.warn("[{}] Failed to process task with parameters: {}", name, params, e);
throw e;
} finally {
unlockAndProcessIfAny();
}
});
} catch (Throwable e) {
log.warn("[{}] Failed to submit task with parameters: {}", name, params, e);
unlockAndProcessIfAny();
throw e;
}
} else {
log.info("[{}] Task is already in progress. {} pending task: {}", name, pendingTask == null ? "adding" : "updating", params);
pendingTask = params;
}
}
}
private void unlockAndProcessIfAny() {
synchronized (EventDeduplicationExecutor.this) {
busy = false;
if (pendingTask != null) {
submit(pendingTask);
}
}
}
}

173
application/src/test/java/org/thingsboard/server/util/EventDeduplicationExecutorTest.java

@ -1,173 +0,0 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.util;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.utils.EventDeduplicationExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
@Slf4j
@RunWith(MockitoJUnitRunner.class)
public class EventDeduplicationExecutorTest {
ThingsBoardThreadFactory threadFactory = ThingsBoardThreadFactory.forName(getClass().getSimpleName());
ExecutorService executor;
@After
public void tearDown() throws Exception {
if (executor != null) {
executor.shutdownNow();
}
}
@Test
public void testSimpleFlowSameThread() throws InterruptedException {
simpleFlow(MoreExecutors.newDirectExecutorService());
}
@Test
public void testPeriodicFlowSameThread() throws InterruptedException {
periodicFlow(MoreExecutors.newDirectExecutorService());
}
@Test
public void testExceptionFlowSameThread() throws InterruptedException {
exceptionFlow(MoreExecutors.newDirectExecutorService());
}
@Test
public void testSimpleFlowSingleThread() throws InterruptedException {
executor = Executors.newSingleThreadExecutor(threadFactory);
simpleFlow(executor);
}
@Test
public void testPeriodicFlowSingleThread() throws InterruptedException {
executor = Executors.newSingleThreadExecutor(threadFactory);
periodicFlow(executor);
}
@Test
public void testExceptionFlowSingleThread() throws InterruptedException {
executor = Executors.newSingleThreadExecutor(threadFactory);
exceptionFlow(executor);
}
@Test
public void testSimpleFlowMultiThread() throws InterruptedException {
executor = Executors.newFixedThreadPool(3, threadFactory);
simpleFlow(executor);
}
@Test
public void testPeriodicFlowMultiThread() throws InterruptedException {
executor = Executors.newFixedThreadPool(3, threadFactory);
periodicFlow(executor);
}
@Test
public void testExceptionFlowMultiThread() throws InterruptedException {
executor = Executors.newFixedThreadPool(3, threadFactory);
exceptionFlow(executor);
}
private void simpleFlow(ExecutorService executorService) throws InterruptedException {
try {
Consumer<String> function = Mockito.spy(StringConsumer.class);
EventDeduplicationExecutor<String> executor = new EventDeduplicationExecutor<>(EventDeduplicationExecutorTest.class.getSimpleName(), executorService, function);
String params1 = "params1";
String params2 = "params2";
String params3 = "params3";
executor.submit(params1);
executor.submit(params2);
executor.submit(params3);
Thread.sleep(500);
Mockito.verify(function).accept(params1);
Mockito.verify(function).accept(params3);
} finally {
executorService.shutdownNow();
}
}
private void periodicFlow(ExecutorService executorService) throws InterruptedException {
try {
Consumer<String> function = Mockito.spy(StringConsumer.class);
EventDeduplicationExecutor<String> executor = new EventDeduplicationExecutor<>(EventDeduplicationExecutorTest.class.getSimpleName(), executorService, function);
String params1 = "params1";
String params2 = "params2";
String params3 = "params3";
executor.submit(params1);
Thread.sleep(500);
executor.submit(params2);
Thread.sleep(500);
executor.submit(params3);
Thread.sleep(500);
Mockito.verify(function).accept(params1);
Mockito.verify(function).accept(params2);
Mockito.verify(function).accept(params3);
} finally {
executorService.shutdownNow();
}
}
private void exceptionFlow(ExecutorService executorService) throws InterruptedException {
try {
Consumer<String> function = Mockito.spy(StringConsumer.class);
EventDeduplicationExecutor<String> executor = new EventDeduplicationExecutor<>(EventDeduplicationExecutorTest.class.getSimpleName(), executorService, function);
String params1 = "params1";
String params2 = "params2";
String params3 = "params3";
Mockito.doThrow(new RuntimeException()).when(function).accept("params1");
executor.submit(params1);
executor.submit(params2);
Thread.sleep(500);
executor.submit(params3);
Thread.sleep(500);
Mockito.verify(function).accept(params2);
Mockito.verify(function).accept(params3);
} finally {
executorService.shutdownNow();
}
}
public static class StringConsumer implements Consumer<String> {
@Override
public void accept(String s) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
Loading…
Cancel
Save