Skip to content

Commit 43e90f4

Browse files
krvikashfindepi
authored andcommitted
Implement Dereference pushdown for the Delta Lake connector
1 parent 35eda7d commit 43e90f4

File tree

39 files changed

+1391
-247
lines changed

39 files changed

+1391
-247
lines changed

docs/src/main/sphinx/connector/delta-lake.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ configure processing of Parquet files.
190190
* - ``parquet_writer_batch_size``
191191
- Maximum number of rows processed by the Parquet writer in a batch.
192192
- ``10000``
193+
* - ``projection_pushdown_enabled``
194+
- Read only projected fields from row columns while performing ``SELECT`` queries
195+
- ``true``
193196

194197
.. _delta-lake-type-mapping:
195198

@@ -930,3 +933,6 @@ connector.
930933
for structural data types. The equivalent catalog session property is
931934
``parquet_optimized_nested_reader_enabled``.
932935
- ``true``
936+
* - ``delta.projection-pushdown-enabled``
937+
- Read only projected fields from row columns while performing ``SELECT`` queries
938+
- ``true``

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,16 +155,17 @@ public AbstractDeltaLakePageSink(
155155
DeltaLakeColumnHandle column = inputColumns.get(inputIndex);
156156
switch (column.getColumnType()) {
157157
case PARTITION_KEY:
158-
int partitionPosition = canonicalToOriginalPartitionPositions.get(column.getName());
158+
int partitionPosition = canonicalToOriginalPartitionPositions.get(column.getColumnName());
159159
partitionColumnInputIndex[partitionPosition] = inputIndex;
160-
originalPartitionColumnNames[partitionPosition] = canonicalToOriginalPartitionColumns.get(column.getName());
161-
partitionColumnTypes[partitionPosition] = column.getType();
160+
originalPartitionColumnNames[partitionPosition] = canonicalToOriginalPartitionColumns.get(column.getColumnName());
161+
partitionColumnTypes[partitionPosition] = column.getBaseType();
162162
break;
163163
case REGULAR:
164+
verify(column.isBaseColumn(), "Unexpected dereference: %s", column);
164165
dataColumnHandles.add(column);
165166
dataColumnsInputIndex.add(inputIndex);
166-
dataColumnNames.add(column.getPhysicalName());
167-
dataColumnTypes.add(column.getPhysicalType());
167+
dataColumnNames.add(column.getBasePhysicalColumnName());
168+
dataColumnTypes.add(column.getBasePhysicalType());
168169
break;
169170
case SYNTHESIZED:
170171
processSynthesizedColumn(column);

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeBucketFunction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323

2424
import static com.google.common.base.Throwables.throwIfUnchecked;
25+
import static com.google.common.base.Verify.verify;
2526
import static com.google.common.collect.ImmutableList.toImmutableList;
2627
import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.BLOCK_POSITION;
2728
import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL;
@@ -37,7 +38,8 @@ public class DeltaLakeBucketFunction
3738
public DeltaLakeBucketFunction(TypeOperators typeOperators, List<DeltaLakeColumnHandle> partitioningColumns, int bucketCount)
3839
{
3940
this.hashCodeInvokers = partitioningColumns.stream()
40-
.map(DeltaLakeColumnHandle::getType)
41+
.peek(column -> verify(column.isBaseColumn(), "Unexpected dereference: %s", column))
42+
.map(DeltaLakeColumnHandle::getBaseType)
4143
.map(type -> typeOperators.getHashCodeOperator(type, simpleConvention(FAIL_ON_NULL, BLOCK_POSITION)))
4244
.collect(toImmutableList());
4345
this.bucketCount = bucketCount;

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java

Lines changed: 84 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.plugin.deltalake;
1515

1616
import com.fasterxml.jackson.annotation.JsonCreator;
17+
import com.fasterxml.jackson.annotation.JsonIgnore;
1718
import com.fasterxml.jackson.annotation.JsonProperty;
1819
import io.trino.plugin.hive.HiveColumnHandle;
1920
import io.trino.spi.connector.ColumnHandle;
@@ -23,10 +24,13 @@
2324
import java.util.Optional;
2425
import java.util.OptionalInt;
2526

27+
import static com.google.common.base.Preconditions.checkArgument;
28+
import static com.google.common.base.Preconditions.checkState;
2629
import static io.airlift.slice.SizeOf.estimatedSizeOf;
2730
import static io.airlift.slice.SizeOf.instanceSize;
2831
import static io.airlift.slice.SizeOf.sizeOf;
2932
import static io.trino.plugin.deltalake.DeltaHiveTypeTranslator.toHiveType;
33+
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
3034
import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED;
3135
import static io.trino.spi.type.BigintType.BIGINT;
3236
import static io.trino.spi.type.RowType.field;
@@ -56,64 +60,68 @@ public class DeltaLakeColumnHandle
5660
public static final String FILE_MODIFIED_TIME_COLUMN_NAME = "$file_modified_time";
5761
public static final Type FILE_MODIFIED_TIME_TYPE = TIMESTAMP_TZ_MILLIS;
5862

59-
private final String name;
60-
private final Type type;
61-
private final OptionalInt fieldId;
63+
private final String baseColumnName;
64+
private final Type baseType;
65+
private final OptionalInt baseFieldId;
6266
// Hold field names in Parquet files
6367
// The value is same as 'name' when the column mapping mode is none
6468
// The value is same as 'delta.columnMapping.physicalName' when the column mapping mode is id or name. e.g. col-6707cc9e-f3aa-4e6b-b8ef-1b03d3475680
65-
private final String physicalName;
69+
private final String basePhysicalColumnName;
6670
// Hold type in Parquet files
6771
// The value is same as 'type' when the column mapping mode is none
6872
// The value is same as 'delta.columnMapping.physicalName' when the column mapping mode is id or name. e.g. row(col-5924c8b3-04cf-4146-abb5-2c229e7ff708 integer)
69-
private final Type physicalType;
73+
private final Type basePhysicalType;
7074
private final DeltaLakeColumnType columnType;
75+
private final Optional<DeltaLakeColumnProjectionInfo> projectionInfo;
7176

7277
@JsonCreator
7378
public DeltaLakeColumnHandle(
74-
@JsonProperty("name") String name,
75-
@JsonProperty("type") Type type,
76-
@JsonProperty("fieldId") OptionalInt fieldId,
77-
@JsonProperty("physicalName") String physicalName,
78-
@JsonProperty("physicalType") Type physicalType,
79-
@JsonProperty("columnType") DeltaLakeColumnType columnType)
80-
{
81-
this.name = requireNonNull(name, "name is null");
82-
this.type = requireNonNull(type, "type is null");
83-
this.fieldId = requireNonNull(fieldId, "fieldId is null");
84-
this.physicalName = requireNonNull(physicalName, "physicalName is null");
85-
this.physicalType = requireNonNull(physicalType, "physicalType is null");
79+
@JsonProperty("baseColumnName") String baseColumnName,
80+
@JsonProperty("baseType") Type baseType,
81+
@JsonProperty("baseFieldId") OptionalInt baseFieldId,
82+
@JsonProperty("basePhysicalColumnName") String basePhysicalColumnName,
83+
@JsonProperty("basePhysicalType") Type basePhysicalType,
84+
@JsonProperty("columnType") DeltaLakeColumnType columnType,
85+
@JsonProperty("projectionInfo") Optional<DeltaLakeColumnProjectionInfo> projectionInfo)
86+
{
87+
this.baseColumnName = requireNonNull(baseColumnName, "baseColumnName is null");
88+
this.baseType = requireNonNull(baseType, "baseType is null");
89+
this.baseFieldId = requireNonNull(baseFieldId, "baseFieldId is null");
90+
this.basePhysicalColumnName = requireNonNull(basePhysicalColumnName, "basePhysicalColumnName is null");
91+
this.basePhysicalType = requireNonNull(basePhysicalType, "basePhysicalType is null");
8692
this.columnType = requireNonNull(columnType, "columnType is null");
93+
checkArgument(projectionInfo.isEmpty() || columnType == REGULAR, "Projection info present for column type: %s", columnType);
94+
this.projectionInfo = projectionInfo;
8795
}
8896

8997
@JsonProperty
90-
public String getName()
98+
public String getBaseColumnName()
9199
{
92-
return name;
100+
return baseColumnName;
93101
}
94102

95103
@JsonProperty
96-
public Type getType()
104+
public Type getBaseType()
97105
{
98-
return type;
106+
return baseType;
99107
}
100108

101109
@JsonProperty
102-
public OptionalInt getFieldId()
110+
public OptionalInt getBaseFieldId()
103111
{
104-
return fieldId;
112+
return baseFieldId;
105113
}
106114

107115
@JsonProperty
108-
public String getPhysicalName()
116+
public String getBasePhysicalColumnName()
109117
{
110-
return physicalName;
118+
return basePhysicalColumnName;
111119
}
112120

113121
@JsonProperty
114-
public Type getPhysicalType()
122+
public Type getBasePhysicalType()
115123
{
116-
return physicalType;
124+
return basePhysicalType;
117125
}
118126

119127
@JsonProperty
@@ -122,6 +130,12 @@ public DeltaLakeColumnType getColumnType()
122130
return columnType;
123131
}
124132

133+
@JsonProperty
134+
public Optional<DeltaLakeColumnProjectionInfo> getProjectionInfo()
135+
{
136+
return projectionInfo;
137+
}
138+
125139
@Override
126140
public boolean equals(Object obj)
127141
{
@@ -132,64 +146,88 @@ public boolean equals(Object obj)
132146
return false;
133147
}
134148
DeltaLakeColumnHandle other = (DeltaLakeColumnHandle) obj;
135-
return Objects.equals(this.name, other.name) &&
136-
Objects.equals(this.type, other.type) &&
137-
Objects.equals(this.fieldId, other.fieldId) &&
138-
Objects.equals(this.physicalName, other.physicalName) &&
139-
Objects.equals(this.physicalType, other.physicalType) &&
140-
this.columnType == other.columnType;
149+
return Objects.equals(this.baseColumnName, other.baseColumnName) &&
150+
Objects.equals(this.baseType, other.baseType) &&
151+
Objects.equals(this.baseFieldId, other.baseFieldId) &&
152+
Objects.equals(this.basePhysicalColumnName, other.basePhysicalColumnName) &&
153+
Objects.equals(this.basePhysicalType, other.basePhysicalType) &&
154+
this.columnType == other.columnType &&
155+
Objects.equals(this.projectionInfo, other.projectionInfo);
156+
}
157+
158+
@JsonIgnore
159+
public String getColumnName()
160+
{
161+
checkState(isBaseColumn(), "Unexpected dereference: %s", this);
162+
return baseColumnName;
163+
}
164+
165+
@JsonIgnore
166+
public String getQualifiedPhysicalName()
167+
{
168+
return projectionInfo.map(projectionInfo -> basePhysicalColumnName + "#" + projectionInfo.getPartialName())
169+
.orElse(basePhysicalColumnName);
141170
}
142171

143172
public long getRetainedSizeInBytes()
144173
{
145174
// type is not accounted for as the instances are cached (by TypeRegistry) and shared
146175
return INSTANCE_SIZE
147-
+ estimatedSizeOf(name)
148-
+ sizeOf(fieldId)
149-
+ estimatedSizeOf(physicalName);
176+
+ estimatedSizeOf(baseColumnName)
177+
+ sizeOf(baseFieldId)
178+
+ estimatedSizeOf(basePhysicalColumnName)
179+
+ projectionInfo.map(DeltaLakeColumnProjectionInfo::getRetainedSizeInBytes).orElse(0L);
180+
}
181+
182+
@JsonIgnore
183+
public boolean isBaseColumn()
184+
{
185+
return projectionInfo.isEmpty();
150186
}
151187

152188
@Override
153189
public int hashCode()
154190
{
155-
return Objects.hash(name, type, fieldId, physicalName, physicalType, columnType);
191+
return Objects.hash(baseColumnName, baseType, baseFieldId, basePhysicalColumnName, basePhysicalType, columnType, projectionInfo);
156192
}
157193

158194
@Override
159195
public String toString()
160196
{
161-
return name + ":" + type.getDisplayName() + ":" + columnType;
197+
return getQualifiedPhysicalName() +
198+
":" + projectionInfo.map(DeltaLakeColumnProjectionInfo::getType).orElse(baseType).getDisplayName() +
199+
":" + columnType;
162200
}
163201

164202
public HiveColumnHandle toHiveColumnHandle()
165203
{
166204
return new HiveColumnHandle(
167-
physicalName, // this name is used for accessing Parquet files, so it should be physical name
205+
basePhysicalColumnName, // this name is used for accessing Parquet files, so it should be physical name
168206
0, // hiveColumnIndex; we provide fake value because we always find columns by name
169-
toHiveType(physicalType),
170-
physicalType,
171-
Optional.empty(),
207+
toHiveType(basePhysicalType),
208+
basePhysicalType,
209+
projectionInfo.map(DeltaLakeColumnProjectionInfo::toHiveColumnProjectionInfo),
172210
columnType.toHiveColumnType(),
173211
Optional.empty());
174212
}
175213

176214
public static DeltaLakeColumnHandle pathColumnHandle()
177215
{
178-
return new DeltaLakeColumnHandle(PATH_COLUMN_NAME, PATH_TYPE, OptionalInt.empty(), PATH_COLUMN_NAME, PATH_TYPE, SYNTHESIZED);
216+
return new DeltaLakeColumnHandle(PATH_COLUMN_NAME, PATH_TYPE, OptionalInt.empty(), PATH_COLUMN_NAME, PATH_TYPE, SYNTHESIZED, Optional.empty());
179217
}
180218

181219
public static DeltaLakeColumnHandle fileSizeColumnHandle()
182220
{
183-
return new DeltaLakeColumnHandle(FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, OptionalInt.empty(), FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, SYNTHESIZED);
221+
return new DeltaLakeColumnHandle(FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, OptionalInt.empty(), FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, SYNTHESIZED, Optional.empty());
184222
}
185223

186224
public static DeltaLakeColumnHandle fileModifiedTimeColumnHandle()
187225
{
188-
return new DeltaLakeColumnHandle(FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, OptionalInt.empty(), FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, SYNTHESIZED);
226+
return new DeltaLakeColumnHandle(FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, OptionalInt.empty(), FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, SYNTHESIZED, Optional.empty());
189227
}
190228

191229
public static DeltaLakeColumnHandle mergeRowIdColumnHandle()
192230
{
193-
return new DeltaLakeColumnHandle(ROW_ID_COLUMN_NAME, MERGE_ROW_ID_TYPE, OptionalInt.empty(), ROW_ID_COLUMN_NAME, MERGE_ROW_ID_TYPE, SYNTHESIZED);
231+
return new DeltaLakeColumnHandle(ROW_ID_COLUMN_NAME, MERGE_ROW_ID_TYPE, OptionalInt.empty(), ROW_ID_COLUMN_NAME, MERGE_ROW_ID_TYPE, SYNTHESIZED, Optional.empty());
194232
}
195233
}

0 commit comments

Comments
 (0)