From 87b86022fe963eb1869cced70434c77e6266849d Mon Sep 17 00:00:00 2001 From: yanlinly Date: Tue, 8 Sep 2020 09:21:45 +0800 Subject: [PATCH] Revert "[#3368]Cancel empty Long polling thread to improve performance. (#ISSUE3432)" This reverts commit 95c8bf242b1f8895b77c519fb7dfdbc144e986c0. --- .../client/config/impl/ClientWorker.java | 47 +------ .../listener/impl/ClientWorkerTest.java | 132 ------------------ 2 files changed, 2 insertions(+), 177 deletions(-) delete mode 100644 client/src/test/java/com/alibaba/nacos/client/config/listener/impl/ClientWorkerTest.java diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index fab00f032c5..93578dda94c 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -32,7 +32,6 @@ import com.alibaba.nacos.client.utils.TenantUtil; import com.alibaba.nacos.common.http.HttpRestResult; import com.alibaba.nacos.common.lifecycle.Closeable; -import com.alibaba.nacos.common.utils.ConcurrentHashSet; import com.alibaba.nacos.common.utils.ConvertUtils; import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.common.utils.StringUtils; @@ -166,7 +165,6 @@ private void removeCache(String dataId, String group) { copy.remove(groupKey); cacheMap.set(copy); } - reMakeCacheDataTaskId(); LOGGER.info("[{}] [unsubscribe] {}", this.agent.getName(), groupKey); MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size()); @@ -179,38 +177,11 @@ void removeCache(String dataId, String group, String tenant) { copy.remove(groupKey); cacheMap.set(copy); } - reMakeCacheDataTaskId(); LOGGER.info("[{}] [unsubscribe] {}", agent.getName(), groupKey); MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size()); } - /** - * Remake cacheData taskId. - */ - private void reMakeCacheDataTaskId() { - int listenerSize = cacheMap.get().size(); - int remakeTaskId = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); - if (remakeTaskId < (int) currentLongingTaskCount) { - for (int i = 0; i < remakeTaskId; i++) { - int count = 0; - for (String key : cacheMap.get().keySet()) { - if (count == ParamUtil.getPerTaskConfigSize()) { - break; - } - CacheData cacheData = cacheMap.get().get(key); - cacheData.setTaskId(i); - synchronized (cacheMap) { - Map copy = new HashMap(this.cacheMap.get()); - copy.put(key, cacheData); - cacheMap.set(copy); - } - count++; - } - } - } - } - /** * Add cache data if absent. * @@ -277,8 +248,6 @@ public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant cache.setInitializing(true); } else { cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant); - int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize(); - cache.setTaskId(taskId); // fix issue # 1317 if (enableRemoteSyncConfig) { String[] ct = getServerConfig(dataId, group, tenant, 3000L); @@ -425,16 +394,11 @@ public void checkConfigInfo() { int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { - taskIdSet.add(i); // The task list is no order.So it maybe has issues when changing. executorService.execute(new LongPollingRunnable(i)); } - } else if (longingTaskCount < currentLongingTaskCount) { - for (int i = longingTaskCount; i < (int) currentLongingTaskCount; i++) { - taskIdSet.remove(i); - } + currentLongingTaskCount = longingTaskCount; } - currentLongingTaskCount = longingTaskCount; } /** @@ -692,9 +656,7 @@ public void run() { } inInitializingCacheList.clear(); - if (taskIdSet.contains(taskId)) { - executorService.execute(this); - } + executorService.execute(this); } catch (Throwable e) { @@ -723,11 +685,6 @@ private void setHealthServer(boolean isHealthServer) { private final AtomicReference> cacheMap = new AtomicReference>( new HashMap()); - /** - * Store the running taskId. - */ - private final ConcurrentHashSet taskIdSet = new ConcurrentHashSet(); - private final HttpAgent agent; private final ConfigFilterChainManager configFilterChainManager; diff --git a/client/src/test/java/com/alibaba/nacos/client/config/listener/impl/ClientWorkerTest.java b/client/src/test/java/com/alibaba/nacos/client/config/listener/impl/ClientWorkerTest.java deleted file mode 100644 index 7922124189e..00000000000 --- a/client/src/test/java/com/alibaba/nacos/client/config/listener/impl/ClientWorkerTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.client.config.listener.impl; - -import com.alibaba.nacos.api.config.listener.Listener; -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager; -import com.alibaba.nacos.client.config.http.MetricsHttpAgent; -import com.alibaba.nacos.client.config.impl.ClientWorker; -import com.alibaba.nacos.client.utils.ParamUtil; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; - -import static org.mockito.Mockito.mock; - -public class ClientWorkerTest { - - @Mock - ScheduledExecutorService scheduledExecutorService; - - private ClientWorker clientWorker; - - private List listeners; - - private final String dataId = "data"; - - private final String group = "group"; - - private final String currentLongingTaskCount = "currentLongingTaskCount"; - - @Before - public void init() { - MockitoAnnotations.initMocks(this); - clientWorker = new ClientWorker(mock(MetricsHttpAgent.class), mock(ConfigFilterChainManager.class), - mock(Properties.class)); - try { - Field executorServiceField = clientWorker.getClass().getDeclaredField("executorService"); - executorServiceField.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(executorServiceField, executorServiceField.getModifiers() & ~Modifier.FINAL); - executorServiceField.set(clientWorker, scheduledExecutorService); - Listener listener = new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configInfo) { - - } - }; - listeners = Arrays.asList(listener); - } catch (NoSuchFieldException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } - } - - @Test - public void testAddLongPollNumberThreads() { - try { - for (int i = 0; i < ParamUtil.getPerTaskConfigSize(); i++) { - clientWorker.addTenantListeners(dataId + i, group, listeners); - } - Field currentLongingTaskCountField = clientWorker.getClass().getDeclaredField(currentLongingTaskCount); - currentLongingTaskCountField.setAccessible(true); - Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 1); - for (int i = (int) ParamUtil.getPerTaskConfigSize(); i < ParamUtil.getPerTaskConfigSize() * 2; i++) { - clientWorker.addTenantListeners(dataId + i, group, listeners); - } - Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 2); - } catch (NacosException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } catch (NoSuchFieldException e) { - e.printStackTrace(); - } - } - - @Test - public void testReduceLongPollNumberThreads() { - try { - for (int i = 0; i < ParamUtil.getPerTaskConfigSize() * 3; i++) { - clientWorker.addTenantListeners(dataId + i, group, listeners); - } - Field currentLongingTaskCountField = clientWorker.getClass().getDeclaredField(currentLongingTaskCount); - currentLongingTaskCountField.setAccessible(true); - Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 3); - - for (int i = (int) ParamUtil.getPerTaskConfigSize(); i < ParamUtil.getPerTaskConfigSize() * 2; i++) { - clientWorker.removeTenantListener(dataId + i, group, listeners.get(0)); - } - Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 2); - } catch (NacosException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } catch (NoSuchFieldException e) { - e.printStackTrace(); - } - } - -}