Simplify tracking of task counts stats in TaskQueue #16423

Draft
Wants to merge 5 commits into base: master
Changes from all commits

Counters.java (org.apache.druid.indexing.common)
@@ -21,7 +21,6 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public final class Counters
{
@@ -36,17 +35,6 @@ public static <K> int getAndIncrementInt(ConcurrentHashMap<K, AtomicInteger> cou
return counter.getAndIncrement();
}

public static <K> long incrementAndGetLong(ConcurrentHashMap<K, AtomicLong> counters, K key)
{
// get() before computeIfAbsent() is an optimization to avoid locking in computeIfAbsent() if not needed.
// See https://github.com/apache/druid/pull/6898#discussion_r251384586.
AtomicLong counter = counters.get(key);
if (counter == null) {
counter = counters.computeIfAbsent(key, k -> new AtomicLong());
}
return counter.incrementAndGet();
}

private Counters()
{
}
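
For context, the lookup pattern explained in the removed comment, which the retained getAndIncrementInt presumably still follows, is sketched below. The class name and the "wikipedia" key are illustrative only and not part of this patch.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: read with get() first and fall back to
// computeIfAbsent() on a miss, so the common "key already present" path
// never pays for the locking inside computeIfAbsent().
class CounterLookupSketch
{
  static int getAndIncrement(ConcurrentHashMap<String, AtomicInteger> counters, String key)
  {
    AtomicInteger counter = counters.get(key);
    if (counter == null) {
      counter = counters.computeIfAbsent(key, k -> new AtomicInteger());
    }
    return counter.getAndIncrement();
  }

  public static void main(String[] args)
  {
    ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    getAndIncrement(counters, "wikipedia");   // returns 0, the stored counter is now 1
  }
}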

Stats.java (org.apache.druid.indexing.overlord)
@@ -19,7 +19,7 @@

package org.apache.druid.indexing.overlord;

import org.apache.druid.server.coordinator.stats.CoordinatorStat;
import org.apache.druid.server.stats.DruidStat;

/**
* Task-level stats emitted as metrics.
@@ -28,9 +28,23 @@ public class Stats
{
public static class TaskQueue
{
public static final CoordinatorStat STATUS_UPDATES_IN_QUEUE
= CoordinatorStat.toDebugAndEmit("queuedStatusUpdates", "task/status/queue/count");
public static final CoordinatorStat HANDLED_STATUS_UPDATES
= CoordinatorStat.toDebugAndEmit("handledStatusUpdates", "task/status/updated/count");
public static final DruidStat STATUS_UPDATES_IN_QUEUE
= DruidStat.toDebugAndEmit("queuedStatusUpdates", "task/status/queue/count");
public static final DruidStat HANDLED_STATUS_UPDATES
= DruidStat.toDebugAndEmit("handledStatusUpdates", "task/status/updated/count");
}

public static class TaskCount
{
public static final DruidStat SUCCESSFUL
= DruidStat.toDebugAndEmit("successfulTasks", "task/success/count");
public static final DruidStat FAILED
= DruidStat.toDebugAndEmit("failedTasks", "task/failed/count");
public static final DruidStat RUNNING
= DruidStat.toDebugAndEmit("runningTasks", "task/running/count");
public static final DruidStat PENDING
= DruidStat.toDebugAndEmit("pendingTasks", "task/pending/count");
public static final DruidStat WAITING
= DruidStat.toDebugAndEmit("waitingTasks", "task/waiting/count");
}
}
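
As a rough sketch of how the new TaskCount stats are meant to be used, mirroring the incrementTaskStat() helper added to TaskQueue later in this diff, the snippet below accumulates one successful task for a made-up datasource and task type. Only the DruidRunStats, RowKey and Dimension calls already visible in this patch are assumed.

import org.apache.druid.indexing.overlord.Stats;
import org.apache.druid.server.stats.Dimension;
import org.apache.druid.server.stats.DruidRunStats;
import org.apache.druid.server.stats.RowKey;

class TaskCountStatsSketch
{
  public static void main(String[] args)
  {
    final DruidRunStats stats = new DruidRunStats();

    // Each increment is keyed by datasource and task type, and is later
    // emitted under the metric name passed to toDebugAndEmit(),
    // e.g. "task/success/count". The key values here are made up.
    stats.add(
        Stats.TaskCount.SUCCESSFUL,
        RowKey.with(Dimension.DATASOURCE, "wikipedia")
              .and(Dimension.TASK_TYPE, "index_parallel"),
        1
    );
  }
}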

TaskMaster.java (org.apache.druid.indexing.overlord)
@@ -45,9 +45,9 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.metrics.TaskCountStatsProvider;
import org.apache.druid.server.metrics.TaskSlotCountStatsProvider;
import org.apache.druid.server.stats.DruidRunStats;

import javax.annotation.Nullable;
import java.util.Map;
@@ -315,68 +315,13 @@ public Optional<SupervisorManager> getSupervisorManager()
}

@Override
public Map<String, Long> getSuccessfulTaskCount()
{
Optional<TaskQueue> taskQueue = getTaskQueue();
if (taskQueue.isPresent()) {
return taskQueue.get().getSuccessfulTaskCount();
} else {
return null;
}
}

@Override
public Map<String, Long> getFailedTaskCount()
{
Optional<TaskQueue> taskQueue = getTaskQueue();
if (taskQueue.isPresent()) {
return taskQueue.get().getFailedTaskCount();
} else {
return null;
}
}

@Override
public Map<String, Long> getRunningTaskCount()
{
Optional<TaskQueue> taskQueue = getTaskQueue();
if (taskQueue.isPresent()) {
return taskQueue.get().getRunningTaskCount();
} else {
return null;
}
}

@Override
public Map<String, Long> getPendingTaskCount()
{
Optional<TaskQueue> taskQueue = getTaskQueue();
if (taskQueue.isPresent()) {
return taskQueue.get().getPendingTaskCount();
} else {
return null;
}
}

@Override
public Map<String, Long> getWaitingTaskCount()
{
Optional<TaskQueue> taskQueue = getTaskQueue();
if (taskQueue.isPresent()) {
return taskQueue.get().getWaitingTaskCount();
} else {
return null;
}
}

@Override
public CoordinatorRunStats getStats()
public DruidRunStats getTaskCountStats()
{
Optional<TaskQueue> taskQueue = getTaskQueue();
if (taskQueue.isPresent()) {
return taskQueue.get().getQueueStats();
} else {
return CoordinatorRunStats.empty();
return DruidRunStats.empty();
}
}


TaskQueue.java (org.apache.druid.indexing.overlord)
@@ -38,7 +38,6 @@
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
@@ -61,8 +60,10 @@
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.metadata.PasswordProviderRedactionMixIn;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.utils.CollectionUtils;
import org.apache.druid.server.stats.Dimension;
import org.apache.druid.server.stats.DruidRunStats;
import org.apache.druid.server.stats.DruidStat;
import org.apache.druid.server.stats.RowKey;

import java.util.ArrayList;
import java.util.Collection;
@@ -71,17 +72,18 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
@@ -148,15 +150,9 @@ public class TaskQueue

private static final EmittingLogger log = new EmittingLogger(TaskQueue.class);

private final ConcurrentHashMap<String, AtomicLong> totalSuccessfulTaskCount = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, AtomicLong> totalFailedTaskCount = new ConcurrentHashMap<>();
@GuardedBy("totalSuccessfulTaskCount")
private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<>();
@GuardedBy("totalFailedTaskCount")
private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>();

private final AtomicInteger statusUpdatesInQueue = new AtomicInteger();
private final AtomicInteger handledStatusUpdates = new AtomicInteger();
private final AtomicReference<DruidRunStats> collectedStats
= new AtomicReference<>(new DruidRunStats());

public TaskQueue(
TaskLockConfig lockConfig,
@@ -785,9 +781,9 @@ private void handleStatus(final TaskStatus status)
);

if (status.isSuccess()) {
Counters.incrementAndGetLong(totalSuccessfulTaskCount, task.getDataSource());
incrementTaskStat(Stats.TaskCount.SUCCESSFUL, task, collectedStats.get());
} else {
Counters.incrementAndGetLong(totalFailedTaskCount, task.getDataSource());
incrementTaskStat(Stats.TaskCount.FAILED, task, collectedStats.get());
}
}
}
@@ -799,7 +795,7 @@ private void handleStatus(final TaskStatus status)
}
finally {
statusUpdatesInQueue.decrementAndGet();
handledStatusUpdates.incrementAndGet();
collectedStats.get().add(Stats.TaskQueue.HANDLED_STATUS_UPDATES, 1);
}
}
},
@@ -870,83 +866,14 @@ private static Map<String, Task> toTaskIDMap(List<Task> taskList)
return rv;
}

private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
{
return total.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
}

public Map<String, Long> getSuccessfulTaskCount()
{
Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
synchronized (totalSuccessfulTaskCount) {
Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
prevTotalSuccessfulTaskCount = total;
return delta;
}
}

public Map<String, Long> getFailedTaskCount()
{
Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
synchronized (totalFailedTaskCount) {
Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
prevTotalFailedTaskCount = total;
return delta;
}
}

Map<String, String> getCurrentTaskDatasources()
{
giant.lock();
try {
return tasks.values().stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
}
finally {
giant.unlock();
}
}

public Map<String, Long> getRunningTaskCount()
{
Map<String, String> taskDatasources = getCurrentTaskDatasources();
return taskRunner.getRunningTasks()
.stream()
.collect(Collectors.toMap(
e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
e -> 1L,
Long::sum
));
}

public Map<String, Long> getPendingTaskCount()
private void incrementTaskStat(DruidStat stat, Task task, DruidRunStats stats)
{
Map<String, String> taskDatasources = getCurrentTaskDatasources();
return taskRunner.getPendingTasks()
.stream()
.collect(Collectors.toMap(
e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
e -> 1L,
Long::sum
));
}

public Map<String, Long> getWaitingTaskCount()
{
Set<String> runnerKnownTaskIds = taskRunner.getKnownTasks()
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.collect(Collectors.toSet());

giant.lock();
try {
return tasks.values().stream().filter(task -> !runnerKnownTaskIds.contains(task.getId()))
.collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum));
}
finally {
giant.unlock();
}
stats.add(
stat,
RowKey.with(Dimension.DATASOURCE, task.getDataSource())
.and(Dimension.TASK_TYPE, task.getType()),
1
);
}

/**
@@ -963,17 +890,42 @@ public Optional<TaskStatus> getTaskStatus(final String taskId)
}
}

public CoordinatorRunStats getQueueStats()
/**
* Task stats collected by the TaskQueue since the last invocation of this
* method, plus point-in-time counts of running, pending and waiting tasks.
*/
public DruidRunStats getQueueStats()
{
final DruidRunStats stats = collectedStats.getAndSet(new DruidRunStats());

final int queuedUpdates = statusUpdatesInQueue.get();
final int handledUpdates = handledStatusUpdates.getAndSet(0);
final long handledUpdates = stats.get(Stats.TaskQueue.HANDLED_STATUS_UPDATES);
if (queuedUpdates > 0) {
log.info("There are [%d] task status updates in queue, handled [%d]", queuedUpdates, handledUpdates);
}

final CoordinatorRunStats stats = new CoordinatorRunStats();
stats.add(Stats.TaskQueue.STATUS_UPDATES_IN_QUEUE, queuedUpdates);
stats.add(Stats.TaskQueue.HANDLED_STATUS_UPDATES, handledUpdates);

// Add counts of running tasks
final List<Task> allTasks = getTasks();
final Map<String, Task> idToTask = allTasks.stream().collect(Collectors.toMap(Task::getId, Function.identity()));
taskRunner.getRunningTasks().stream()
.map(workItem -> idToTask.get(workItem.getTaskId()))
.filter(Objects::nonNull)
.forEach(task -> incrementTaskStat(Stats.TaskCount.RUNNING, task, stats));

// Add counts of pending tasks
taskRunner.getPendingTasks().stream()
.map(workItem -> idToTask.get(workItem.getTaskId()))
.filter(Objects::nonNull)
.forEach(task -> incrementTaskStat(Stats.TaskCount.PENDING, task, stats));

// Add counts of waiting tasks
final Set<String> runnerKnownTaskIds
= taskRunner.getKnownTasks().stream().map(TaskRunnerWorkItem::getTaskId).collect(Collectors.toSet());
allTasks.stream()
.filter(task -> !runnerKnownTaskIds.contains(task.getId()))
.forEach(task -> incrementTaskStat(Stats.TaskCount.WAITING, task, stats));

return stats;
}
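
A sketch of how a caller might consume the new semantics: getQueueStats() atomically swaps in a fresh DruidRunStats, so the success/failure and status-update counters cover only the interval since the previous call, while the running/pending/waiting counts are snapshots computed inside the call itself. The poller class below is illustrative and not part of this patch; only the stats.get(...) call already used above is assumed.

import org.apache.druid.indexing.overlord.Stats;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.server.stats.DruidRunStats;

// Illustrative poller, not part of this patch: invoked on a fixed schedule,
// e.g. once per metrics emission period.
class TaskQueueStatsPoller
{
  private final TaskQueue taskQueue;

  TaskQueueStatsPoller(TaskQueue taskQueue)
  {
    this.taskQueue = taskQueue;
  }

  void pollOnce()
  {
    // Deltas since the previous poll for the accumulated counters,
    // point-in-time snapshots for RUNNING/PENDING/WAITING.
    final DruidRunStats stats = taskQueue.getQueueStats();
    final long handledUpdates = stats.get(Stats.TaskQueue.HANDLED_STATUS_UPDATES);
    if (handledUpdates > 0) {
      // e.g. forward handledUpdates and the row-keyed task counts to the metrics emitter here
    }
  }
}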
