Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5a551ad
Prototype with TableScanNode serialized using a protobuf message.
vrajat May 22, 2024
4e3d45e
Serialize Mailbox Send and Receive.
vrajat May 22, 2024
8901ce6
Serialize SetOpNode.
vrajat May 22, 2024
f251412
Serialize Exchange and Sort
vrajat May 23, 2024
2e4a20d
Add serialization support for all nodes
vrajat May 24, 2024
4d0a77a
Compiles
vrajat May 24, 2024
4ed9212
Add license header
vrajat May 25, 2024
a88cdd1
Allocate a hashmap within nodehints.
vrajat May 25, 2024
3c11acb
Fix trailing whitespace
vrajat May 25, 2024
1b365b1
Handle nulls correctly in direction keys and Literal Expressions
vrajat May 25, 2024
d46fadc
Create new context for every plan node.
vrajat May 26, 2024
560f30b
Improve structure of visitors.
vrajat May 26, 2024
2c65aed
Fix long line.
vrajat May 26, 2024
f4666f3
Only support specific types of literals.
vrajat May 27, 2024
6f41d41
Do not assume data type and underlying Java type matches.
vrajat May 27, 2024
af337b7
Support object literal.
vrajat May 27, 2024
054692a
Add javadocs to a couple of files.
vrajat May 27, 2024
c965843
Set distinct flag
vrajat May 27, 2024
843f20f
Address review comments.
vrajat May 29, 2024
71995e0
Undo style change.
vrajat May 29, 2024
c26d38a
Remove StageNodeSerDeUtils.java
vrajat May 29, 2024
3969f63
Do not prematurely serialize Stagenode.
vrajat May 29, 2024
2f42fce
Revert "Do not prematurely serialize Stagenode."
vrajat May 29, 2024
2611412
Make visitor a nested class.
vrajat May 29, 2024
f20e627
Remove use of visit prefix.
vrajat May 29, 2024
93b6c88
Fix name
vrajat May 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions pinot-common/src/main/proto/expressions.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

syntax = "proto3";

package org.apache.pinot.common.proto;

// Data type tag attached to Literal.dataType and FunctionCall.dataType.
//
// NOTE: INT is the zero value, so in proto3 a field of this type that was
// never set reads back as INT. The numeric values are part of the wire
// format and must not be renumbered.
enum ColumnDataType {
  // Single-value types.
  INT = 0;
  LONG = 1;
  FLOAT = 2;
  DOUBLE = 3;
  BIG_DECIMAL = 4;
  BOOLEAN = 5;
  TIMESTAMP = 6;
  STRING = 7;
  JSON = 8;
  BYTES = 9;
  OBJECT = 10;

  // Array types.
  INT_ARRAY = 11;
  LONG_ARRAY = 12;
  FLOAT_ARRAY = 13;
  DOUBLE_ARRAY = 14;
  BOOLEAN_ARRAY = 15;
  TIMESTAMP_ARRAY = 16;
  STRING_ARRAY = 17;
  BYTES_ARRAY = 18;

  UNKNOWN = 19;
}

// Expression variant that refers to an input by integer index.
message InputRef {
  // NOTE(review): presumably the zero-based position of the referenced input
  // column - confirm against the Java serializer.
  int32 index = 1;
}

// Expression variant that carries a constant value.
message Literal {
  // Declared data type of the literal.
  ColumnDataType dataType = 1;

  // Explicit null marker.
  // NOTE(review): presumably no literalField member is set when this is
  // true - confirm against the Java serializer.
  bool isValueNull = 2;

  // The value itself; at most one member is set. Member field numbers start
  // at 101, leaving the low numbers for regular fields of this message.
  oneof literalField {
    bool boolField = 101;
    int32 intField = 102;
    int64 longField = 103;
    float floatField = 104;
    double doubleField = 105;
    string stringField = 106;
    bytes bytesField = 107;
    // NOTE(review): presumably a serialized form for values that have no
    // dedicated member above - confirm.
    bytes serializedField = 108;
  }
}

// Expression variant that applies a named function to operand expressions.
message FunctionCall {
  // Integer encoding of the SQL kind.
  // NOTE(review): the encoding (ordinal vs. custom code) is not visible in
  // this file - confirm before relying on specific values.
  int32 sqlKind = 1;

  // Data type associated with this call.
  ColumnDataType dataType = 2;

  // Name of the function being invoked.
  string functionName = 3;

  // Operand expressions; RexExpression is recursive, so operands may
  // themselves be function calls.
  repeated RexExpression functionOperands = 4;

  // Distinct flag for the call.
  bool isDistinct = 5;
}

