Skip to content

Commit eb85632

Browse files
committed
Add simple projection pushdown support to MongoDB
1 parent 3fa6f4e commit eb85632

File tree

4 files changed

+76
-5
lines changed

4 files changed

+76
-5
lines changed

plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.google.common.collect.ImmutableList;
1717
import com.google.common.collect.ImmutableMap;
18+
import com.google.common.collect.ImmutableSet;
1819
import com.google.common.collect.Streams;
1920
import com.google.common.io.Closer;
2021
import com.mongodb.client.MongoCollection;
@@ -23,6 +24,7 @@
2324
import io.trino.plugin.mongodb.MongoIndex.MongodbIndexKey;
2425
import io.trino.plugin.mongodb.ptf.Query.QueryFunctionHandle;
2526
import io.trino.spi.TrinoException;
27+
import io.trino.spi.connector.Assignment;
2628
import io.trino.spi.connector.ColumnHandle;
2729
import io.trino.spi.connector.ColumnMetadata;
2830
import io.trino.spi.connector.ConnectorInsertTableHandle;
@@ -39,12 +41,14 @@
3941
import io.trino.spi.connector.LimitApplicationResult;
4042
import io.trino.spi.connector.LocalProperty;
4143
import io.trino.spi.connector.NotFoundException;
44+
import io.trino.spi.connector.ProjectionApplicationResult;
4245
import io.trino.spi.connector.RetryMode;
4346
import io.trino.spi.connector.SchemaTableName;
4447
import io.trino.spi.connector.SchemaTablePrefix;
4548
import io.trino.spi.connector.SortingProperty;
4649
import io.trino.spi.connector.TableFunctionApplicationResult;
4750
import io.trino.spi.connector.TableNotFoundException;
51+
import io.trino.spi.expression.ConnectorExpression;
4852
import io.trino.spi.predicate.Domain;
4953
import io.trino.spi.predicate.TupleDomain;
5054
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
@@ -552,7 +556,7 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(Connect
552556
}
553557

554558
return Optional.of(new LimitApplicationResult<>(
555-
new MongoTableHandle(handle.getSchemaTableName(), handle.getRemoteTableName(), handle.getFilter(), handle.getConstraint(), OptionalInt.of(toIntExact(limit))),
559+
new MongoTableHandle(handle.getSchemaTableName(), handle.getRemoteTableName(), handle.getFilter(), handle.getConstraint(), handle.getProjectedColumns(), OptionalInt.of(toIntExact(limit))),
556560
true,
557561
false));
558562
}
@@ -599,11 +603,41 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
599603
handle.getRemoteTableName(),
600604
handle.getFilter(),
601605
newDomain,
606+
handle.getProjectedColumns(),
602607
handle.getLimit());
603608

604609
return Optional.of(new ConstraintApplicationResult<>(handle, remainingFilter, false));
605610
}
606611

