HBASE-22881 Fix non-daemon threads in hbase server implementation #536

Merged: 1 commit, Aug 28, 2019
@@ -807,7 +807,8 @@ public int run(String[] arg0) throws Exception {
 
     // Have them all share the same connection so they all share the same instance of
     // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
-    final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p"));
+    final ExecutorService pool = Executors.newCachedThreadPool(
+      Threads.newDaemonThreadFactory("p"));
     // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
     // Share a connection so I can keep counts in the 'server' on concurrency.
     final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/);

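This first hunk shows the motif repeated throughout the PR: a pool built from a non-daemon factory holds the JVM open after the work is done. A minimal standalone sketch of the difference (demo code, not from the PR):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DaemonVsNonDaemon {
  public static void main(String[] args) {
    // Non-daemon workers (the JDK default): after the task finishes, the
    // idle cached-pool thread lingers for its 60s keep-alive and the JVM
    // cannot exit until it dies or shutdown() is called.
    ExecutorService nonDaemon = Executors.newCachedThreadPool();
    nonDaemon.submit(() -> System.out.println("non-daemon task"));

    // Daemon workers: the JVM is free to exit the moment main() returns,
    // even though nobody called shutdown() -- the behaviour this PR wants
    // for server-internal pools.
    ExecutorService daemon = Executors.newCachedThreadPool(r -> {
      Thread t = new Thread(r, "demo-daemon");
      t.setDaemon(true);
      return t;
    });
    daemon.submit(() -> System.out.println("daemon task"));
  }
}
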
@@ -204,7 +204,7 @@ public static ThreadPoolExecutor getBoundedCachedThreadPool(
    * @param prefix The prefix of every created Thread's name
    * @return a {@link java.util.concurrent.ThreadFactory} that names threads
    */
-  public static ThreadFactory getNamedThreadFactory(final String prefix) {
+  private static ThreadFactory getNamedThreadFactory(final String prefix) {
     SecurityManager s = System.getSecurityManager();
     final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
         .getThreadGroup();

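Making getNamedThreadFactory private forces every caller in Threads.java through the new daemon-setting entry point. Threads.newDaemonThreadFactory itself falls outside this diff's context window; a plausible sketch of such a wrapper (assumed, not copied from the PR):

public static ThreadFactory newDaemonThreadFactory(final String prefix) {
  final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
  return new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
      Thread t = namedFactory.newThread(r);
      if (!t.isDaemon()) {
        t.setDaemon(true); // the one behavioural change vs. the named factory
      }
      return t;
    }
  };
}
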
@@ -226,7 +226,9 @@ private static ThreadFactory getThreadFactory() {
       @Override
       public Thread newThread(Runnable r) {
         final String name = "HFileArchiver-" + threadNumber.getAndIncrement();
-        return new Thread(r, name);
+        Thread t = new Thread(r, name);
+        t.setDaemon(true);
+        return t;
       }
     };
   }

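HFileArchiver keeps its hand-rolled factory, which numbers threads itself through threadNumber, and only flips the daemon bit. For contrast, a roughly equivalent factory written with the shaded Guava builder adopted elsewhere in this PR might look like this (a sketch; the %d token restores the per-thread numbering):

import java.util.concurrent.ThreadFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

ThreadFactory archiverFactory = new ThreadFactoryBuilder()
    .setNameFormat("HFileArchiver-%d") // %d is filled from the builder's own counter
    .setDaemon(true)
    .build();
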
@@ -23,21 +23,21 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.master.locking.LockManager;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.procedure2.LockType;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

 /**
  * The mob compaction thread used in {@link MasterRpcServices}
@@ -55,14 +55,11 @@ public MasterMobCompactionThread(HMaster master) {
     this.conf = master.getConfiguration();
     final String n = Thread.currentThread().getName();
     // this pool is used to run the mob compaction
-    this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
-        new SynchronousQueue<>(), new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            String name = n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime();
-            return new Thread(r, name);
-          }
-        });
+    this.masterMobPool = new ThreadPoolExecutor(1, 2, 60,
+        TimeUnit.SECONDS, new SynchronousQueue<>(),
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat(n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime())
+            .build());
     ((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
     // this pool is used in the mob compaction to compact the mob files by partitions
     // in parallel

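A detail of the pool above: a SynchronousQueue has no capacity, so each submission either hands off to an idle worker or spawns a new thread up to the maximum of 2, and allowCoreThreadTimeOut(true) lets the pool shrink back to zero threads when idle. With both workers busy, a further submit is rejected with RejectedExecutionException (note that import retained at the top of the file). A condensed sketch of the same construction, with assumed demo names:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

ThreadPoolExecutor mobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
    new SynchronousQueue<>(), // zero-capacity handoff: never queues work
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("mob-compaction-%d").build());
mobPool.allowCoreThreadTimeOut(true); // idle pool drops to zero threads
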
@@ -675,7 +675,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
     LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
       getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
     final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
-      Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
+      Threads.newDaemonThreadFactory("StoreFileSplitter-%1$d"));
     final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);

     // Split each store file.

@@ -25,11 +25,9 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;

-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.DaemonThreadFactory;
@@ -47,7 +45,9 @@
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -213,10 +213,8 @@ static class FlushTableSubprocedurePool {
         RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
       int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
       this.name = name;
-      executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS,
-          new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs("
-              + name + ")-flush-proc-pool"));
-      executor.allowCoreThreadTimeOut(true);
+      executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
+          new DaemonThreadFactory("rs(" + name + ")-flush-proc-pool-"));
       taskPool = new ExecutorCompletionService<>(executor);
     }

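The deleted lines double as documentation for the helper that replaces them: judging by this refactor, Threads.getBoundedCachedThreadPool builds the same fixed-size pool over an unbounded queue with core-thread timeout. A sketch inferred from the removed code (not copied from Threads.java):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public static ThreadPoolExecutor getBoundedCachedThreadPool(
    int maxCachedThread, long timeout, TimeUnit unit, ThreadFactory threadFactory) {
  ThreadPoolExecutor boundedCachedThreadPool = new ThreadPoolExecutor(
      maxCachedThread, maxCachedThread, timeout, unit,
      new LinkedBlockingQueue<>(), threadFactory);
  boundedCachedThreadPool.allowCoreThreadTimeOut(true); // shrink to zero when idle
  return boundedCachedThreadPool;
}

If that reading is right, the hunk changes no pool semantics: the threads were already daemons via DaemonThreadFactory, so the net effect is deduplicated construction plus the trailing '-' in the name prefix.
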
@@ -31,7 +31,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -56,8 +55,10 @@
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

 /**
  * Compact region on request and then run split if appropriate
@@ -118,14 +119,9 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
   private void createSplitExcecutors() {
     final String n = Thread.currentThread().getName();
     int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
-    this.splits =
-        (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            String name = n + "-splits-" + System.currentTimeMillis();
-            return new Thread(r, name);
-          }
-        });
+    this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
+        new ThreadFactoryBuilder().setNameFormat(n + "-splits-" + System.currentTimeMillis())
+            .setDaemon(true).build());
   }

   private void createCompactionExecutors() {
@@ -144,24 +140,16 @@ private void createCompactionExecutors() {
     StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
     this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60,
         TimeUnit.SECONDS, stealJobQueue,
-        new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            String name = n + "-longCompactions-" + System.currentTimeMillis();
-            return new Thread(r, name);
-          }
-        });
+        new ThreadFactoryBuilder()
+            .setNameFormat(n + "-longCompactions-" + System.currentTimeMillis())
+            .setDaemon(true).build());
     this.longCompactions.setRejectedExecutionHandler(new Rejection());
     this.longCompactions.prestartAllCoreThreads();
     this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60,
         TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
-        new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            String name = n + "-shortCompactions-" + System.currentTimeMillis();
-            return new Thread(r, name);
-          }
-        });
+        new ThreadFactoryBuilder()
+            .setNameFormat(n + "-shortCompactions-" + System.currentTimeMillis())
+            .setDaemon(true).build());
     this.shortCompactions.setRejectedExecutionHandler(new Rejection());
   }

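One subtlety in the CompactSplit conversion: the old anonymous factories evaluated System.currentTimeMillis() inside newThread(), so each thread could carry its own timestamp, whereas the new setNameFormat string is built once and contains no %d token, so every thread from a given factory now shares one fixed name. A standalone check of that ThreadFactoryBuilder behaviour (demo names assumed):

import java.util.concurrent.ThreadFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

public class NameFormatDemo {
  public static void main(String[] args) {
    ThreadFactory tf = new ThreadFactoryBuilder()
        .setNameFormat("main-splits-" + System.currentTimeMillis()) // no %d: fixed name
        .setDaemon(true)
        .build();
    System.out.println(tf.newThread(() -> { }).getName());
    System.out.println(tf.newThread(() -> { }).getName()); // identical to the first
  }
}
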
@@ -26,7 +26,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;

@@ -36,6 +35,7 @@
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -283,10 +283,8 @@ static class SnapshotSubprocedurePool {
         RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
       int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
       this.name = name;
-      executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS,
-          new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs("
-              + name + ")-snapshot-pool"));
-      executor.allowCoreThreadTimeOut(true);
+      executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
+          new DaemonThreadFactory("rs(" + name + ")-snapshot-pool-"));
       taskPool = new ExecutorCompletionService<>(executor);
     }

@@ -227,7 +227,8 @@ public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
     // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense
     // spinning as other strategies do.
     this.disruptor = new Disruptor<>(RingBufferTruck::new,
-        getPreallocatedEventCount(), Threads.getNamedThreadFactory(hostingThreadName + ".append"),
+        getPreallocatedEventCount(),
+        Threads.newDaemonThreadFactory(hostingThreadName + ".append"),
         ProducerType.MULTI, new BlockingWaitStrategy());
     // Advance the ring buffer sequence so that it starts from 1 instead of 0,
     // because SyncFuture.NOT_DONE = 0.

@@ -32,7 +32,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
@@ -57,13 +56,15 @@
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;

@@ -141,9 +142,8 @@ public void init(Context context) throws IOException {
     // per sink thread pool
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
-    this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<>());
-    this.exec.allowCoreThreadTimeOut(true);
+    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
     this.abortable = ctx.getAbortable();
     // Set the size limit for replication RPCs to 95% of the max request size.
     // We could do with less slop if we have an accurate estimate of encoded size. Being

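Beyond the daemon flag, this hunk also fixes anonymity: the old pool passed no ThreadFactory at all, so it fell back to Executors.defaultThreadFactory(), which produces non-daemon threads with generic names. A standalone check:

import java.util.concurrent.Executors;

public class DefaultFactoryDemo {
  public static void main(String[] args) {
    Thread t = Executors.defaultThreadFactory().newThread(() -> { });
    // Prints something like "pool-1-thread-1 daemon=false": unidentifiable
    // in a thread dump and able to block JVM shutdown -- the two problems
    // the new "SinkThread-%d" daemon factory addresses.
    System.out.println(t.getName() + " daemon=" + t.isDaemon());
  }
}
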
@@ -10,8 +10,6 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;

-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -27,7 +25,6 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;

@@ -39,9 +36,6 @@
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
@@ -53,6 +47,12 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

 /**
  * It is used for replicating HFile entries. It will first copy parallely all the hfiles to a local
@@ -105,12 +105,9 @@ public HFileReplicator(Configuration sourceClusterConf,
     this.maxCopyThreads =
         this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
           REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    builder.setNameFormat("HFileReplicationCallable-%1$d");
-    this.exec =
-        new ThreadPoolExecutor(maxCopyThreads, maxCopyThreads, 60, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(), builder.build());
-    this.exec.allowCoreThreadTimeOut(true);
+    this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("HFileReplicationCallable-%1$d").build());
     this.copiesPerThread =
         conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
           REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);

@@ -196,8 +196,8 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
     int nbWorkers = conf.getInt("replication.executor.workers", 1);
     // use a short 100ms sleep since this could be done inline with a RS startup
     // even if we fail, other region servers can take care of it
-    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue<>());
+    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100,
+        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
     tfb.setNameFormat("ReplicationExecutor-%d");
     tfb.setDaemon(true);

@@ -571,7 +571,7 @@ private ThreadPoolExecutor createExecutor(final String name) {
   public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
     int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-        Threads.getNamedThreadFactory(name));
+        Threads.newDaemonThreadFactory(name));
   }

   /**

@@ -108,7 +108,7 @@ private ExecutorService createThreadPool(Configuration conf) {
     int availableProcessors = Runtime.getRuntime().availableProcessors();
     int numThreads = conf.getInt("hfilevalidator.numthreads", availableProcessors);
     return Executors.newFixedThreadPool(numThreads,
-        Threads.getNamedThreadFactory("hfile-validator"));
+        Threads.newDaemonThreadFactory("hfile-validator"));
   }

   @Override

@@ -1623,7 +1623,8 @@ public boolean accept(Path path) {
     // run in multiple threads
     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
       threadPoolSize, 60, TimeUnit.SECONDS,
-      new ArrayBlockingQueue<>(statusList.length));
+      new ArrayBlockingQueue<>(statusList.length),
+      Threads.newDaemonThreadFactory("FSRegionQuery"));
     try {
       // ignore all file status items that are not of interest
       for (FileStatus regionStatus : statusList) {

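The queue sizing explains why only a factory needed adding here: a ThreadPoolExecutor rejects work only when its queue is full and the pool is already at maximumPoolSize, and with one ArrayBlockingQueue slot per FileStatus, at most statusList.length tasks can ever be outstanding. A restatement of the invariant (variable names taken from the hunk):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// capacity == maximum possible submissions => execute() can never throw
// RejectedExecutionException here, whatever threadPoolSize is configured to.
ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize, threadPoolSize,
    60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(statusList.length),
    Threads.newDaemonThreadFactory("FSRegionQuery"));
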
@@ -28,7 +28,6 @@
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;

@@ -235,14 +234,7 @@ static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
         "hbase.hregion.open.and.init.threads.max", 16));
     ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
         .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-            new ThreadFactory() {
-              private int count = 1;
-
-              @Override
-              public Thread newThread(Runnable r) {
-                return new Thread(r, threadNamePrefix + "-" + count++);
-              }
-            });
+            Threads.newDaemonThreadFactory(threadNamePrefix));
     return regionOpenAndInitThreadPool;
   }
 }

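An incidental fix in this hunk: the deleted factory incremented a plain int from newThread(), which is not thread-safe if the executor ever grows threads concurrently. A hand-rolled equivalent would want an AtomicInteger as well as the daemon flag, which is one more argument for the shared helper (hypothetical sketch, not PR code):

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

ThreadFactory tf = new ThreadFactory() {
  private final AtomicInteger count = new AtomicInteger(1);

  @Override
  public Thread newThread(Runnable r) {
    Thread t = new Thread(r, threadNamePrefix + "-" + count.getAndIncrement());
    t.setDaemon(true); // match the PR's intent
    return t;
  }
};
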
@@ -124,9 +124,9 @@ public class SimpleSubprocedurePool implements Closeable, Abortable {

   public SimpleSubprocedurePool(String name, Configuration conf) {
     this.name = name;
-    executor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<>(),
-        new DaemonThreadFactory("rs(" + name + ")-procedure-pool"));
+    executor = new ThreadPoolExecutor(1, 1, 500,
+        TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+        new DaemonThreadFactory("rs(" + name + ")-procedure-pool-"));
     taskPool = new ExecutorCompletionService<>(executor);
   }

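The daemon flag here was already supplied by DaemonThreadFactory; the hunk only re-wraps the constructor and adds the trailing '-'. Assuming DaemonThreadFactory appends a bare thread number to its prefix (the trailing dashes added consistently across this PR suggest it does), the dash just makes the names readable:

ThreadFactory f = new DaemonThreadFactory("rs(" + name + ")-procedure-pool-");
// f.newThread(r).getName() -> "rs(x)-procedure-pool-1"
// versus "rs(x)-procedure-pool1" before this change.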