// A single expression: exactly one of an input reference, a literal, or a
// function call. Function calls nest further RexExpression operands.
message RexExpression {
  oneof expression {
    InputRef inputRef = 1;
    Literal literal = 2;
    FunctionCall functionCall = 3;
  }
}
247 changes: 201 additions & 46 deletions pinot-common/src/main/proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,64 +18,219 @@
//

syntax = "proto3";
import "expressions.proto";

package org.apache.pinot.common.proto;

message StageNode {
int32 stageId = 1;
string nodeName = 2;
repeated StageNode inputs = 3;
repeated string columnNames = 4;
repeated string columnDataTypes = 5;
ObjectField objectField = 6;
}

// MemberVariableField defines the serialized format of the member variables of a class object.
// MemberVariableField can be one of
// 1. literal
// 2. list
// 3. map
// 4. complex class object
message MemberVariableField {
oneof member_variable_field {
LiteralField literalField = 1;
ListField listField = 2;
MapField mapField = 3;
ObjectField objectField = 4;
repeated ColumnDataType columnDataTypes = 5;
oneof nodeType {
TableScanNode tableScanNode = 102;
MailboxReceiveNode receiveNode = 103;
MailboxSendNode sendNode = 104;
SetOpNode setNode = 105;
ExchangeNode exchangeNode = 106;
SortNode sortNode = 107;
AggregateNode aggregateNode = 108;
JoinNode joinNode = 109;
LiteralValueNode literalValueNode = 110;
ProjectNode projectNode = 111;
ValueNode valueNode = 112;
WindowNode windowNode = 113;
FilterNode filterNode = 114;
}
}

// ObjectField defines the serialized format of a complex class object.
// it contains:
// 1. its fully-qualified clazz name;
// 2. its MemberVariableField map.
message ObjectField {
string objectClassName = 1;
map<string, MemberVariableField> memberVariables = 2;
}

// LiteralField defines the serialized format of a literal field.
message LiteralField {
oneof literal_field {
bool boolField = 1;
int32 intField = 2;
int64 longField = 3;
float floatField = 4;
double doubleField = 5;
string stringField = 6;
bytes bytesField = 7;
bytes bigDecimalField = 8;
}
// Wrapper around a string-to-string map so it can be used as a map value
// (protobuf map values cannot themselves be maps).
message StrStrMap {
  map<string, string> options = 1;
}

// Hints attached to a plan node, stored as a two-level string map.
message NodeHint {
  // NOTE(review): presumably keyed by hint name, with each value holding
  // that hint's key/value options - confirm against the Java side.
  map<string, StrStrMap> hintOptions = 1;
}

// Payload for a table-scan plan node.
message TableScanNode {
  // Hints attached to this node.
  NodeHint nodeHint = 1;

  // Name of the table being scanned.
  string tableName = 2;

  // Names of the columns read by the scan.
  repeated string tableScanColumns = 3;
}

// Exchange type carried by the mailbox and exchange nodes.
//
// NOTE: STREAMING is the zero value and is therefore what an unset field
// of this type reads back as.
enum PinotRelExchangeType {
  STREAMING = 0;
  SUB_PLAN = 1;
  PIPELINE_BREAKER = 2;
}

// Distribution type carried by the mailbox and exchange nodes.
//
// NOTE: SINGLETON is the zero value and is therefore what an unset field
// of this type reads back as.
enum RelDistributionType {
  SINGLETON = 0;
  HASH_DISTRIBUTED = 1;
  RANGE_DISTRIBUTED = 2;
  RANDOM_DISTRIBUTED = 3;
  ROUND_ROBIN_DISTRIBUTED = 4;
  BROADCAST_DISTRIBUTED = 5;
  ANY = 6;
}

// Sort direction of a collation key.
//
// NOTE: ASCENDING is the zero value and is therefore what an unset field
// of this type reads back as.
enum Direction {
  ASCENDING = 0;
  STRICTLY_ASCENDING = 1;
  DESCENDING = 2;
  STRICTLY_DESCENDING = 3;
  CLUSTERED = 4;
}

// Placement of nulls for a collation key.
//
// NOTE: FIRST (not UNSPECIFIED) is the zero value, so an unset field of
// this type reads back as FIRST.
enum NullDirection {
  FIRST = 0;
  LAST = 1;
  UNSPECIFIED = 2;
}

// Wrapper message holding a list of Direction values. As a message-typed
// field it has explicit presence, unlike a bare repeated field.
message DirectionList {
  repeated Direction item = 1;
}

// Wrapper message holding a list of NullDirection values. As a
// message-typed field it has explicit presence, unlike a bare repeated
// field.
message NullDirectionList {
  repeated NullDirection item = 1;
}

// Wrapper message holding a list of RexExpression values (RexExpression is
// imported from expressions.proto).
message RexExpressionList {
  repeated RexExpression item = 1;
}

// Wrapper message holding a list of integer distribution keys. As a
// message-typed field it has explicit presence, unlike a bare repeated
// field.
message DistributionKeyList {
  repeated int32 item = 1;
}

// Payload for a mailbox-receive plan node.
message MailboxReceiveNode {
  // Stage id of the sending side.
  int32 senderStageId = 1;

  // Exchange and distribution settings.
  PinotRelExchangeType exchangeType = 2;
  RelDistributionType distributionType = 3;
  DistributionKeyList distributionKeys = 4;

  // Collation description. Keys are a plain repeated field, while the
  // directions and null directions use wrapper messages.
  // NOTE(review): presumably the three lists are index-aligned - confirm.
  repeated int32 collationKeys = 5;
  DirectionList collationDirections = 6;
  NullDirectionList collationNullDirections = 7;

  // Sort flags for the sending and receiving side.
  bool sortOnSender = 8;
  bool sortOnReceiver = 9;

  // The sending node, embedded as a full StageNode.
  StageNode sender = 10;
}

// Payload for a mailbox-send plan node.
//
// NOTE: distributionType (2) and exchangeType (3) are numbered in the
// opposite order from MailboxReceiveNode; the numbers are part of the wire
// format and must stay as they are.
message MailboxSendNode {
  // Stage id of the receiving side.
  int32 receiverStageId = 1;

  // Distribution and exchange settings.
  RelDistributionType distributionType = 2;
  PinotRelExchangeType exchangeType = 3;
  DistributionKeyList distributionKeys = 4;

  // Collation description.
  repeated int32 collationKeys = 5;
  DirectionList collationDirections = 6;

  // Sender-side flags.
  bool sortOnSender = 7;
  bool prePartitioned = 8;
}

// Kind of set operation carried by SetOpNode.
//
// NOTE: UNION is the zero value and is therefore what an unset field of
// this type reads back as.
enum SetOpType {
  UNION = 0;
  INTERSECT = 1;
  MINUS = 2;
}

// Payload for a set-operation plan node.
message SetOpNode {
  // Which set operation to apply.
  SetOpType setOpType = 1;

  // NOTE(review): presumably the SQL "ALL" modifier (keep duplicates) -
  // confirm against the Java side.
  bool all = 2;
}

// Collation of a single field: which field, in what direction, and where
// nulls go.
message RelFieldCollation {
  int32 fieldIndex = 1;
  Direction direction = 2;
  NullDirection nullDirection = 3;
}

// Payload for an exchange plan node.
message ExchangeNode {
  // Exchange and distribution settings.
  PinotRelExchangeType exchangeType = 1;
  RelDistributionType distributionType = 2;

  // Integer keys of the exchange.
  // NOTE(review): presumably the distribution key column indices - confirm.
  repeated int32 keys = 3;

  // Sort / partitioning flags.
  bool isSortOnSender = 4;
  bool isSortOnReceiver = 5;
  bool isPrePartitioned = 6;

  // Per-field collations; unlike the mailbox nodes, this message stores
  // collation as one RelFieldCollation per field.
  repeated RelFieldCollation collations = 7;

  // Table names associated with this exchange.
  repeated string tableNames = 8;
}

// Payload for a sort plan node.
message SortNode {
  // Collation description: key expressions plus their directions and null
  // directions, each in its own wrapper list.
  // NOTE(review): presumably the three lists are index-aligned - confirm.
  RexExpressionList collationKeys = 1;
  DirectionList collationDirections = 2;
  NullDirectionList collationNullDirections = 3;

  // NOTE(review): presumably the LIMIT and OFFSET of the sort. Both are
  // plain proto3 int32 fields, so 0 is indistinguishable from unset.
  int32 fetch = 4;
  int32 offset = 5;
}

// Aggregation type carried by AggregateNode.
//
// NOTE: DIRECT is the zero value and is therefore what an unset field of
// this type reads back as.
enum AggType {
  DIRECT = 0;
  LEAF = 1;
  INTERMEDIATE = 2;
  FINAL = 3;
}

// Payload for an aggregate plan node.
message AggregateNode {
  // Hints attached to this node.
  NodeHint nodeHint = 1;

  // Aggregation call expressions.
  RexExpressionList aggCalls = 2;

  // Integer filter-argument indices.
  // NOTE(review): presumably one entry per aggCalls item - confirm.
  repeated int32 filterArgIndices = 3;

  // Group-by expressions.
  RexExpressionList groupSet = 4;

  // Aggregation type of this node.
  AggType aggType = 5;
}

// ListField defines the serialized format of a list field.
// The content of the list is a MemberVariableField.
message ListField {
repeated MemberVariableField content = 1;
// Payload for a filter plan node: a single condition expression.
message FilterNode {
  RexExpression condition = 1;
}

// MapField defines the serialized format of a map field.
// The key of the map is a string and the value of the map is a MemberVariableField.
message MapField {
map<string, MemberVariableField> content = 1;
// Join type carried by JoinNode.
//
// NOTE: INNER is the zero value and is therefore what an unset field of
// this type reads back as.
enum JoinRelType {
  INNER = 0;
  LEFT = 1;
  RIGHT = 2;
  FULL = 3;
  SEMI = 4;
  ANTI = 5;
}

// Integer join keys for the left and right side of a join.
// NOTE(review): presumably column indices, paired by position across the
// two lists - confirm.
message JoinKeys {
  repeated int32 leftKeys = 1;
  repeated int32 rightKeys = 2;
}

// Payload for a join plan node.
message JoinNode {
  // Type of join.
  JoinRelType joinRelType = 1;

  // Left/right join keys.
  JoinKeys joinKeys = 2;

  // Join clause expressions.
  RexExpressionList joinClause = 3;

  // Hints attached to this join.
  NodeHint joinHints = 4;

  // Column names of the left and right side.
  repeated string leftColumnNames = 5;
  repeated string rightColumnNames = 6;
}

// Payload for a literal-value plan node.
message LiteralValueNode {
  // Opaque bytes.
  // NOTE(review): presumably a serialized data table - confirm the encoding
  // against the Java side.
  bytes dataTable = 1;
}

// Payload for a project plan node: the list of projected expressions.
message ProjectNode {
  RexExpressionList projects = 1;
}

// Payload for a value plan node: a list of rows, each row being a list of
// expressions.
message ValueNode {
  repeated RexExpressionList rows = 1;
}

// Window frame type carried by WindowNode.
//
// NOTE: ROWS is the zero value and is therefore what an unset field of
// this type reads back as.
enum WindowFrameType {
  ROWS = 0;
  RANGE = 1;
}

// Payload for a window plan node.
message WindowNode {
  // Grouping expressions.
  RexExpressionList groupSet = 1;

  // Ordering expressions with their directions and null directions. Unlike
  // the sort and mailbox nodes, the directions here are plain repeated
  // fields rather than wrapper messages.
  RexExpressionList orderSet = 2;
  repeated Direction orderSetDirection = 3;
  repeated NullDirection orderSetNullDirection = 4;

  // Aggregation call expressions.
  RexExpressionList aggCalls = 5;

  // Frame bounds.
  // NOTE(review): how "unbounded" is encoded is not visible in this file -
  // confirm against the Java side.
  int32 lowerBound = 6;
  int32 upperBound = 7;

  // Constant expressions associated with the window.
  RexExpressionList constants = 8;

  // Frame type.
  WindowFrameType windowFrameType = 9;
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ private static PlanNode convertLogicalExchange(Exchange node, int currentStageId
}
}
RelDistribution inputDistributionTrait = node.getInputs().get(0).getTraitSet().getDistribution();
boolean isPrePartitioned = inputDistributionTrait != null
&& inputDistributionTrait.getType() == RelDistribution.Type.HASH_DISTRIBUTED
&& inputDistributionTrait == node.getDistribution();
boolean isPrePartitioned =
inputDistributionTrait != null && inputDistributionTrait.getType() == RelDistribution.Type.HASH_DISTRIBUTED
&& inputDistributionTrait == node.getDistribution();
List<RelFieldCollation> fieldCollations = (collation == null) ? null : collation.getFieldCollations();

// Compute all the tables involved under this exchange node
Expand Down Expand Up @@ -169,7 +169,8 @@ private static PlanNode convertLogicalAggregate(LogicalAggregate node, int curre
}

private static PlanNode convertLogicalProject(LogicalProject node, int currentStageId) {
return new ProjectNode(currentStageId, toDataSchema(node.getRowType()), node.getProjects());
return new ProjectNode(currentStageId, toDataSchema(node.getRowType()),
node.getProjects().stream().map(RexExpressionUtils::fromRexNode).collect(Collectors.toList()));
}

private static PlanNode convertLogicalFilter(LogicalFilter node, int currentStageId) {
Expand Down
Loading