Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -670,16 +670,14 @@ private void logBrokerResponse(long requestId, String query, RequestContext requ
if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
// Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
LOGGER.info("requestId={},table={},timeMs={},docs={}/{},entries={}/{},"
+ "segments(queried/processed/matched/consumingQueried/consumingMatched/consumingProcessed/unavailable):"
+ "{}/{}/{}/{}/{}/{}/{},consumingFreshnessTimeMs={},"
+ "segments(queried/processed/matched/consuming/unavailable):{}/{}/{}/{}/{},consumingFreshnessTimeMs={},"
+ "servers={}/{},groupLimitReached={},brokerReduceTimeMs={},exceptions={},serverStats={},"
+ "offlineThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{},"
+ "realtimeThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{},query={}", requestId, tableName,
totalTimeMs, brokerResponse.getNumDocsScanned(), brokerResponse.getTotalDocs(),
brokerResponse.getNumEntriesScannedInFilter(), brokerResponse.getNumEntriesScannedPostFilter(),
brokerResponse.getNumSegmentsQueried(), brokerResponse.getNumSegmentsProcessed(),
brokerResponse.getNumSegmentsMatched(), brokerResponse.getNumConsumingSegmentsQueried(),
brokerResponse.getNumConsumingSegmentsProcessed(), brokerResponse.getNumConsumingSegmentsMatched(),
numUnavailableSegments, brokerResponse.getMinConsumingFreshnessTimeMs(),
brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(),
brokerResponse.isNumGroupsLimitReached(), requestContext.getReduceTimeMillis(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,6 @@ String toJsonString()
*/
long getNumConsumingSegmentsQueried();

/**
* Get number of consuming segments processed by server after server side pruning
*/
long getNumConsumingSegmentsProcessed();

/**
* Get number of consuming segments that had at least one matching document
*/
long getNumConsumingSegmentsMatched();

/**
* Get the minimum freshness timestamp across consuming segments that were queried
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@
*/
@JsonPropertyOrder({
"resultTable", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried",
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
"numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
"numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
"offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numDocsScanned",
"numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs",
"offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
"realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "segmentStatistics",
"traceInfo"
})
Expand All @@ -66,8 +66,6 @@ public class BrokerResponseNative implements BrokerResponse {
private long _numSegmentsProcessed = 0L;
private long _numSegmentsMatched = 0L;
private long _numConsumingSegmentsQueried = 0L;
private long _numConsumingSegmentsProcessed = 0L;
private long _numConsumingSegmentsMatched = 0L;
// the timestamp indicating the freshness of the data queried in consuming segments.
// This can be ingestion timestamp if provided by the stream, or the last index time
private long _minConsumingFreshnessTimeMs = 0L;
Expand Down Expand Up @@ -432,27 +430,6 @@ public void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried) {
_numConsumingSegmentsQueried = numConsumingSegmentsQueried;
}

@JsonProperty("numConsumingSegmentsProcessed")
@Override
public long getNumConsumingSegmentsProcessed() {
return _numConsumingSegmentsProcessed;
}
@JsonProperty("numConsumingSegmentsProcessed")
public void setNumConsumingSegmentsProcessed(long numConsumingSegmentsProcessed) {
_numConsumingSegmentsProcessed = numConsumingSegmentsProcessed;
}

@JsonProperty("numConsumingSegmentsMatched")
@Override
public long getNumConsumingSegmentsMatched() {
return _numConsumingSegmentsMatched;
}

@JsonProperty("numConsumingSegmentsMatched")
public void setNumConsumingSegmentsMatched(long numConsumingSegmentsMatched) {
_numConsumingSegmentsMatched = numConsumingSegmentsMatched;
}

@JsonProperty("minConsumingFreshnessTimeMs")
@Override
public long getMinConsumingFreshnessTimeMs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ enum MetadataKey {
NUM_SEGMENTS_PROCESSED("numSegmentsProcessed", MetadataValueType.INT),
NUM_SEGMENTS_MATCHED("numSegmentsMatched", MetadataValueType.INT),
NUM_CONSUMING_SEGMENTS_QUERIED("numConsumingSegmentsQueried", MetadataValueType.INT),
NUM_CONSUMING_SEGMENTS_PROCESSED("numConsumingSegmentsProcessed", MetadataValueType.INT),
NUM_CONSUMING_SEGMENTS_MATCHED("numConsumingSegmentsMatched", MetadataValueType.INT),
MIN_CONSUMING_FRESHNESS_TIME_MS("minConsumingFreshnessTimeMs", MetadataValueType.LONG),
TOTAL_DOCS("totalDocs", MetadataValueType.LONG),
NUM_GROUPS_LIMIT_REACHED("numGroupsLimitReached", MetadataValueType.STRING),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ public class IntermediateResultsBlock implements Block {
private long _numTotalDocs;
private int _numSegmentsProcessed;
private int _numSegmentsMatched;
private int _numConsumingSegmentsProcessed;
private int _numConsumingSegmentsMatched;
private boolean _numGroupsLimitReached;
private int _numResizes;
private long _resizeTimeMs;
Expand Down Expand Up @@ -271,24 +269,6 @@ public void setNumSegmentsMatched(int numSegmentsMatched) {
_numSegmentsMatched = numSegmentsMatched;
}

@VisibleForTesting
public int getNumConsumingSegmentsProcessed() {
return _numConsumingSegmentsProcessed;
}

public void setNumConsumingSegmentsProcessed(int numConsumingSegmentsProcessed) {
_numConsumingSegmentsProcessed = numConsumingSegmentsProcessed;
}

@VisibleForTesting
public int getNumConsumingSegmentsMatched() {
return _numConsumingSegmentsMatched;
}

public void setNumConsumingSegmentsMatched(int numConsumingSegmentsMatched) {
_numConsumingSegmentsMatched = numConsumingSegmentsMatched;
}

@VisibleForTesting
public long getNumTotalDocs() {
return _numTotalDocs;
Expand Down Expand Up @@ -488,10 +468,6 @@ private DataTable attachMetadataToDataTable(DataTable dataTable) {
.put(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), String.valueOf(_numEntriesScannedPostFilter));
dataTable.getMetadata().put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), String.valueOf(_numSegmentsProcessed));
dataTable.getMetadata().put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), String.valueOf(_numSegmentsMatched));
dataTable.getMetadata().put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
String.valueOf(_numConsumingSegmentsProcessed));
dataTable.getMetadata().put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(),
String.valueOf(_numConsumingSegmentsMatched));
dataTable.getMetadata().put(MetadataKey.NUM_RESIZES.getName(), String.valueOf(_numResizes));
dataTable.getMetadata().put(MetadataKey.RESIZE_TIME_MS.getName(), String.valueOf(_resizeTimeMs));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.segment.spi.MutableSegment;