612+
@Override
613+
public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjection(
614+
ConnectorSession session,
615+
ConnectorTableHandle handle,
616+
List<ConnectorExpression> projections,
617+
Map<String, ColumnHandle> assignments)
618+
{
619+
MongoTableHandle mongoTableHandle = (MongoTableHandle) handle;
620+
621+
Set<ColumnHandle> newColumns = ImmutableSet.copyOf(assignments.values());
622+
623+
if (newColumns.equals(mongoTableHandle.getProjectedColumns())) {
624+
return Optional.empty();
625+
}
626+
627+
// TODO: support dereference pushdown
628+
ImmutableSet.Builder<MongoColumnHandle> projectedColumns = ImmutableSet.builder();
629+
ImmutableList.Builder<Assignment> assignmentList = ImmutableList.builder();
630+
assignments.forEach((name, column) -> {
631+
MongoColumnHandle columnHandle = (MongoColumnHandle) column;
632+
assignmentList.add(new Assignment(name, column, columnHandle.getType()));
633+
projectedColumns.add(columnHandle);
634+
});
635+
636+
mongoTableHandle = mongoTableHandle.withProjectedColumns(projectedColumns.build());
637+
638+
return Optional.of(new ProjectionApplicationResult<>(mongoTableHandle, projections, assignmentList.build(), false));
639+
}
640+
607641
@Override
608642
public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle)
609643
{

plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -455,8 +455,16 @@ public long deleteDocuments(RemoteTableName remoteTableName, TupleDomain<ColumnH
455455
public MongoCursor<Document> execute(MongoTableHandle tableHandle, List<MongoColumnHandle> columns)
456456
{
457457
Document output = new Document();
458-
for (MongoColumnHandle column : columns) {
459-
output.append(column.getName(), 1);
458+
459+
if (!tableHandle.getProjectedColumns().isEmpty()) {
460+
for (MongoColumnHandle column : tableHandle.getProjectedColumns()) {
461+
output.append(column.getName(), 1);
462+
}
463+
}
464+
else {
465+
for (MongoColumnHandle column : columns) {
466+
output.append(column.getName(), 1);
467+
}
460468
}
461469
MongoCollection<Document> collection = getCollection(tableHandle.getRemoteTableName());
462470
Document filter = buildFilter(tableHandle);

plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoTableHandle.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Objects;
2424
import java.util.Optional;
2525
import java.util.OptionalInt;
26+
import java.util.Set;
2627

2728
import static com.google.common.base.MoreObjects.toStringHelper;
2829
import static java.util.Objects.requireNonNull;
@@ -33,12 +34,13 @@ public class MongoTableHandle
3334
private final SchemaTableName schemaTableName;
3435
private final RemoteTableName remoteTableName;
3536
private final TupleDomain<ColumnHandle> constraint;
37+
private final Set<MongoColumnHandle> projectedColumns;
3638
private final Optional<String> filter;
3739
private final OptionalInt limit;
3840

3941
public MongoTableHandle(SchemaTableName schemaTableName, RemoteTableName remoteTableName, Optional<String> filter)
4042
{
41-
this(schemaTableName, remoteTableName, filter, TupleDomain.all(), OptionalInt.empty());
43+
this(schemaTableName, remoteTableName, filter, TupleDomain.all(), Set.of(), OptionalInt.empty());
4244
}
4345

4446
@JsonCreator
@@ -47,12 +49,14 @@ public MongoTableHandle(
4749
@JsonProperty("remoteTableName") RemoteTableName remoteTableName,
4850
@JsonProperty("filter") Optional<String> filter,
4951
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint,
52+
@JsonProperty("projectedColumns") Set<MongoColumnHandle> projectedColumns,
5053
@JsonProperty("limit") OptionalInt limit)
5154
{
5255
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
5356
this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
5457
this.filter = requireNonNull(filter, "filter is null");
5558
this.constraint = requireNonNull(constraint, "constraint is null");
59+
this.projectedColumns = requireNonNull(projectedColumns, "projectedColumns is null");
5660
this.limit = requireNonNull(limit, "limit is null");
5761
}
5862

@@ -86,10 +90,16 @@ public OptionalInt getLimit()
8690
return limit;
8791
}
8892

93+
@JsonProperty
94+
public Set<MongoColumnHandle> getProjectedColumns()
95+
{
96+
return projectedColumns;
97+
}
98+
8999
@Override
90100
public int hashCode()
91101
{
92-
return Objects.hash(schemaTableName, filter, constraint, limit);
102+
return Objects.hash(schemaTableName, filter, constraint, projectedColumns, limit);
93103
}
94104

95105
@Override
@@ -106,6 +116,7 @@ public boolean equals(Object obj)
106116
Objects.equals(this.remoteTableName, other.remoteTableName) &&
107117
Objects.equals(this.filter, other.filter) &&
108118
Objects.equals(this.constraint, other.constraint) &&
119+
Objects.equals(this.projectedColumns, other.projectedColumns) &&
109120
Objects.equals(this.limit, other.limit);
110121
}
111122

@@ -117,7 +128,19 @@ public String toString()
117128
.add("remoteTableName", remoteTableName)
118129
.add("filter", filter)
119130
.add("limit", limit)
131+
.add("projectedColumns", projectedColumns)
120132
.add("constraint", constraint)
121133
.toString();
122134
}
135+
136+
public MongoTableHandle withProjectedColumns(Set<MongoColumnHandle> projectedColumns)
137+
{
138+
return new MongoTableHandle(
139+
schemaTableName,
140+
remoteTableName,
141+
filter,
142+
constraint,
143+
projectedColumns,
144+
limit);
145+
}
123146
}

plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/TestMongoConnectorTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,12 @@ public void testSortItemsReflectedInExplain()
143143
"TopNPartial\\[count = 5, orderBy = \\[nationkey DESC");
144144
}
145145

146+
@Test
147+
public void testProjectionPushdown()
148+
{
149+
assertThat(query("SELECT orderdate, clerk FROM orders")).isFullyPushedDown();
150+
}
151+
146152
@Override
147153
protected Optional<DataMappingTestSetup> filterDataMappingSmokeTestData(DataMappingTestSetup dataMappingTestSetup)
148154
{

0 commit comments

Comments
 (0)