Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,27 +156,23 @@ public StringBuilder visitMailboxReceive(MailboxReceiveNode node, Context contex
MailboxSendNode sender = (MailboxSendNode) node.getSender();
int senderStageId = node.getSenderStageId();
DispatchablePlanFragment dispatchablePlanFragment = _dispatchableSubPlan.getQueryStageList().get(senderStageId);
Map<Integer, Map<String, List<String>>> segments = dispatchablePlanFragment.getWorkerIdToSegmentsMap();

Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap =
dispatchablePlanFragment.getServerInstanceToWorkerIdMap();
Iterator<QueryServerInstance> iterator = serverInstanceToWorkerIdMap.keySet().iterator();
while (iterator.hasNext()) {
QueryServerInstance queryServerInstance = iterator.next();
for (int workerId : serverInstanceToWorkerIdMap.get(queryServerInstance)) {
if (segments.containsKey(workerId)) {
// always print out leaf stages
sender.visit(this, context.next(iterator.hasNext(), queryServerInstance, workerId));
List<Integer> workerIdList = serverInstanceToWorkerIdMap.get(queryServerInstance);
for (int idx = 0; idx < workerIdList.size(); idx++) {
int workerId = workerIdList.get(idx);
if (!iterator.hasNext() && idx == workerIdList.size() - 1) {
// always print out the last one
sender.visit(this, context.next(false, queryServerInstance, workerId));
} else {
if (!iterator.hasNext()) {
// always print out the last one
sender.visit(this, context.next(false, queryServerInstance, workerId));
} else {
// only print short version of the sender node
appendMailboxSend(sender, context.next(true, queryServerInstance, workerId))
.append(" (Subtree Omitted)")
.append('\n');
}
// only print short version of the sender node
appendMailboxSend(sender, context.next(true, queryServerInstance, workerId))
.append(" (Subtree Omitted)")
.append('\n');
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ public void testQueryPlanExplainLogical(String query, String digest)
testQueryPlanExplain(query, digest);
}

@Test(dataProvider = "testQueryPhysicalPlanDataProvider")
public void testQueryPlanExplainPhysical(String query, String digest)
throws Exception {
testQueryPlanExplain(query, digest);
}

private void testQueryPlanExplain(String query, String digest) {
try {
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
Expand Down Expand Up @@ -466,49 +460,4 @@ private Object[][] provideQueriesWithExplainedLogicalPlan() {
};
//@formatter:on
}

@DataProvider(name = "testQueryPhysicalPlanDataProvider")
private Object[][] provideQueriesWithExplainedPhysicalPlan() {
//@formatter:off
return new Object[][] {
new Object[]{"EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a",
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+ "│ └── [1]@localhost:1 PROJECT\n"
+ "│ └── [1]@localhost:1 TABLE SCAN (a) null\n"
+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+ " └── [1]@localhost:2 PROJECT\n"
+ " └── [1]@localhost:2 TABLE SCAN (a) null\n"},
new Object[]{"EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR SELECT col1, COUNT(*) FROM a GROUP BY col1",
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+ " └── [1]@localhost:2 AGGREGATE_FINAL\n"
+ " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+ " ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ " │ └── [2]@localhost:1 AGGREGATE_LEAF\n"
+ " │ └── [2]@localhost:1 TABLE SCAN (a) null\n"
+ " └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ " └── [2]@localhost:2 AGGREGATE_LEAF\n"
+ " └── [2]@localhost:2 TABLE SCAN (a) null\n"},
new Object[]{"EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+ " └── [1]@localhost:2 PROJECT\n"
+ " └── [1]@localhost:2 JOIN\n"
+ " ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+ " │ ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ " │ │ └── [2]@localhost:1 PROJECT\n"
+ " │ │ └── [2]@localhost:1 TABLE SCAN (a) null\n"
+ " │ └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ " │ └── [2]@localhost:2 PROJECT\n"
+ " │ └── [2]@localhost:2 TABLE SCAN (a) null\n"
+ " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+ " └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ " └── [3]@localhost:1 PROJECT\n"
+ " └── [3]@localhost:1 TABLE SCAN (b) null\n"}
};
//@formatter:on
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -57,6 +58,23 @@ public class QueryEnvironmentTestBase {
"d_REALTIME", ImmutableList.of("d2"), "d_OFFLINE", ImmutableList.of("d3"), "e_REALTIME",
ImmutableList.of("e2"), "e_OFFLINE", ImmutableList.of("e3"));
public static final Map<String, Schema> TABLE_SCHEMAS = new HashMap<>();
public static final Map<String, Pair<String, List<List<String>>>> PARTITIONED_SEGMENTS_MAP = new HashMap<>();
public static final int PARTITION_COUNT = 4;
public static final Map<String, String> PARTITIONED_TABLES =
ImmutableMap.of("a_REALTIME", "col2", "b_REALTIME", "col1");
static {
for (Map.Entry<String, String> e : PARTITIONED_TABLES.entrySet()) {
String tableName = e.getKey();
String partitionColumn = e.getValue();
List<List<String>> partitionIdToSegmentsMap = new ArrayList<>(PARTITION_COUNT);
partitionIdToSegmentsMap.add(SERVER1_SEGMENTS.getOrDefault(tableName, Collections.emptyList()));
partitionIdToSegmentsMap.add(SERVER2_SEGMENTS.getOrDefault(tableName, Collections.emptyList()));
for (int i = 2; i < PARTITION_COUNT; i++) {
partitionIdToSegmentsMap.add(new ArrayList<>());
}
PARTITIONED_SEGMENTS_MAP.put(tableName, Pair.of(partitionColumn, partitionIdToSegmentsMap));
}
}

static {
TABLE_SCHEMAS.put("a_REALTIME", getSchemaBuilder("a").build());
Expand Down Expand Up @@ -84,7 +102,8 @@ static Schema.SchemaBuilder getSchemaBuilder(String schemaName) {
@BeforeClass
public void setUp() {
// the port doesn't matter as we are not actually making a server call.
_queryEnvironment = getQueryEnvironment(3, 1, 2, TABLE_SCHEMAS, SERVER1_SEGMENTS, SERVER2_SEGMENTS, null);
_queryEnvironment =
getQueryEnvironment(3, 1, 2, TABLE_SCHEMAS, SERVER1_SEGMENTS, SERVER2_SEGMENTS, PARTITIONED_SEGMENTS_MAP);
}

@DataProvider(name = "testQueryDataProvider")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String EXPLAIN_REGEX =
"EXPLAIN (IMPLEMENTATION )*PLAN (INCLUDING |EXCLUDING )*(ALL )*(ATTRIBUTES )*(AS DOT |AS JSON |AS TEXT )*FOR ";
private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";

Expand All @@ -51,7 +53,8 @@ public void testQueryExplainPlansAndQueryPlanConversion(String testCaseName, Str
String explainedPlan = _queryEnvironment.explainQuery(query, requestId);
Assert.assertEquals(explainedPlan, output,
String.format("Test case %s for query %s doesn't match expected output: %s", testCaseName, query, output));
String queryWithoutExplainPlan = query.replace("EXPLAIN PLAN FOR ", "");
// use a regex to exclude the
String queryWithoutExplainPlan = query.replaceFirst(EXPLAIN_REGEX, "");
DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(queryWithoutExplainPlan);
Assert.assertNotNull(dispatchableSubPlan,
String.format("Test case %s for query %s should not have a null QueryPlan",
Expand All @@ -66,7 +69,7 @@ public void testQueryExplainPlansWithExceptions(String testCaseName, String quer
try {
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
_queryEnvironment.explainQuery(query, requestId);
String queryWithoutExplainPlan = query.replace("EXPLAIN PLAN FOR ", "");
String queryWithoutExplainPlan = query.replaceFirst(EXPLAIN_REGEX, "");
_queryEnvironment.planQuery(queryWithoutExplainPlan);
Assert.fail("Query compilation should have failed with exception message pattern: " + expectedException);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
{
"physical_plan_explain_formats": {
"queries": [
{
"description": "explain plan with attributes",
"sql": "EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a",
"output": [
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
"├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
"└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
" └── [1]@localhost:2 PROJECT\n",
" └── [1]@localhost:2 TABLE SCAN (a) null\n"
]
},
{
"description": "explain plan without attributes",
"sql": "EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR SELECT col1, COUNT(*) FROM a GROUP BY col1",
"output": [
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
"├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
"└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
" └── [1]@localhost:2 AGGREGATE_FINAL\n",
" └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]} (Subtree Omitted)\n",
" └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
" └── [2]@localhost:2 AGGREGATE_LEAF\n",
" └── [2]@localhost:2 TABLE SCAN (a) null\n"
]
},
{
"description": "explain plan with join",
"sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
"output": [
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
"├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
"└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
" └── [1]@localhost:2 PROJECT\n",
" └── [1]@localhost:2 JOIN\n",
" ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" │ ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]} (Subtree Omitted)\n",
" │ └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
" │ └── [2]@localhost:2 PROJECT\n",
" │ └── [2]@localhost:2 TABLE SCAN (a) null\n",
" └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
" └── [3]@localhost:1 PROJECT\n",
" └── [3]@localhost:1 TABLE SCAN (b) null\n"
]
},
{
"description": "explain plan with join with colocated tables",
"sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0",
"output": [
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
"├── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
"├── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
"├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
"└── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
" └── [1]@localhost:1 PROJECT\n",
" └── [1]@localhost:1 JOIN\n",
" ├── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" │ ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
" │ ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
" │ ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
" │ └── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]}\n",
" │ └── [2]@localhost:1 PROJECT\n",
" │ └── [2]@localhost:1 TABLE SCAN (a) null\n",
" └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" ├── [3]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
" ├── [3]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
" ├── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
" └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]}\n",
" └── [3]@localhost:1 PROJECT\n",
" └── [3]@localhost:1 FILTER\n",
" └── [3]@localhost:1 TABLE SCAN (b) null\n"
]
}
]
}
}