Skip to content

Commit

Permalink
[alibaba#3368]Cancel empty Long polling thread to improve performance…
Browse files Browse the repository at this point in the history
…. (#ISSUE3432)

* update clientWorker

* Upgrade 1.3.1 (alibaba#3294)

update clientWorker

* add ClientWorkerTest

* add remove taskFlagMap

* update clientworker

* update taskIdSet ConcurrentHashSet
  • Loading branch information
binbin0325 authored Aug 5, 2020
1 parent 0ae6669 commit 95c8bf2
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.alibaba.nacos.client.utils.TenantUtil;
import com.alibaba.nacos.common.http.HttpRestResult;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.StringUtils;
Expand Down Expand Up @@ -165,6 +166,7 @@ private void removeCache(String dataId, String group) {
copy.remove(groupKey);
cacheMap.set(copy);
}
reMakeCacheDataTaskId();
LOGGER.info("[{}] [unsubscribe] {}", this.agent.getName(), groupKey);

MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
Expand All @@ -177,11 +179,38 @@ void removeCache(String dataId, String group, String tenant) {
copy.remove(groupKey);
cacheMap.set(copy);
}
reMakeCacheDataTaskId();
LOGGER.info("[{}] [unsubscribe] {}", agent.getName(), groupKey);

MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
}

/**
* Remake cacheData taskId.
*/
private void reMakeCacheDataTaskId() {
int listenerSize = cacheMap.get().size();
int remakeTaskId = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (remakeTaskId < (int) currentLongingTaskCount) {
for (int i = 0; i < remakeTaskId; i++) {
int count = 0;
for (String key : cacheMap.get().keySet()) {
if (count == ParamUtil.getPerTaskConfigSize()) {
break;
}
CacheData cacheData = cacheMap.get().get(key);
cacheData.setTaskId(i);
synchronized (cacheMap) {
Map<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
copy.put(key, cacheData);
cacheMap.set(copy);
}
count++;
}
}
}
}

/**
* Add cache data if absent.
*
Expand Down Expand Up @@ -248,6 +277,8 @@ public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant
cache.setInitializing(true);
} else {
cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
cache.setTaskId(taskId);
// fix issue # 1317
if (enableRemoteSyncConfig) {
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
Expand Down Expand Up @@ -394,11 +425,16 @@ public void checkConfigInfo() {
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
taskIdSet.add(i);
// The task list is no order.So it maybe has issues when changing.
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
} else if (longingTaskCount < currentLongingTaskCount) {
for (int i = longingTaskCount; i < (int) currentLongingTaskCount; i++) {
taskIdSet.remove(i);
}
}
currentLongingTaskCount = longingTaskCount;
}

/**
Expand Down Expand Up @@ -656,7 +692,9 @@ public void run() {
}
inInitializingCacheList.clear();

executorService.execute(this);
if (taskIdSet.contains(taskId)) {
executorService.execute(this);
}

} catch (Throwable e) {

Expand Down Expand Up @@ -685,6 +723,11 @@ private void setHealthServer(boolean isHealthServer) {
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
new HashMap<String, CacheData>());

/**
* Store the running taskId.
*/
private final ConcurrentHashSet<Integer> taskIdSet = new ConcurrentHashSet<Integer>();

private final HttpAgent agent;

private final ConfigFilterChainManager configFilterChainManager;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.client.config.listener.impl;

import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
import com.alibaba.nacos.client.config.http.MetricsHttpAgent;
import com.alibaba.nacos.client.config.impl.ClientWorker;
import com.alibaba.nacos.client.utils.ParamUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

import static org.mockito.Mockito.mock;

public class ClientWorkerTest {

@Mock
ScheduledExecutorService scheduledExecutorService;

private ClientWorker clientWorker;

private List<Listener> listeners;

private final String dataId = "data";

private final String group = "group";

private final String currentLongingTaskCount = "currentLongingTaskCount";

@Before
public void init() {
MockitoAnnotations.initMocks(this);
clientWorker = new ClientWorker(mock(MetricsHttpAgent.class), mock(ConfigFilterChainManager.class),
mock(Properties.class));
try {
Field executorServiceField = clientWorker.getClass().getDeclaredField("executorService");
executorServiceField.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(executorServiceField, executorServiceField.getModifiers() & ~Modifier.FINAL);
executorServiceField.set(clientWorker, scheduledExecutorService);
Listener listener = new Listener() {
@Override
public Executor getExecutor() {
return null;
}

@Override
public void receiveConfigInfo(String configInfo) {

}
};
listeners = Arrays.asList(listener);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}

@Test
public void testAddLongPollNumberThreads() {
try {
for (int i = 0; i < ParamUtil.getPerTaskConfigSize(); i++) {
clientWorker.addTenantListeners(dataId + i, group, listeners);
}
Field currentLongingTaskCountField = clientWorker.getClass().getDeclaredField(currentLongingTaskCount);
currentLongingTaskCountField.setAccessible(true);
Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 1);
for (int i = (int) ParamUtil.getPerTaskConfigSize(); i < ParamUtil.getPerTaskConfigSize() * 2; i++) {
clientWorker.addTenantListeners(dataId + i, group, listeners);
}
Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 2);
} catch (NacosException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (NoSuchFieldException e) {
e.printStackTrace();
}
}

@Test
public void testReduceLongPollNumberThreads() {
try {
for (int i = 0; i < ParamUtil.getPerTaskConfigSize() * 3; i++) {
clientWorker.addTenantListeners(dataId + i, group, listeners);
}
Field currentLongingTaskCountField = clientWorker.getClass().getDeclaredField(currentLongingTaskCount);
currentLongingTaskCountField.setAccessible(true);
Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 3);

for (int i = (int) ParamUtil.getPerTaskConfigSize(); i < ParamUtil.getPerTaskConfigSize() * 2; i++) {
clientWorker.removeTenantListener(dataId + i, group, listeners.get(0));
}
Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 2);
} catch (NacosException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (NoSuchFieldException e) {
e.printStackTrace();
}
}

}

0 comments on commit 95c8bf2

Please sign in to comment.