Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3368]Cancel empty Long polling thread to improve performance. #3432

Merged
merged 7 commits into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.alibaba.nacos.client.utils.ParamUtil;
import com.alibaba.nacos.client.utils.TenantUtil;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.StringUtils;
Expand Down Expand Up @@ -166,6 +167,7 @@ private void removeCache(String dataId, String group) {
copy.remove(groupKey);
cacheMap.set(copy);
}
reMakeCacheDataTaskId();
LOGGER.info("[{}] [unsubscribe] {}", this.agent.getName(), groupKey);

MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
Expand All @@ -178,11 +180,38 @@ void removeCache(String dataId, String group, String tenant) {
copy.remove(groupKey);
cacheMap.set(copy);
}
reMakeCacheDataTaskId();
LOGGER.info("[{}] [unsubscribe] {}", agent.getName(), groupKey);

MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
}

/**
* Remake cacheData taskId.
*/
private void reMakeCacheDataTaskId() {
int listenerSize = cacheMap.get().size();
int remakeTaskId = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (remakeTaskId < (int) currentLongingTaskCount) {
for (int i = 0; i < remakeTaskId; i++) {
int count = 0;
for (String key : cacheMap.get().keySet()) {
if (count == ParamUtil.getPerTaskConfigSize()) {
break;
}
CacheData cacheData = cacheMap.get().get(key);
cacheData.setTaskId(i);
synchronized (cacheMap) {
Map<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
copy.put(key, cacheData);
cacheMap.set(copy);
}
count++;
}
}
}
}

/**
* Add cache data if absent.
*
Expand Down Expand Up @@ -249,6 +278,8 @@ public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant
cache.setInitializing(true);
} else {
cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
cache.setTaskId(taskId);
// fix issue # 1317
if (enableRemoteSyncConfig) {
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
Expand Down Expand Up @@ -392,11 +423,16 @@ public void checkConfigInfo() {
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
taskIdSet.add(i);
// The task list is no order.So it maybe has issues when changing.
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
} else if (longingTaskCount < currentLongingTaskCount) {
for (int i = longingTaskCount; i < (int) currentLongingTaskCount; i++) {
taskIdSet.remove(i);
}
}
currentLongingTaskCount = longingTaskCount;
}

/**
Expand Down Expand Up @@ -658,7 +694,9 @@ public void run() {
}
inInitializingCacheList.clear();

executorService.execute(this);
if (taskIdSet.contains(taskId)) {
executorService.execute(this);
}

} catch (Throwable e) {

Expand Down Expand Up @@ -687,6 +725,11 @@ private void setHealthServer(boolean isHealthServer) {
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
new HashMap<String, CacheData>());

/**
* Store the running taskId.
*/
private final ConcurrentHashSet<Integer> taskIdSet = new ConcurrentHashSet<Integer>();

private final HttpAgent agent;

private final ConfigFilterChainManager configFilterChainManager;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.client.config.listener.impl;

import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
import com.alibaba.nacos.client.config.http.MetricsHttpAgent;
import com.alibaba.nacos.client.config.impl.ClientWorker;
import com.alibaba.nacos.client.utils.ParamUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

import static org.mockito.Mockito.mock;

public class ClientWorkerTest {

@Mock
ScheduledExecutorService scheduledExecutorService;

private ClientWorker clientWorker;

private List<Listener> listeners;

private final String dataId = "data";

private final String group = "group";

private final String currentLongingTaskCount = "currentLongingTaskCount";

@Before
public void init() {
MockitoAnnotations.initMocks(this);
clientWorker = new ClientWorker(mock(MetricsHttpAgent.class), mock(ConfigFilterChainManager.class),
mock(Properties.class));
try {
Field executorServiceField = clientWorker.getClass().getDeclaredField("executorService");
executorServiceField.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(executorServiceField, executorServiceField.getModifiers() & ~Modifier.FINAL);
executorServiceField.set(clientWorker, scheduledExecutorService);
Listener listener = new Listener() {
@Override
public Executor getExecutor() {
return null;
}

@Override
public void receiveConfigInfo(String configInfo) {

}
};
listeners = Arrays.asList(listener);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}

@Test
public void testAddLongPollNumberThreads() {
try {
for (int i = 0; i < ParamUtil.getPerTaskConfigSize(); i++) {
clientWorker.addTenantListeners(dataId + i, group, listeners);
}
Field currentLongingTaskCountField = clientWorker.getClass().getDeclaredField(currentLongingTaskCount);
currentLongingTaskCountField.setAccessible(true);
Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 1);
for (int i = (int) ParamUtil.getPerTaskConfigSize(); i < ParamUtil.getPerTaskConfigSize() * 2; i++) {
clientWorker.addTenantListeners(dataId + i, group, listeners);
}
Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 2);
} catch (NacosException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (NoSuchFieldException e) {
e.printStackTrace();
}
}

@Test
public void testReduceLongPollNumberThreads() {
try {
for (int i = 0; i < ParamUtil.getPerTaskConfigSize() * 3; i++) {
clientWorker.addTenantListeners(dataId + i, group, listeners);
}
Field currentLongingTaskCountField = clientWorker.getClass().getDeclaredField(currentLongingTaskCount);
currentLongingTaskCountField.setAccessible(true);
Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 3);

for (int i = (int) ParamUtil.getPerTaskConfigSize(); i < ParamUtil.getPerTaskConfigSize() * 2; i++) {
clientWorker.removeTenantListener(dataId + i, group, listeners.get(0));
}
Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 2);
binbin0325 marked this conversation as resolved.
Show resolved Hide resolved
} catch (NacosException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (NoSuchFieldException e) {
e.printStackTrace();
}
}

}