Skip to content

Commit

Permalink
Refactor dispatch task execute (alibaba#3995)
Browse files Browse the repository at this point in the history
* Refactor nacos task execute engine

* Refactor nacos task execute engine

* For checkstyle

* For checkstyle

* Use ThreadUtils to reduce duplicate codes

* Set custom logger for TaskExecuteWorker

* Set custom logger for TaskExecuteWorker
  • Loading branch information
KomachiSion authored and zilongTong committed Oct 19, 2020
1 parent 1cf605d commit 3d41c2c
Show file tree
Hide file tree
Showing 25 changed files with 388 additions and 270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*
* @author xiweng.yy
*/
public abstract class AbstractExecuteTask implements NacosTask {
public abstract class AbstractExecuteTask implements NacosTask, Runnable {

@Override
public boolean shouldProcess() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ public interface NacosTaskProcessor {
* @param task task.
* @return process task result.
*/
boolean process(AbstractDelayTask task);
boolean process(NacosTask task);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,13 @@

package com.alibaba.nacos.common.task.engine;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
* Abstract nacos task execute engine.
Expand All @@ -40,57 +33,12 @@ public abstract class AbstractNacosTaskExecuteEngine<T extends NacosTask> implem

private final Logger log;

private final ScheduledExecutorService processingExecutor;

private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<Object, NacosTaskProcessor>();

protected final ConcurrentHashMap<Object, T> tasks;

protected final ReentrantLock lock = new ReentrantLock();

private NacosTaskProcessor defaultTaskProcessor;

public AbstractNacosTaskExecuteEngine(String name) {
this(name, 32, null, 100L);
}

public AbstractNacosTaskExecuteEngine(String name, Logger logger) {
this(name, 32, logger, 100L);
}

public AbstractNacosTaskExecuteEngine(String name, Logger logger, long processInterval) {
this(name, 32, logger, processInterval);
}

public AbstractNacosTaskExecuteEngine(String name, int initCapacity, Logger logger) {
this(name, initCapacity, logger, 100L);
}

public AbstractNacosTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
public AbstractNacosTaskExecuteEngine(Logger logger) {
this.log = null != logger ? logger : LoggerFactory.getLogger(AbstractNacosTaskExecuteEngine.class.getName());
tasks = new ConcurrentHashMap<Object, T>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

@Override
public int size() {
lock.lock();
try {
return tasks.size();
} finally {
lock.unlock();
}
}

@Override
public boolean isEmpty() {
lock.lock();
try {
return tasks.isEmpty();
} finally {
lock.unlock();
}
}

@Override
Expand Down Expand Up @@ -118,56 +66,7 @@ public void setDefaultTaskProcessor(NacosTaskProcessor defaultTaskProcessor) {
this.defaultTaskProcessor = defaultTaskProcessor;
}

@Override
public T removeTask(Object key) {
lock.lock();
try {
T task = tasks.get(key);
if (null != task && task.shouldProcess()) {
return tasks.remove(key);
} else {
return null;
}
} finally {
lock.unlock();
}
}

@Override
public Collection<Object> getAllTaskKeys() {
Collection<Object> keys = new HashSet<Object>();
lock.lock();
try {
keys.addAll(tasks.keySet());
} finally {
lock.unlock();
}
return keys;
}

@Override
public void shutdown() throws NacosException {
processingExecutor.shutdown();
}

protected Logger getEngineLog() {
return log;
}

/**
* process tasks in execute engine.
*/
protected abstract void processTasks();

private class ProcessRunnable implements Runnable {

@Override
public void run() {
try {
AbstractNacosTaskExecuteEngine.this.processTasks();
} catch (Throwable e) {
log.error(e.toString(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,19 @@

package com.alibaba.nacos.common.task.engine;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import org.slf4j.Logger;

import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
* Nacos delay task execute engine.
Expand All @@ -29,27 +37,105 @@
*/
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {

private final ScheduledExecutorService processingExecutor;

protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;

protected final ReentrantLock lock = new ReentrantLock();

public NacosDelayTaskExecuteEngine(String name) {
super(name);
this(name, null);
}

public NacosDelayTaskExecuteEngine(String name, Logger logger) {
super(name, logger);
this(name, 32, logger, 100L);
}

public NacosDelayTaskExecuteEngine(String name, Logger logger, long processInterval) {
super(name, logger, processInterval);
this(name, 32, logger, processInterval);
}

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger) {
super(name, initCapacity, logger);
this(name, initCapacity, logger, 100L);
}

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(name, initCapacity, logger, processInterval);
super(logger);
tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

@Override
public int size() {
lock.lock();
try {
return tasks.size();
} finally {
lock.unlock();
}
}

@Override
public boolean isEmpty() {
lock.lock();
try {
return tasks.isEmpty();
} finally {
lock.unlock();
}
}

@Override
public AbstractDelayTask removeTask(Object key) {
lock.lock();
try {
AbstractDelayTask task = tasks.get(key);
if (null != task && task.shouldProcess()) {
return tasks.remove(key);
} else {
return null;
}
} finally {
lock.unlock();
}
}

@Override
public Collection<Object> getAllTaskKeys() {
Collection<Object> keys = new HashSet<Object>();
lock.lock();
try {
keys.addAll(tasks.keySet());
} finally {
lock.unlock();
}
return keys;
}

@Override
public void shutdown() throws NacosException {
processingExecutor.shutdown();
}

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
lock.lock();
try {
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {
newTask.merge(existTask);
}
tasks.put(key, newTask);
} finally {
lock.unlock();
}
}

/**
* process tasks in execute engine.
*/
protected void processTasks() {
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
Expand All @@ -74,22 +160,20 @@ protected void processTasks() {
}
}

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
lock.lock();
try {
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {
newTask.merge(existTask);
}
tasks.put(key, newTask);
} finally {
lock.unlock();
}
}

private void retryFailedTask(Object key, AbstractDelayTask task) {
task.setLastProcessTime(System.currentTimeMillis());
addTask(key, task);
}

private class ProcessRunnable implements Runnable {

@Override
public void run() {
try {
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
}
Loading

0 comments on commit 3d41c2c

Please sign in to comment.