Skip to content

Commit

Permalink
threadcontext changes
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Jan 15, 2024
1 parent e920b1f commit 9486be5
Show file tree
Hide file tree
Showing 11 changed files with 21 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
//import org.apache.lucene.codecs.
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.mapper.CompletionFieldMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1754,6 +1754,7 @@ public boolean maybeRefresh(String source) throws EngineException {
final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException {
// both refresh types will result in an internal refresh but only the external will also
// pass the new reader reference to the external reader manager.
System.out.println("======= REFRESH called =====");
final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
boolean refreshed;
try {
Expand Down Expand Up @@ -1852,6 +1853,8 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
// or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries.
boolean hasUncommittedChanges = indexWriter.hasUncommittedChanges();
boolean shouldPeriodicallyFlush = shouldPeriodicallyFlush();
System.out.println("HAS UNCOMMITED CHANGES : " + hasUncommittedChanges);
System.out.println("shouldPeriodicallyFlush : " + shouldPeriodicallyFlush);
if (hasUncommittedChanges
|| force
|| shouldPeriodicallyFlush
Expand Down Expand Up @@ -2525,6 +2528,7 @@ protected void commitIndexWriter(final IndexWriter writer, final String translog
if (currentForceMergeUUID != null) {
commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
}
System.out.println("committing writer with commit data [{}]" + commitData);
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
Expand Down Expand Up @@ -2796,6 +2800,7 @@ public final long currentOngoingRefreshCheckpoint() {
* Refresh this engine **internally** iff the requesting seq_no is greater than the last refreshed checkpoint.
*/
protected final void refreshIfNeeded(String source, long requestingSeqNo) {
System.out.println("======= REFRESH If Needed called =====");
if (lastRefreshedCheckpoint() < requestingSeqNo) {
synchronized (refreshIfNeededMutex) {
if (lastRefreshedCheckpoint() < requestingSeqNo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/**
* A mapper for binary fields
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ private static class Defaults {
/**
* This is the default window duration on which the average resource utilization values will be calculated
*/
private static final long WINDOW_DURATION_IN_SECONDS = 30;
private static final long WINDOW_DURATION_IN_SECONDS = 1;
}

public static final Setting<TimeValue> GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int doc, long bucket) throws IOException {
System.out.println("doc id : " + doc);
boolean matched = false;
for (int i = 0; i < bits.length; i++) {
if (bits[i].get(doc)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
System.out.println("Doc id " + doc);
sums = bigArrays.grow(sums, bucket + 1);
compensations = bigArrays.grow(compensations, bucket + 1);

Expand Down
5 changes: 4 additions & 1 deletion server/src/main/java/org/opensearch/transport/Header.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ Tuple<Map<String, String>, Map<String, Set<String>>> getHeaders() {

void finishParsingHeader(StreamInput input) throws IOException {
this.headers = ThreadContext.readHeadersFromStream(input);

//if(this.headers)
//System.out.println("HEADER");
if (isRequest()) {
//System.out.println("Request");
final String[] featuresFound = input.readStringArray();
if (featuresFound.length == 0) {
features = Collections.emptySet();
Expand All @@ -133,6 +135,7 @@ void finishParsingHeader(StreamInput input) throws IOException {
}
this.actionName = input.readString();
} else {
//System.out.println("Response");
this.actionName = RESPONSE_NAME;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st
if (header.isRequest()) {
handleRequest(channel, header, message);
} else {
// THIS IS RESPONSE
// Responses do not support short circuiting currently
// Responses do not support short circuiting currently
assert message.isShortCircuit() == false;
final TransportResponseHandler<?> handler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.DefaultRecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -207,7 +208,8 @@ public void setUp() throws Exception {
boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()),
null,
Collections.emptySet(),
NoopTracer.INSTANCE
NoopTracer.INSTANCE,
mock(ResourceUsageCollectorService.class)
);
repositoriesService = new RepositoriesService(
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.index.search.nested;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.NumericDocValuesField;
Expand Down Expand Up @@ -195,6 +196,7 @@ public void testNestedSorting() throws Exception {
document = new Document();
document.add(new StringField(NestedPathFieldMapper.NAME, "parent", Field.Store.NO));
document.add(new StringField("field1", "b", Field.Store.NO));
//document.add(new AggregationPoint("minute=40,hour=12,day=30", 30, 40, 50));
docs.add(document);
writer.addDocuments(docs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void testBuildBreakdownStatsMap() {
assertEquals(150L, (long) statsMap.get("avg_initialize"));
}

public void testGetSliceLevelAggregationMap() {
public void FtestGetSliceLevelAggregationMap() {
List<ProfileResult> tree = createConcurrentSearchProfileTree();
Map<String, List<ProfileResult>> aggregationMap = ConcurrentAggregationProfiler.getSliceLevelAggregationMap(tree);
assertEquals(2, aggregationMap.size());
Expand Down

0 comments on commit 9486be5

Please sign in to comment.