Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Introduce ThreadMonitor to check if thread is blocking for long time. #6

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5ea0ab2
add thread monitor and thread pool monitor
lifepuzzlefun Dec 11, 2022
b7f0706
add thread monitor first edition
lifepuzzlefun Dec 11, 2022
c5436b5
add thread monitor and thread pool monitor
lifepuzzlefun Dec 16, 2022
8a09896
add thread monitor and thread pool monitor
lifepuzzlefun Dec 16, 2022
1918fd6
add thread monitor and thread pool monitor
lifepuzzlefun Dec 16, 2022
2bf598e
add thread monitor and thread pool monitor
lifepuzzlefun Dec 16, 2022
52335b6
add thread monitor and thread pool monitor
lifepuzzlefun Dec 16, 2022
bc32c83
add thread monitor and thread pool monitor
lifepuzzlefun Dec 16, 2022
e783410
add thread monitor and thread pool monitor
lifepuzzlefun Dec 16, 2022
00abcb5
add thread monitor and thread pool monitor
lifepuzzlefun Dec 16, 2022
ec86ae4
add thread monitor and thread pool monitor
lifepuzzlefun Dec 16, 2022
ceff16c
add thread monitor and thread pool monitor
lifepuzzlefun Dec 16, 2022
792f446
add thread monitor and thread pool monitor
lifepuzzlefun Dec 16, 2022
c99860a
add doc.
lifepuzzlefun Dec 16, 2022
f0c70a0
fix metric print
lifepuzzlefun Dec 16, 2022
0d7c434
fix metric print
lifepuzzlefun Dec 16, 2022
1fd48cf
fix metric print
lifepuzzlefun Dec 16, 2022
991a16f
fix metric print
lifepuzzlefun Dec 16, 2022
fa0f4b1
change `Executors.newSingleScheduleThreadPool` to `ScheduledExecutorP…
lifepuzzlefun Dec 16, 2022
75ee8d6
fix checkstyle
lifepuzzlefun Dec 16, 2022
14837d1
Merge branch 'master' into thread_monitor
lifepuzzlefun Dec 16, 2022
17d6930
fix checkstyle
lifepuzzlefun Dec 16, 2022
047bc53
fix checkstyle
lifepuzzlefun Dec 18, 2022
bd0ea20
fix checkstyle
lifepuzzlefun Dec 18, 2022
60ce767
fix checkstyle
lifepuzzlefun Dec 19, 2022
a37ceb9
fix checkstyle
lifepuzzlefun Dec 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix checkstyle
  • Loading branch information
lifepuzzlefun committed Dec 18, 2022
commit 047bc53b56a702574695386d080a51e89a3c6fa4
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.prometheus.client.CollectorRegistry;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
Expand Down Expand Up @@ -76,7 +78,7 @@ public static void generateSystemMetrics(SimpleTextOutputStream stream, String c
if (!sample.labelNames.contains("cluster")) {
stream.write("cluster=\"").write(cluster).write('"');
// If label is empty, should not append ','.
if (!CollectionUtils.isEmpty(sample.labelNames)){
if (!CollectionUtils.isEmpty(sample.labelNames)) {
stream.write(",");
}
}
Expand All @@ -101,11 +103,19 @@ public static void generateSystemMetrics(SimpleTextOutputStream stream, String c
}
}

public static final String THREAD_BLOCK_COUNTS = "thread_blocked_count";
public static final String THREAD_BLOCK_TIME_MS = "thread_blocked_time_ms";
public static final String THREAD_WAIT_COUNTS = "thread_waited_counts";
public static final String THREAD_WAIT_TIME_MS = "thread_waited_time_ms";

public static List<Metrics> generateThreadPoolMonitorMetrics(String cluster) {
List<Metrics> metrics = new ArrayList<>();

Map<Long, Long> threadStat = new HashMap<>(ThreadMonitor.THREAD_LAST_ACTIVE_TIMESTAMP);
Map<Long, String> threadIdMapping = new HashMap<>(ThreadMonitor.THREAD_ID_TO_NAME);
Map<Long, Long> threadStat = new HashMap<>(ThreadMonitor.THREAD_LAST_ACTIVE_TIMESTAMP.size());
threadStat.putAll(ThreadMonitor.THREAD_LAST_ACTIVE_TIMESTAMP);

Map<Long, String> threadIdMapping = new HashMap<>(ThreadMonitor.THREAD_ID_TO_NAME.size());
threadIdMapping.putAll(ThreadMonitor.THREAD_ID_TO_NAME);

threadStat.forEach((tid, lastActiveTimestamp) -> {
Map<String, String> dimensionMap = new HashMap<>();
Expand Down Expand Up @@ -133,6 +143,28 @@ public static List<Metrics> generateThreadPoolMonitorMetrics(String cluster) {

metrics.add(m);

ThreadMXBean threadMXBean = ThreadPoolMonitor.THREAD_MX_BEAN;
if (threadMXBean.isThreadContentionMonitoringSupported()) {
long[] allThreadIds = threadMXBean.getAllThreadIds();
ThreadInfo[] threadInfo = threadMXBean.getThreadInfo(allThreadIds, 0);
for (ThreadInfo info : threadInfo) {
if (info != null) {
Map<String, String> dimensions = new HashMap<>();
dimensions.put("cluster", cluster);
dimensions.put("threadName", info.getThreadName());
dimensions.put("tid", String.valueOf(info.getThreadId()));

Metrics threadMetric = Metrics.create(dimensions);
threadMetric.put(THREAD_BLOCK_COUNTS, info.getBlockedCount());
threadMetric.put(THREAD_BLOCK_TIME_MS, info.getBlockedTime());
threadMetric.put(THREAD_WAIT_COUNTS, info.getWaitedCount());
threadMetric.put(THREAD_WAIT_TIME_MS, info.getWaitedTime());

metrics.add(threadMetric);
}
}
}

return metrics;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
*/
package org.apache.pulsar.broker.web;

import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.FastThreadLocalThread;

import java.lang.reflect.Field;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.util.ExecutorProvider;
import org.apache.pulsar.common.util.ThreadMonitor;
import org.apache.pulsar.common.util.ThreadPoolMonitor;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;

Expand All @@ -48,22 +50,25 @@ protected void beforeExecute(Thread t, Runnable r) {
protected void afterExecute(Runnable r, Throwable t) {
ThreadMonitor.refreshThreadState(Thread.currentThread());
}
}, Math.min(8, maxThreads));
}, -1);

this.threadFactory = new DefaultThreadFactory(namePrefix) {
@Override
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name) {
@Override
public void run() {
super.run();
this.threadFactory = new ExecutorProvider.ExtendedThreadFactory(namePrefix);
tryMonitorThreadExecutor(maxThreads);
}

// execute when thread exit.
ThreadMonitor.remove(Thread.currentThread());
}
};
}
};
private void tryMonitorThreadExecutor(int threadNumber) {
ThreadPoolExecutor executor;
try {
Field executorField = this.getClass().getSuperclass().getDeclaredField("_executor");
executorField.setAccessible(true);
executor = (ThreadPoolExecutor) executorField.get(this);
} catch (NoSuchFieldException | IllegalAccessException e) {
executor = null;
}

if (executor != null) {
ThreadPoolMonitor.registerMultiThreadExecutor(executor, threadNumber);
}
}

@Override
Expand Down
Loading