@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -57,8 +56,6 @@ public static void setExecutionStatistics(IntermediateResultsBlock resultsBlock,
long threadCpuTimeNs, int numServerThreads) {
int numSegmentsProcessed = operators.size();
int numSegmentsMatched = 0;
int numConsumingSegmentsProcessed = 0;
int numConsumingSegmentsMatched = 0;
long numDocsScanned = 0;
long numEntriesScannedInFilter = 0;
long numEntriesScannedPostFilter = 0;
Expand All @@ -68,24 +65,13 @@ public static void setExecutionStatistics(IntermediateResultsBlock resultsBlock,
if (executionStatistics.getNumDocsScanned() > 0) {
numSegmentsMatched++;
}
try {
if (operator.getIndexSegment() instanceof MutableSegment) {
numConsumingSegmentsProcessed += 1;
if (executionStatistics.getNumDocsScanned() > 0) {
numConsumingSegmentsMatched++;
}
}
} catch (UnsupportedOperationException ignored) { }

numDocsScanned += executionStatistics.getNumDocsScanned();
numEntriesScannedInFilter += executionStatistics.getNumEntriesScannedInFilter();
numEntriesScannedPostFilter += executionStatistics.getNumEntriesScannedPostFilter();
numTotalDocs += executionStatistics.getNumTotalDocs();
}
resultsBlock.setNumSegmentsProcessed(numSegmentsProcessed);
resultsBlock.setNumSegmentsMatched(numSegmentsMatched);
resultsBlock.setNumConsumingSegmentsProcessed(numConsumingSegmentsProcessed);
resultsBlock.setNumConsumingSegmentsMatched(numConsumingSegmentsMatched);
resultsBlock.setNumDocsScanned(numDocsScanned);
resultsBlock.setNumEntriesScannedInFilter(numEntriesScannedInFilter);
resultsBlock.setNumEntriesScannedPostFilter(numEntriesScannedPostFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,6 @@ private DataTable processQuery(List<IndexSegment> indexSegments, QueryContext qu
metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), "0");
metadata.put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), "0");
metadata.put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), "0");
metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), "0");
metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), "0");
metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), String.valueOf(totalSegments));
addPrunerStats(metadata, prunerStats);
return dataTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ protected static class ExecutionStatsAggregator {
private long _numSegmentsProcessed = 0L;
private long _numSegmentsMatched = 0L;
private long _numConsumingSegmentsQueried = 0L;
private long _numConsumingSegmentsProcessed = 0L;
private long _numConsumingSegmentsMatched = 0L;
private long _minConsumingFreshnessTimeMs = Long.MAX_VALUE;
private long _numTotalDocs = 0L;
private long _offlineThreadCpuTimeNs = 0L;
Expand Down Expand Up @@ -202,16 +200,6 @@ protected synchronized void aggregate(ServerRoutingInstance routingInstance, Dat
_numConsumingSegmentsQueried += Long.parseLong(numConsumingSegmentsQueriedString);
}

String numConsumingSegmentsProcessed = metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName());
if (numConsumingSegmentsProcessed != null) {
_numConsumingSegmentsProcessed += Long.parseLong(numConsumingSegmentsProcessed);
}

