diff --git a/application/src/main/java/org/thingsboard/server/utils/EventDeduplicationExecutor.java b/application/src/main/java/org/thingsboard/server/utils/EventDeduplicationExecutor.java deleted file mode 100644 index 7ce958ef2d..0000000000 --- a/application/src/main/java/org/thingsboard/server/utils/EventDeduplicationExecutor.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Copyright © 2016-2023 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.utils; - -import lombok.extern.slf4j.Slf4j; - -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.function.Consumer; - -/** - * This class deduplicate executions of the specified function. - * Useful in cluster mode, when you get event about partition change multiple times. - * Assuming that the function execution is expensive, we should execute it immediately when first time event occurs and - * later, once the processing of first event is done, process last pending task. - * - * @param

parameters of the function - */ -@Slf4j -public class EventDeduplicationExecutor

{ - private final String name; - private final ExecutorService executor; - private final Consumer

function; - private P pendingTask; - private boolean busy; - - public EventDeduplicationExecutor(String name, ExecutorService executor, Consumer

function) { - this.name = name; - this.executor = executor; - this.function = function; - } - - public void submit(P params) { - log.info("[{}] Going to submit: {}", name, params); - synchronized (EventDeduplicationExecutor.this) { - if (!busy) { - busy = true; - pendingTask = null; - try { - log.info("[{}] Submitting task: {}", name, params); - executor.submit(() -> { - try { - log.info("[{}] Executing task: {}", name, params); - function.accept(params); - } catch (Throwable e) { - log.warn("[{}] Failed to process task with parameters: {}", name, params, e); - throw e; - } finally { - unlockAndProcessIfAny(); - } - }); - } catch (Throwable e) { - log.warn("[{}] Failed to submit task with parameters: {}", name, params, e); - unlockAndProcessIfAny(); - throw e; - } - } else { - log.info("[{}] Task is already in progress. {} pending task: {}", name, pendingTask == null ? "adding" : "updating", params); - pendingTask = params; - } - } - } - - private void unlockAndProcessIfAny() { - synchronized (EventDeduplicationExecutor.this) { - busy = false; - if (pendingTask != null) { - submit(pendingTask); - } - } - } -} diff --git a/application/src/test/java/org/thingsboard/server/util/EventDeduplicationExecutorTest.java b/application/src/test/java/org/thingsboard/server/util/EventDeduplicationExecutorTest.java deleted file mode 100644 index ef75657ed4..0000000000 --- a/application/src/test/java/org/thingsboard/server/util/EventDeduplicationExecutorTest.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Copyright © 2016-2023 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.util; - -import com.google.common.util.concurrent.MoreExecutors; -import lombok.extern.slf4j.Slf4j; -import org.junit.After; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; -import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.utils.EventDeduplicationExecutor; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.function.Consumer; - -@Slf4j -@RunWith(MockitoJUnitRunner.class) -public class EventDeduplicationExecutorTest { - - ThingsBoardThreadFactory threadFactory = ThingsBoardThreadFactory.forName(getClass().getSimpleName()); - ExecutorService executor; - - @After - public void tearDown() throws Exception { - if (executor != null) { - executor.shutdownNow(); - } - } - - @Test - public void testSimpleFlowSameThread() throws InterruptedException { - simpleFlow(MoreExecutors.newDirectExecutorService()); - } - - @Test - public void testPeriodicFlowSameThread() throws InterruptedException { - periodicFlow(MoreExecutors.newDirectExecutorService()); - } - - @Test - public void testExceptionFlowSameThread() throws InterruptedException { - exceptionFlow(MoreExecutors.newDirectExecutorService()); - } - - @Test - public void testSimpleFlowSingleThread() throws InterruptedException { - executor = Executors.newSingleThreadExecutor(threadFactory); - simpleFlow(executor); - } - - @Test - public void testPeriodicFlowSingleThread() throws InterruptedException { - executor = Executors.newSingleThreadExecutor(threadFactory); - periodicFlow(executor); - } - - @Test - public void testExceptionFlowSingleThread() throws InterruptedException { - executor = Executors.newSingleThreadExecutor(threadFactory); - exceptionFlow(executor); - } - - @Test - public void testSimpleFlowMultiThread() throws InterruptedException { - executor = Executors.newFixedThreadPool(3, threadFactory); - simpleFlow(executor); - } - - @Test - public void testPeriodicFlowMultiThread() throws InterruptedException { - executor = Executors.newFixedThreadPool(3, threadFactory); - periodicFlow(executor); - } - - @Test - public void testExceptionFlowMultiThread() throws InterruptedException { - executor = Executors.newFixedThreadPool(3, threadFactory); - exceptionFlow(executor); - } - - private void simpleFlow(ExecutorService executorService) throws InterruptedException { - try { - Consumer function = Mockito.spy(StringConsumer.class); - EventDeduplicationExecutor executor = new EventDeduplicationExecutor<>(EventDeduplicationExecutorTest.class.getSimpleName(), executorService, function); - - String params1 = "params1"; - String params2 = "params2"; - String params3 = "params3"; - - executor.submit(params1); - executor.submit(params2); - executor.submit(params3); - Thread.sleep(500); - Mockito.verify(function).accept(params1); - Mockito.verify(function).accept(params3); - } finally { - executorService.shutdownNow(); - } - } - - private void periodicFlow(ExecutorService executorService) throws InterruptedException { - try { - Consumer function = Mockito.spy(StringConsumer.class); - EventDeduplicationExecutor executor = new EventDeduplicationExecutor<>(EventDeduplicationExecutorTest.class.getSimpleName(), executorService, function); - - String params1 = "params1"; - String params2 = "params2"; - String params3 = "params3"; - - executor.submit(params1); - Thread.sleep(500); - executor.submit(params2); - Thread.sleep(500); - executor.submit(params3); - Thread.sleep(500); - Mockito.verify(function).accept(params1); - Mockito.verify(function).accept(params2); - Mockito.verify(function).accept(params3); - } finally { - executorService.shutdownNow(); - } - } - - private void exceptionFlow(ExecutorService executorService) throws InterruptedException { - try { - Consumer function = Mockito.spy(StringConsumer.class); - EventDeduplicationExecutor executor = new EventDeduplicationExecutor<>(EventDeduplicationExecutorTest.class.getSimpleName(), executorService, function); - - String params1 = "params1"; - String params2 = "params2"; - String params3 = "params3"; - - Mockito.doThrow(new RuntimeException()).when(function).accept("params1"); - executor.submit(params1); - executor.submit(params2); - Thread.sleep(500); - executor.submit(params3); - Thread.sleep(500); - Mockito.verify(function).accept(params2); - Mockito.verify(function).accept(params3); - } finally { - executorService.shutdownNow(); - } - } - - public static class StringConsumer implements Consumer { - @Override - public void accept(String s) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - -}