Skip to content

Commit

Permalink
Update thread resource info and add tests
Browse files Browse the repository at this point in the history
Signed-off-by: sruti1312 <srutiparthiban@gmail.com>
  • Loading branch information
sruti1312 committed Mar 23, 2022
1 parent 312aaf5 commit b64ce26
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public TaskInfo taskInfoGivenSubtaskInfo(String localNodeId, List<TaskInfo> slic
sliceStatuses.set(status.getSliceId(), new BulkByScrollTask.StatusOrException(status));
}
Status status = leaderState.getStatus(sliceStatuses);
return taskInfo(localNodeId, getDescription(), status, null);
return taskInfo(localNodeId, getDescription(), status);
}

private BulkByScrollTask.Status emptyStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

/** Defines the different types of resource stats. */
public enum ResourceStatsType {
// Resource stats of the worker thread that is reported directly from runnable.
// resource stats of the worker thread reported directly from runnable.
WORKER_STATS("worker_stats", false);

private final String statsType;
Expand Down
25 changes: 16 additions & 9 deletions server/src/main/java/org/opensearch/tasks/ResourceUsageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Thread resource usage information for particular resource stats type.
* <p>
* It captures the resource usage information about a particular execution of thread
* It captures the resource usage information like memory, CPU about a particular execution of thread
* for a specific stats type.
*/
public class ResourceUsageInfo {
Expand All @@ -32,11 +34,16 @@ public ResourceUsageInfo(ResourceUsageMetric... resourceUsageMetrics) {

public void recordResourceUsageMetrics(ResourceUsageMetric... resourceUsageMetrics) {
for (ResourceUsageMetric resourceUsageMetric : resourceUsageMetrics) {
ResourceStatsInfo resourceStatsInfo = statsInfo.get(resourceUsageMetric.getStats());
if (resourceStatsInfo == null) {
statsInfo.put(resourceUsageMetric.getStats(), new ResourceStatsInfo(resourceUsageMetric.getValue()));
} else {
final ResourceStatsInfo resourceStatsInfo = statsInfo.get(resourceUsageMetric.getStats());
if (resourceStatsInfo != null) {
updateResourceUsageInfo(resourceStatsInfo, resourceUsageMetric);
} else {
throw new IllegalStateException(
"cannot update ["
+ resourceUsageMetric.getStats().toString()
+ "] entry as its not present current_stats_info:"
+ statsInfo
);
}
}
}
Expand All @@ -49,7 +56,7 @@ private void updateResourceUsageInfo(ResourceStatsInfo resourceStatsInfo, Resour
newEndValue = resourceUsageMetric.getValue();
if (currentEndValue > newEndValue) {
logger.debug(
"Dropping resource usage update as the new value is lower than current value ["
"dropping resource usage update as the new value is lower than current value ["
+ "resource_stats=[{}], "
+ "current_end_value={}, "
+ "new_end_value={}]",
Expand All @@ -61,15 +68,15 @@ private void updateResourceUsageInfo(ResourceStatsInfo resourceStatsInfo, Resour
}
} while (!resourceStatsInfo.endValue.compareAndSet(currentEndValue, newEndValue));
logger.debug(
"Updated resource usage info [resource_stats=[{}], " + "old_end_value={}, new_end_value={}]",
"updated resource usage info [resource_stats=[{}], " + "old_end_value={}, new_end_value={}]",
resourceUsageMetric.getStats(),
currentEndValue,
newEndValue
);
}

public EnumMap<ResourceStats, ResourceStatsInfo> getStatsInfo() {
return statsInfo;
public Map<ResourceStats, ResourceStatsInfo> getStatsInfo() {
return Collections.unmodifiableMap(statsInfo);
}

@Override
Expand Down
77 changes: 43 additions & 34 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -122,7 +123,7 @@ public Task(
* generate data?
*/
public final TaskInfo taskInfo(String localNodeId, boolean detailed) {
return taskInfo(localNodeId, detailed, !detailed);
return taskInfo(localNodeId, detailed, detailed == false);
}

/**
Expand All @@ -144,7 +145,7 @@ private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeS
status = getStatus();
}
if (excludeStats == false) {
resourceStats = new TaskResourceStats(new HashMap<String, TaskResourceUsage>() {
resourceStats = new TaskResourceStats(new HashMap<>() {
{
put(TOTAL, getTotalResourceStats());
}
Expand All @@ -153,6 +154,13 @@ private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeS
return taskInfo(localNodeId, description, status, resourceStats);
}

/**
* Build a {@link TaskInfo} for this task without resource stats.
*/
protected final TaskInfo taskInfo(String localNodeId, String description, Status status) {
return taskInfo(localNodeId, description, status, null);
}

/**
* Build a proper {@link TaskInfo} for this task.
*/
Expand Down Expand Up @@ -236,11 +244,13 @@ public Status getStatus() {
* Returns thread level resource consumption of the task
*/
public Map<Long, List<ThreadResourceInfo>> getResourceStats() {
return resourceStats;
return Collections.unmodifiableMap(resourceStats);
}

/**
* Returns total resource usage of the task
* Returns current total resource usage of the task.
* Currently, this method is only called on demand, during get and listing of tasks.
* In the future, these values can be cached as an optimization.
*/
public TaskResourceUsage getTotalResourceStats() {
return new TaskResourceUsage(getTotalResourceUtilization(ResourceStats.CPU), getTotalResourceUtilization(ResourceStats.MEMORY));
Expand All @@ -249,82 +259,81 @@ public TaskResourceUsage getTotalResourceStats() {
/**
* Returns total resource consumption for a specific task stat.
*/
public long getTotalResourceUtilization(ResourceStats taskStats) {
public long getTotalResourceUtilization(ResourceStats stats) {
long totalResourceConsumption = 0L;
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
for (Map.Entry<ResourceStatsType, ResourceUsageInfo> entry : threadResourceInfo.getResourceUsageInfos().entrySet()) {
if (entry.getKey().isOnlyForAnalysis() == false) {
totalResourceConsumption += entry.getValue().getStatsInfo().get(taskStats).getTotalValue();
}
final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats);
if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) {
totalResourceConsumption += statsInfo.getTotalValue();
}
}
}
return totalResourceConsumption;
}

/**
* Adds thread's resource consumption information
* Adds thread's starting resource consumption information
* @param threadId ID of the thread
* @param statsType stats type
* @param resourceUsageMetrics resource consumption metrics of the thread
* @throws IllegalStateException matching active thread entry was found which is not expected.
*/
public void startThreadResourceTracking(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) {
if (statsType != ResourceStatsType.WORKER_STATS) {
throw new IllegalArgumentException("Adding thread resource information should always have WORKER_STATS as stats type");
}
final List<ThreadResourceInfo> threadResourceInfoList = resourceStats.computeIfAbsent(threadId, k -> new ArrayList<>());
// active thread entry should not be present in the list.
// active thread entry should not be present in the list
for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) {
if (threadResourceInfo.isActive()) {
throw new IllegalStateException("Unexpected active thread entry is present");
if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) {
throw new IllegalStateException(
"unexpected active thread resource entry present [" + threadId + "]:[" + threadResourceInfo + "]"
);
}
}
threadResourceInfoList.add(new ThreadResourceInfo(ResourceStatsType.WORKER_STATS, resourceUsageMetrics));
threadResourceInfoList.add(new ThreadResourceInfo(statsType, resourceUsageMetrics));
}

/**
* This method is used to update the resource consumption stats so that the data isn't too stale for long-running task.
* If an active thread entry is not present in the list, the update is dropped.
* If active thread entry is present in the list, the entry is updated. If one is not found, it throws an exception.
* @param threadId ID of the thread
* @param statsType stats type
* @param resourceUsageMetrics resource consumption metrics of the thread
* @throws IllegalStateException if no matching active thread entry was found.
*/
public void updateThreadResourceStats(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) {
final List<ThreadResourceInfo> threadResourceInfoList = resourceStats.get(threadId);
if (threadResourceInfoList == null) {
throw new IllegalStateException("Cannot update if thread resource info is not present");
} else {
// If active entry is not present, the update is dropped. If present, the active entry is updated.
if (threadResourceInfoList != null) {
for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) {
if (threadResourceInfo.isActive()) {
threadResourceInfo.updateResourceInfo(statsType, resourceUsageMetrics);
// the active entry present in the list is updated
if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) {
threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics);
return;
}
}
}
throw new IllegalStateException("cannot update if active thread resource entry is not present");
}

/**
* Record the thread's final resource consumption values.
* If active thread entry is present in the list, the entry is updated. If one is not found, it throws an exception.
* @param threadId ID of the thread
* @param statsType stats type
* @param resourceUsageMetrics resource consumption metrics of the thread
* @throws IllegalStateException if no matching active thread entry was found.
*/
public void stopThreadResourceTracking(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) {
final List<ThreadResourceInfo> threadResourceInfoList = resourceStats.get(threadId);
if (statsType != ResourceStatsType.WORKER_STATS || threadResourceInfoList == null) {
throw new IllegalArgumentException(
"Recording the end should have WORKER_STATS as stats type" + "and an active entry should be present in the list"
);
}
// marking active entries as done before updating the final resource usage values.
for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) {
if (threadResourceInfo.isActive()) {
threadResourceInfo.setActive(false);
threadResourceInfo.updateResourceInfo(ResourceStatsType.WORKER_STATS, resourceUsageMetrics);
if (threadResourceInfoList != null) {
for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) {
if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) {
threadResourceInfo.setActive(false);
threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics);
return;
}
}
}
throw new IllegalStateException("cannot update final values if active thread resource entry is not present");
}

/**
Expand Down
11 changes: 7 additions & 4 deletions server/src/main/java/org/opensearch/tasks/TaskResourceUsage.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
* information of running tasks.
*/
public class TaskResourceUsage implements Writeable, ToXContentFragment {
private static final ParseField CPU_TIME_IN_NANOS = new ParseField("cpu_time_in_nanos");
private static final ParseField MEMORY_IN_BYTES = new ParseField("memory_in_bytes");

private final long cpuTimeInNanos;
private final long memoryInBytes;

Expand Down Expand Up @@ -61,8 +64,8 @@ public long getMemoryInBytes() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(ResourceStats.CPU.toString(), cpuTimeInNanos);
builder.field(ResourceStats.MEMORY.toString(), memoryInBytes);
builder.field(CPU_TIME_IN_NANOS.getPreferredName(), cpuTimeInNanos);
builder.field(MEMORY_IN_BYTES.getPreferredName(), memoryInBytes);
return builder;
}

Expand All @@ -72,8 +75,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
);

static {
PARSER.declareLong(constructorArg(), new ParseField(ResourceStats.CPU.toString()));
PARSER.declareLong(constructorArg(), new ParseField(ResourceStats.MEMORY.toString()));
PARSER.declareLong(constructorArg(), CPU_TIME_IN_NANOS);
PARSER.declareLong(constructorArg(), MEMORY_IN_BYTES);
}

public static TaskResourceUsage fromXContent(XContentParser parser) {
Expand Down
38 changes: 20 additions & 18 deletions server/src/main/java/org/opensearch/tasks/ThreadResourceInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,47 @@

package org.opensearch.tasks;

import java.util.EnumMap;

/**
* Resource consumption information about a particular execution of thread.
* <p>
* It captures the resource usage information about a particular execution of thread.
* across different stats type like worker_stats or response_stats etc.,
* It captures the resource usage information about a particular execution of thread
* for a specific stats type like worker_stats or response_stats etc.,
*/
public class ThreadResourceInfo {
private final EnumMap<ResourceStatsType, ResourceUsageInfo> resourceUsageInfos = new EnumMap<>(ResourceStatsType.class);
private volatile boolean isActive = true;
private final ResourceStatsType statsType;
private final ResourceUsageInfo resourceUsageInfo;

public ThreadResourceInfo(ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) {
this.resourceUsageInfos.put(statsType, new ResourceUsageInfo(resourceUsageMetrics));
this.statsType = statsType;
this.resourceUsageInfo = new ResourceUsageInfo(resourceUsageMetrics);
}

public void updateResourceInfo(ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) {
ResourceUsageInfo resourceUsageInfo = resourceUsageInfos.get(statsType);
if (resourceUsageInfo == null) {
resourceUsageInfos.put(statsType, new ResourceUsageInfo());
} else {
resourceUsageInfo.recordResourceUsageMetrics(resourceUsageMetrics);
}
/**
* Updates thread's resource consumption information.
*/
public void recordResourceUsageMetrics(ResourceUsageMetric... resourceUsageMetrics) {
resourceUsageInfo.recordResourceUsageMetrics(resourceUsageMetrics);
}

public EnumMap<ResourceStatsType, ResourceUsageInfo> getResourceUsageInfos() {
return resourceUsageInfos;
public void setActive(boolean isActive) {
this.isActive = isActive;
}

public boolean isActive() {
return isActive;
}

public boolean setActive(boolean active) {
return isActive = active;
public ResourceStatsType getStatsType() {
return statsType;
}

public ResourceUsageInfo getResourceUsageInfo() {
return resourceUsageInfo;
}

@Override
public String toString() {
return resourceUsageInfos + ", is_active=" + isActive;
return resourceUsageInfo + ", stats_type=" + statsType + ", is_active=" + isActive;
}
}
Loading

0 comments on commit b64ce26

Please sign in to comment.