String numConsumingSegmentsMatched = metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName());
if (numConsumingSegmentsMatched != null) {
_numConsumingSegmentsMatched += Long.parseLong(numConsumingSegmentsMatched);
}

String minConsumingFreshnessTimeMsString = metadata.get(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName());
if (minConsumingFreshnessTimeMsString != null) {
_minConsumingFreshnessTimeMs =
Expand Down Expand Up @@ -312,8 +300,6 @@ protected void setStats(String rawTableName, BrokerResponseNative brokerResponse
brokerResponseNative.setNumConsumingSegmentsQueried(_numConsumingSegmentsQueried);
brokerResponseNative.setMinConsumingFreshnessTimeMs(_minConsumingFreshnessTimeMs);
}
brokerResponseNative.setNumConsumingSegmentsProcessed(_numConsumingSegmentsProcessed);
brokerResponseNative.setNumConsumingSegmentsMatched(_numConsumingSegmentsMatched);

// Update broker metrics.
if (brokerMetrics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,6 @@ protected byte[] processQueryAndSerialize(ServerQueryRequest queryRequest, Execu
MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), INVALID_SEGMENTS_COUNT));
long numSegmentsConsuming = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), INVALID_SEGMENTS_COUNT));
long numConsumingSegmentsProcessed = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT));
long numConsumingSegmentsMatched = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT));
long minConsumingFreshnessMs = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), INVALID_FRESHNESS_MS));
int numResizes =
Expand Down Expand Up @@ -246,13 +242,11 @@ protected byte[] processQueryAndSerialize(ServerQueryRequest queryRequest, Execu
// Please add new entries at the end
if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned, numSegmentsPrunedInvalid)) {
LOGGER.info("Processed requestId={},table={},"
+ "segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/"
+ "invalid/limit/value)={}/{}/{}/{}/{}/{}/{}/{}/{},"
+ "segments(queried/processed/matched/consuming/invalid/limit/value)={}/{}/{}/{}/{}/{}/{},"
+ "schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},minConsumingFreshnessMs={},"
+ "broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},"
+ "threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}", requestId, tableNameWithType,
numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming,
numConsumingSegmentsProcessed, numConsumingSegmentsMatched,
numSegmentsPrunedInvalid, numSegmentsPrunedByLimit, numSegmentsPrunedByValue, schedulerWaitMs,
timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
Expand Down
Loading