Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Threadcontext POC changes #21

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.tasks.ResourceUsageStatsTCPropagator;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskThreadContextStatePropagator;

Expand Down Expand Up @@ -126,7 +127,8 @@ public ThreadContext(Settings settings) {
this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT);
this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator()));
this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator(),
new ResourceUsageStatsTCPropagator()));
}

public void registerThreadContextStatePropagator(final ThreadContextStatePropagator propagator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
//import org.apache.lucene.codecs.
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.mapper.CompletionFieldMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1754,6 +1754,7 @@ public boolean maybeRefresh(String source) throws EngineException {
final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException {
// both refresh types will result in an internal refresh but only the external will also
// pass the new reader reference to the external reader manager.
System.out.println("======= REFRESH called =====");
final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
boolean refreshed;
try {
Expand Down Expand Up @@ -1852,6 +1853,8 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
// or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries.
boolean hasUncommittedChanges = indexWriter.hasUncommittedChanges();
boolean shouldPeriodicallyFlush = shouldPeriodicallyFlush();
System.out.println("HAS UNCOMMITED CHANGES : " + hasUncommittedChanges);
System.out.println("shouldPeriodicallyFlush : " + shouldPeriodicallyFlush);
if (hasUncommittedChanges
|| force
|| shouldPeriodicallyFlush
Expand Down Expand Up @@ -2525,6 +2528,7 @@ protected void commitIndexWriter(final IndexWriter writer, final String translog
if (currentForceMergeUUID != null) {
commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
}
System.out.println("committing writer with commit data [{}]" + commitData);
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
Expand Down Expand Up @@ -2796,6 +2800,7 @@ public final long currentOngoingRefreshCheckpoint() {
* Refresh this engine **internally** iff the requesting seq_no is greater than the last refreshed checkpoint.
*/
protected final void refreshIfNeeded(String source, long requestingSeqNo) {
System.out.println("======= REFRESH If Needed called =====");
if (lastRefreshedCheckpoint() < requestingSeqNo) {
synchronized (refreshIfNeededMutex) {
if (lastRefreshedCheckpoint() < requestingSeqNo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/**
* A mapper for binary fields
*
Expand Down
31 changes: 18 additions & 13 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,17 @@ protected Node(

final RestController restController = actionModule.getRestController();

final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker(
threadPool,
settings,
clusterService.getClusterSettings()
);
final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService(
nodeResourceUsageTracker,
clusterService,
threadPool
);

final AdmissionControlService admissionControlService = new AdmissionControlService(
settings,
clusterService.getClusterSettings(),
Expand Down Expand Up @@ -944,6 +955,7 @@ protected Node(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of(Task.X_OPAQUE_ID)
).collect(Collectors.toSet());

final TransportService transportService = newTransportService(
settings,
transport,
Expand All @@ -952,7 +964,8 @@ protected Node(
localNodeFactory,
settingsModule.getClusterSettings(),
taskHeaders,
tracer
tracer,
resourceUsageCollectorService
);
TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings());
transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer);
Expand Down Expand Up @@ -1103,16 +1116,6 @@ protected Node(
transportService.getTaskManager(),
taskCancellationMonitoringSettings
);
final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker(
threadPool,
settings,
clusterService.getClusterSettings()
);
final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService(
nodeResourceUsageTracker,
clusterService,
threadPool
);
this.nodeService = new NodeService(
settings,
threadPool,
Expand Down Expand Up @@ -1322,9 +1325,11 @@ protected TransportService newTransportService(
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
Set<String> taskHeaders,
Tracer tracer
Tracer tracer,
ResourceUsageCollectorService resourceUsageCollectorService
) {
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer);
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings,
taskHeaders, tracer, resourceUsageCollectorService);
}

protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public NodeResourceUsageStats(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {

out.writeString(this.nodeId);
out.writeLong(this.timestamp);
out.writeDouble(this.cpuUtilizationPercent);
Expand All @@ -49,12 +50,13 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String toString() {
StringBuilder sb = new StringBuilder("NodeResourceUsageStats[");
sb.append(nodeId).append("](");
sb.append("Timestamp: ").append(timestamp);
sb.append(", CPU utilization percent: ").append(String.format(Locale.ROOT, "%.1f", cpuUtilizationPercent));
sb.append(", Memory utilization percent: ").append(String.format(Locale.ROOT, "%.1f", memoryUtilizationPercent));
sb.append(")");
StringBuilder sb = new StringBuilder();
sb.append(nodeId).append(":");
sb.append(timestamp);
sb.append(",");
sb.append(memoryUtilizationPercent);
sb.append(",");
sb.append(cpuUtilizationPercent);
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ public Optional<NodeResourceUsageStats> getNodeStatistics(final String nodeId) {
.map(resourceUsageStats -> new NodeResourceUsageStats(resourceUsageStats));
}

public Optional<NodeResourceUsageStats> getLocalNodeStatistics() {
if(clusterService.state() != null) {
return Optional.ofNullable(nodeIdToResourceUsageStats.get(clusterService.state().nodes().getLocalNodeId()))
.map(resourceUsageStats -> new NodeResourceUsageStats(resourceUsageStats));
}
return Optional.empty();
}

/**
* Returns collected resource usage statistics of all nodes
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ private static class Defaults {
/**
* This is the default window duration on which the average resource utilization values will be calculated
*/
private static final long WINDOW_DURATION_IN_SECONDS = 30;
private static final long WINDOW_DURATION_IN_SECONDS = 1;
}

public static final Setting<TimeValue> GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int doc, long bucket) throws IOException {
System.out.println("doc id : " + doc);
boolean matched = false;
for (int i = 0; i < bits.length; i++) {
if (bits[i].get(doc)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
System.out.println("Doc id " + doc);
sums = bigArrays.grow(sums, bucket + 1);
compensations = bigArrays.grow(compensations, bucket + 1);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.opensearch.common.util.concurrent.ThreadContextStatePropagator;

import static org.opensearch.tasks.TaskResourceTrackingService.TASK_ID;


public class ResourceUsageStatsTCPropagator implements ThreadContextStatePropagator {
public static final String NODE_RESOURCE_STATS = "PERF_STATS";
@Override
public Map<String, Object> transients(Map<String, Object> source) {
final Map<String, Object> transients = new HashMap<>();
for(Map.Entry<String, Object> entry : source.entrySet()) {
if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) {
// key starts with prefix
transients.put(entry.getKey(), entry.getValue());
}
}
return transients;
}

@Override
public Map<String, String> headers(Map<String, Object> source) {
final Map<String, String> headers = new HashMap<>();
for(Map.Entry<String, Object> entry : source.entrySet()) {
if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) {
// key starts with prefix
headers.put(entry.getKey(), entry.getValue().toString());
}
}
return headers;
}
}
5 changes: 4 additions & 1 deletion server/src/main/java/org/opensearch/transport/Header.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ Tuple<Map<String, String>, Map<String, Set<String>>> getHeaders() {

void finishParsingHeader(StreamInput input) throws IOException {
this.headers = ThreadContext.readHeadersFromStream(input);

//if(this.headers)
//System.out.println("HEADER");
if (isRequest()) {
//System.out.println("Request");
final String[] featuresFound = input.readStringArray();
if (featuresFound.length == 0) {
features = Collections.emptySet();
Expand All @@ -133,6 +135,7 @@ void finishParsingHeader(StreamInput input) throws IOException {
}
this.actionName = input.readString();
} else {
//System.out.println("Response");
this.actionName = RESPONSE_NAME;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st
if (header.isRequest()) {
handleRequest(channel, header, message);
} else {
// THIS IS RESPONSE
// Responses do not support short circuiting currently
// Responses do not support short circuiting currently
assert message.isShortCircuit() == false;
final TransportResponseHandler<?> handler;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport;

public class ResourceUsageStatsReference {
private String resourceUsageStats;

public ResourceUsageStatsReference(String stats) {
this.resourceUsageStats = stats;
}

public String getResourceUsageStats() {
return resourceUsageStats;
}

public void setResourceUsageStats(String stats) {
this.resourceUsageStats = new String(stats);
}

@Override
public String toString() {
return this.resourceUsageStats;
}

}
Loading
Loading