Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
}
}

brokerResponse.setRequestId(String.valueOf(requestId));
return brokerResponse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* Reimplementation of BrokerResponse from pinot-common, so that pinot-api does not depend on pinot-common.
*/
public class BrokerResponse {
private String _requestId;
private JsonNode _aggregationResults;
private JsonNode _selectionResults;
private JsonNode _resultTable;
Expand All @@ -35,6 +36,7 @@ private BrokerResponse() {
}

private BrokerResponse(JsonNode brokerResponse) {
_requestId = brokerResponse.get("requestId") != null ? brokerResponse.get("requestId").asText() : "unknown";
_aggregationResults = brokerResponse.get("aggregationResults");
_exceptions = brokerResponse.get("exceptions");
_selectionResults = brokerResponse.get("selectionResults");
Expand Down Expand Up @@ -81,4 +83,8 @@ static BrokerResponse fromJson(JsonNode json) {
static BrokerResponse empty() {
return new BrokerResponse();
}

public String getRequestId() {
return _requestId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.client;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.pinot.spi.utils.JsonUtils;
import org.testng.Assert;
import org.testng.annotations.Test;


public class BrokerResponseTest {
private static final ObjectReader OBJECT_READER = JsonUtils.DEFAULT_READER;

@Test
public void parseResultWithRequestId()
throws JsonProcessingException {
String responseJson = "{\"requestId\":\"1\",\"traceInfo\":{},\"numDocsScanned\":36542,"
+ "\"aggregationResults\":[{\"function\":\"count_star\",\"value\":\"36542\"}],\"timeUsedMs\":30,"
+ "\"segmentStatistics\":[],\"exceptions\":[],\"totalDocs\":115545}";
BrokerResponse brokerResponse = BrokerResponse.fromJson(OBJECT_READER.readTree(responseJson));
Assert.assertEquals("1", brokerResponse.getRequestId());
Assert.assertTrue(!brokerResponse.hasExceptions());
}

@Test
public void parseResultWithoutRequestId()
throws JsonProcessingException {
String responseJson = "{\"traceInfo\":{},\"numDocsScanned\":36542,"
+ "\"aggregationResults\":[{\"function\":\"count_star\",\"value\":\"36542\"}],\"timeUsedMs\":30,"
+ "\"segmentStatistics\":[],\"exceptions\":[],\"totalDocs\":115545}";
BrokerResponse brokerResponse = BrokerResponse.fromJson(OBJECT_READER.readTree(responseJson));
Assert.assertEquals("unknown", brokerResponse.getRequestId());
Assert.assertTrue(!brokerResponse.hasExceptions());
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"requestId": "1",
"traceInfo": {},
"numDocsScanned": 36542,
"aggregationResults": [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"requestId": "1",
"traceInfo": {},
"numDocsScanned": 22598,
"aggregationResults": [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"requestId": "1",
"traceInfo": {},
"numDocsScanned": 0,
"aggregationResults": [],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"requestId": "1",
"selectionResults": {
"columns": [
"ActualElapsedTime",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,14 @@ String toJsonString()
* Set the total number of segments with a MatchAllFilterOperator when Explain Plan is called
*/
void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments);

/**
* get request ID for the query
*/
String getRequestId();

/**
* set request ID generated by broker
*/
void setRequestId(String requestId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* Supports serialization via JSON.
*/
@JsonPropertyOrder({
"resultTable", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried",
"resultTable", "requestId", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried",
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
"numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
"numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
Expand All @@ -57,6 +57,7 @@ public class BrokerResponseNative implements BrokerResponse {
new BrokerResponseNative(QueryException.TABLE_DOES_NOT_EXIST_ERROR);
public static final BrokerResponseNative BROKER_ONLY_EXPLAIN_PLAN_OUTPUT = getBrokerResponseExplainPlanOutput();

private String _requestId;
private int _numServersQueried = 0;
private int _numServersResponded = 0;
private long _numDocsScanned = 0L;
Expand Down Expand Up @@ -557,4 +558,16 @@ public void addToExceptions(QueryProcessingException processingException) {
public int getExceptionsSize() {
return _processingExceptions.size();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) Extra empty line

@JsonProperty("requestId")
@Override
public String getRequestId() {
return _requestId;
}

@JsonProperty("requestId")
@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also annotate (JsonProperty) the setter? (Although seems the annotation is redundant)

public void setRequestId(String requestId) {
_requestId = requestId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
"realtimeTotalCpuTimeNs", "segmentStatistics", "traceInfo"
})
public class BrokerResponseNativeV2 extends BrokerResponseNative {
private String _requestId;

private final Map<Integer, BrokerResponseStats> _stageIdStats = new HashMap<>();

Expand Down Expand Up @@ -93,13 +92,4 @@ public void addStageStat(Integer stageId, BrokerResponseStats brokerResponseStat
public Map<Integer, BrokerResponseStats> getStageIdStats() {
return _stageIdStats;
}

@JsonProperty("requestId")
public String getRequestId() {
return _requestId;
}

public void setRequestId(String requestId) {
_requestId = requestId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
// same metadataKey
// TODO: Replace member fields with a simple map of <MetadataKey, Object>
// TODO: Add a subStat field, stage level subStats will contain each operator stats
@JsonPropertyOrder({"exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", "stageExecutionUnit",
@JsonPropertyOrder({"requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", "stageExecutionUnit",
Copy link
Contributor

@walterddr walterddr Jun 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requestID generated in v1 engine doesn't include a hashed encoding of the broker address. so essentially this could cause confusion (e.g. requestID starts with 0 from one broker, but you might get another 0 when hitting a different broker a bit later)

see: #10789 for more info

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's a problem, I'll submit another PR to reuse MultiStageRequestIdGenerator in v1 engine.

"stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded", "numSegmentsQueried",
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
"numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
Expand Down