Skip to content

Commit a69ccd4

Browse files
Alex Jo (alexjo2144)
authored and committed
Add table function for generating Iceberg CDC records
Adds a table function which, given a range of snapshot ids, will produce a table of the rows inserted and deleted between those two snapshots. Currently only supports metadata deletes, not merge-on-read positional or equality deletes.
1 parent ddf82b3 commit a69ccd4

16 files changed

+1089
-23
lines changed

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,15 @@ public class IcebergColumnHandle
4343
public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 2;
4444
public static final int TRINO_MERGE_PARTITION_DATA = Integer.MIN_VALUE + 3;
4545

46+
public static final String DATA_CHANGE_TYPE_NAME = "_change_type";
47+
public static final int DATA_CHANGE_TYPE_ID = Integer.MIN_VALUE + 5;
48+
public static final String DATA_CHANGE_VERSION_NAME = "_change_version_id";
49+
public static final int DATA_CHANGE_VERSION_ID = Integer.MIN_VALUE + 6;
50+
public static final String DATA_CHANGE_TIMESTAMP_NAME = "_change_timestamp";
51+
public static final int DATA_CHANGE_TIMESTAMP_ID = Integer.MIN_VALUE + 7;
52+
public static final String DATA_CHANGE_ORDINAL_NAME = "_change_ordinal";
53+
public static final int DATA_CHANGE_ORDINAL_ID = Integer.MIN_VALUE + 8;
54+
4655
private final ColumnIdentity baseColumnIdentity;
4756
private final Type baseType;
4857
// The list of field ids to indicate the projected part of the top-level column represented by baseColumnIdentity

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnector.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import io.trino.spi.connector.ConnectorSplitManager;
3333
import io.trino.spi.connector.ConnectorTransactionHandle;
3434
import io.trino.spi.connector.TableProcedureMetadata;
35+
import io.trino.spi.function.FunctionProvider;
36+
import io.trino.spi.function.table.ConnectorTableFunction;
3537
import io.trino.spi.procedure.Procedure;
3638
import io.trino.spi.session.PropertyMetadata;
3739
import io.trino.spi.transaction.IsolationLevel;
@@ -66,6 +68,8 @@ public class IcebergConnector
6668
private final Optional<ConnectorAccessControl> accessControl;
6769
private final Set<Procedure> procedures;
6870
private final Set<TableProcedureMetadata> tableProcedures;
71+
private final Set<ConnectorTableFunction> tableFunctions;
72+
private final FunctionProvider functionProvider;
6973

7074
public IcebergConnector(
7175
Injector injector,
@@ -82,7 +86,9 @@ public IcebergConnector(
8286
List<PropertyMetadata<?>> analyzeProperties,
8387
Optional<ConnectorAccessControl> accessControl,
8488
Set<Procedure> procedures,
85-
Set<TableProcedureMetadata> tableProcedures)
89+
Set<TableProcedureMetadata> tableProcedures,
90+
Set<ConnectorTableFunction> tableFunctions,
91+
FunctionProvider functionProvider)
8692
{
8793
this.injector = requireNonNull(injector, "injector is null");
8894
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
@@ -101,6 +107,8 @@ public IcebergConnector(
101107
this.accessControl = requireNonNull(accessControl, "accessControl is null");
102108
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
103109
this.tableProcedures = ImmutableSet.copyOf(requireNonNull(tableProcedures, "tableProcedures is null"));
110+
this.tableFunctions = ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null"));
111+
this.functionProvider = requireNonNull(functionProvider, "functionProvider is null");
104112
}
105113

106114
@Override
@@ -154,6 +162,18 @@ public Set<TableProcedureMetadata> getTableProcedures()
154162
return tableProcedures;
155163
}
156164

165+
/**
 * Returns the table functions exposed by this connector.
 *
 * @return the set supplied to the constructor, stored there as an immutable copy
 */
@Override
166+
public Set<ConnectorTableFunction> getTableFunctions()
167+
{
168+
return tableFunctions;
169+
}
170+
171+
/**
 * Returns the function provider supplied to the constructor.
 *
 * @return an {@code Optional} that is always present; the constructor rejects a null provider
 */
@Override
172+
public Optional<FunctionProvider> getFunctionProvider()
173+
{
174+
return Optional.of(functionProvider);
175+
}
176+
157177
@Override
158178
public List<PropertyMetadata<?>> getSessionProperties()
159179
{

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import io.trino.plugin.hive.orc.OrcWriterConfig;
2727
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
2828
import io.trino.plugin.hive.parquet.ParquetWriterConfig;
29+
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
30+
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProvider;
31+
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProvider;
2932
import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure;
3033
import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure;
3134
import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure;
@@ -37,6 +40,8 @@
3740
import io.trino.spi.connector.ConnectorPageSourceProvider;
3841
import io.trino.spi.connector.ConnectorSplitManager;
3942
import io.trino.spi.connector.TableProcedureMetadata;
43+
import io.trino.spi.function.FunctionProvider;
44+
import io.trino.spi.function.table.ConnectorTableFunction;
4045
import io.trino.spi.procedure.Procedure;
4146

4247
import static com.google.inject.multibindings.Multibinder.newSetBinder;
@@ -65,6 +70,7 @@ public void configure(Binder binder)
6570
.setDefault().toInstance(true);
6671
binder.bind(ConnectorSplitManager.class).to(IcebergSplitManager.class).in(Scopes.SINGLETON);
6772
newOptionalBinder(binder, ConnectorPageSourceProvider.class).setDefault().to(IcebergPageSourceProvider.class).in(Scopes.SINGLETON);
73+
binder.bind(IcebergPageSourceProvider.class).in(Scopes.SINGLETON);
6874
binder.bind(ConnectorPageSinkProvider.class).to(IcebergPageSinkProvider.class).in(Scopes.SINGLETON);
6975
binder.bind(ConnectorNodePartitioningProvider.class).to(IcebergNodePartitioningProvider.class).in(Scopes.SINGLETON);
7076

@@ -95,5 +101,9 @@ public void configure(Binder binder)
95101
tableProcedures.addBinding().toProvider(DropExtendedStatsTableProcedure.class).in(Scopes.SINGLETON);
96102
tableProcedures.addBinding().toProvider(ExpireSnapshotsTableProcedure.class).in(Scopes.SINGLETON);
97103
tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON);
104+
105+
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
106+
binder.bind(FunctionProvider.class).to(IcebergFunctionProvider.class).in(Scopes.SINGLETON);
107+
binder.bind(TableChangesFunctionProcessorProvider.class).in(Scopes.SINGLETON);
98108
}
99109
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -231,21 +231,52 @@ public ConnectorPageSource createPageSource(
231231
DynamicFilter dynamicFilter)
232232
{
233233
IcebergSplit split = (IcebergSplit) connectorSplit;
234-
IcebergTableHandle table = (IcebergTableHandle) connectorTable;
235-
236234
List<IcebergColumnHandle> icebergColumns = columns.stream()
237235
.map(IcebergColumnHandle.class::cast)
238236
.collect(toImmutableList());
239-
240-
Schema tableSchema = SchemaParser.fromJson(table.getTableSchemaJson());
241-
242-
Set<IcebergColumnHandle> deleteFilterRequiredColumns = requiredColumnsForDeletes(tableSchema, split.getDeletes());
243-
244-
PartitionSpec partitionSpec = PartitionSpecParser.fromJson(tableSchema, split.getPartitionSpecJson());
237+
IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTable;
238+
Schema schema = SchemaParser.fromJson(tableHandle.getTableSchemaJson());
239+
PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, split.getPartitionSpecJson());
245240
org.apache.iceberg.types.Type[] partitionColumnTypes = partitionSpec.fields().stream()
246-
.map(field -> field.transform().getResultType(tableSchema.findType(field.sourceId())))
241+
.map(field -> field.transform().getResultType(schema.findType(field.sourceId())))
247242
.toArray(org.apache.iceberg.types.Type[]::new);
248-
PartitionData partitionData = PartitionData.fromJson(split.getPartitionDataJson(), partitionColumnTypes);
243+
244+
return createPageSource(
245+
session,
246+
icebergColumns,
247+
schema,
248+
partitionSpec,
249+
PartitionData.fromJson(split.getPartitionDataJson(), partitionColumnTypes),
250+
split.getDeletes(),
251+
dynamicFilter,
252+
tableHandle.getUnenforcedPredicate(),
253+
split.getPath(),
254+
split.getStart(),
255+
split.getLength(),
256+
split.getFileSize(),
257+
split.getPartitionDataJson(),
258+
split.getFileFormat(),
259+
tableHandle.getNameMappingJson().map(NameMappingParser::fromJson));
260+
}
261+
262+
public ConnectorPageSource createPageSource(
263+
ConnectorSession session,
264+
List<IcebergColumnHandle> icebergColumns,
265+
Schema tableSchema,
266+
PartitionSpec partitionSpec,
267+
PartitionData partitionData,
268+
List<DeleteFile> deletes,
269+
DynamicFilter dynamicFilter,
270+
TupleDomain<IcebergColumnHandle> unenforcedPredicate,
271+
String path,
272+
long start,
273+
long length,
274+
long fileSize,
275+
String partitionDataJson,
276+
IcebergFileFormat fileFormat,
277+
Optional<NameMapping> nameMapping)
278+
{
279+
Set<IcebergColumnHandle> deleteFilterRequiredColumns = requiredColumnsForDeletes(tableSchema, deletes);
249280
Map<Integer, Optional<String>> partitionKeys = getPartitionKeys(partitionData, partitionSpec);
250281

251282
List<IcebergColumnHandle> requiredColumns = new ArrayList<>(icebergColumns);
@@ -282,7 +313,7 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
282313
}
283314
});
284315

285-
TupleDomain<IcebergColumnHandle> effectivePredicate = table.getUnenforcedPredicate()
316+
TupleDomain<IcebergColumnHandle> effectivePredicate = unenforcedPredicate
286317
.intersect(dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast))
287318
.simplify(ICEBERG_DOMAIN_COMPACTION_THRESHOLD);
288319
if (effectivePredicate.isNone()) {
@@ -291,21 +322,21 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
291322

292323
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
293324
TrinoInputFile inputfile = isUseFileSizeFromMetadata(session)
294-
? fileSystem.newInputFile(Location.of(split.getPath()), split.getFileSize())
295-
: fileSystem.newInputFile(Location.of(split.getPath()));
325+
? fileSystem.newInputFile(Location.of(path), fileSize)
326+
: fileSystem.newInputFile(Location.of(path));
296327

297328
ReaderPageSourceWithRowPositions readerPageSourceWithRowPositions = createDataPageSource(
298329
session,
299330
inputfile,
300-
split.getStart(),
301-
split.getLength(),
331+
start,
332+
length,
302333
partitionSpec.specId(),
303-
split.getPartitionDataJson(),
304-
split.getFileFormat(),
305-
SchemaParser.fromJson(table.getTableSchemaJson()),
334+
partitionDataJson,
335+
fileFormat,
336+
tableSchema,
306337
requiredColumns,
307338
effectivePredicate,
308-
table.getNameMappingJson().map(NameMappingParser::fromJson),
339+
nameMapping,
309340
partitionKeys);
310341
ReaderPageSource dataPageSource = readerPageSourceWithRowPositions.getReaderPageSource();
311342

@@ -324,8 +355,8 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
324355
List<DeleteFilter> deleteFilters = readDeletes(
325356
session,
326357
tableSchema,
327-
split.getPath(),
328-
split.getDeletes(),
358+
path,
359+
deletes,
329360
readerPageSourceWithRowPositions.getStartRowPosition(),
330361
readerPageSourceWithRowPositions.getEndRowPosition());
331362
return deleteFilters.stream()

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.airlift.units.Duration;
1919
import io.trino.filesystem.TrinoFileSystemFactory;
2020
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
21+
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle;
22+
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesSplitSource;
2123
import io.trino.spi.connector.ConnectorSession;
2224
import io.trino.spi.connector.ConnectorSplitManager;
2325
import io.trino.spi.connector.ConnectorSplitSource;
@@ -26,6 +28,7 @@
2628
import io.trino.spi.connector.Constraint;
2729
import io.trino.spi.connector.DynamicFilter;
2830
import io.trino.spi.connector.FixedSplitSource;
31+
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
2932
import io.trino.spi.type.TypeManager;
3033
import org.apache.iceberg.Table;
3134
import org.apache.iceberg.TableScan;
@@ -99,4 +102,24 @@ public ConnectorSplitSource getSplits(
99102

100103
return new ClassLoaderSafeConnectorSplitSource(splitSource, IcebergSplitManager.class.getClassLoader());
101104
}
105+
106+
/**
 * Produces the splits for a table function invocation.
 *
 * Only {@link TableChangesFunctionHandle} is handled; any other handle type is rejected.
 *
 * @param transaction transaction used to look up the Iceberg table
 * @param session session whose identity is used for the lookup
 * @param function handle describing the table function invocation
 * @return a split source over the changelog scan, wrapped in a {@code ClassLoaderSafeConnectorSplitSource}
 * @throws IllegalStateException if {@code function} is not a {@link TableChangesFunctionHandle}
 */
@Override
107+
public ConnectorSplitSource getSplits(
108+
ConnectorTransactionHandle transaction,
109+
ConnectorSession session,
110+
ConnectorTableFunctionHandle function)
111+
{
112+
if (function instanceof TableChangesFunctionHandle functionHandle) {
113+
// Resolve the Iceberg table named by the function handle within this transaction
Table icebergTable = transactionManager.get(transaction, session.getIdentity()).getIcebergTable(session, functionHandle.schemaTableName());
114+
115+
TableChangesSplitSource tableChangesSplitSource = new TableChangesSplitSource(
116+
icebergTable,
117+
icebergTable.newIncrementalChangelogScan()
118+
// The start snapshot itself is excluded from the scan, as the method name indicates
.fromSnapshotExclusive(functionHandle.startSnapshotId())
119+
// NOTE(review): presumably the end snapshot is included - confirm against the Iceberg API
.toSnapshot(functionHandle.endSnapshotId()));
120+
// Wrap with this class's class loader, matching the table-handle getSplits overload above
return new ClassLoaderSafeConnectorSplitSource(tableChangesSplitSource, IcebergSplitManager.class.getClassLoader());
121+
}
122+
123+
throw new IllegalStateException("Unknown table function: " + function);
124+
}
102125
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import io.trino.spi.connector.ConnectorPageSourceProvider;
5353
import io.trino.spi.connector.ConnectorSplitManager;
5454
import io.trino.spi.connector.TableProcedureMetadata;
55+
import io.trino.spi.function.FunctionProvider;
56+
import io.trino.spi.function.table.ConnectorTableFunction;
5557
import io.trino.spi.procedure.Procedure;
5658
import io.trino.spi.session.PropertyMetadata;
5759
import io.trino.spi.type.TypeManager;
@@ -126,6 +128,8 @@ public static Connector createConnector(
126128
IcebergAnalyzeProperties icebergAnalyzeProperties = injector.getInstance(IcebergAnalyzeProperties.class);
127129
Set<Procedure> procedures = injector.getInstance(Key.get(new TypeLiteral<Set<Procedure>>() {}));
128130
Set<TableProcedureMetadata> tableProcedures = injector.getInstance(Key.get(new TypeLiteral<Set<TableProcedureMetadata>>() {}));
131+
Set<ConnectorTableFunction> tableFunctions = injector.getInstance(Key.get(new TypeLiteral<Set<ConnectorTableFunction>>() {}));
132+
FunctionProvider functionProvider = injector.getInstance(FunctionProvider.class);
129133
Optional<ConnectorAccessControl> accessControl = injector.getInstance(Key.get(new TypeLiteral<Optional<ConnectorAccessControl>>() {}));
130134
// Materialized view should allow configuring all the supported iceberg table properties for the storage table
131135
List<PropertyMetadata<?>> materializedViewProperties = Stream.of(icebergTableProperties.getTableProperties(), materializedViewAdditionalProperties.getMaterializedViewProperties())
@@ -147,7 +151,9 @@ public static Connector createConnector(
147151
icebergAnalyzeProperties.getAnalyzeProperties(),
148152
accessControl,
149153
procedures,
150-
tableProcedures);
154+
tableProcedures,
155+
tableFunctions,
156+
functionProvider);
151157
}
152158
}
153159
}
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/IcebergFunctionProvider.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.iceberg.functions;
15+
16+
import com.google.inject.Inject;
17+
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle;
18+
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProvider;
19+
import io.trino.spi.function.FunctionProvider;
20+
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
21+
import io.trino.spi.function.table.TableFunctionProcessorProvider;
22+
23+
import static java.util.Objects.requireNonNull;
24+
25+
/**
 * {@link FunctionProvider} for the Iceberg connector.
 *
 * Maps a table function handle to the processor provider that executes it.
 * The only handle type recognized here is {@link TableChangesFunctionHandle}.
 */
public class IcebergFunctionProvider
26+
implements FunctionProvider
27+
{
28+
// Processor provider returned for TableChangesFunctionHandle; never null after construction
private final TableChangesFunctionProcessorProvider tableChangesFunctionProcessorProvider;
29+
30+
/**
 * @param tableChangesFunctionProcessorProvider provider returned for table-changes handles; must not be null
 * @throws NullPointerException if the argument is null
 */
@Inject
31+
public IcebergFunctionProvider(TableChangesFunctionProcessorProvider tableChangesFunctionProcessorProvider)
32+
{
33+
this.tableChangesFunctionProcessorProvider = requireNonNull(tableChangesFunctionProcessorProvider, "tableChangesFunctionProcessorProvider is null");
34+
}
35+
36+
/**
 * Selects the processor provider for the given table function handle.
 *
 * @param functionHandle handle identifying the table function invocation
 * @return the table-changes processor provider when the handle is a {@link TableChangesFunctionHandle}
 * @throws UnsupportedOperationException for any other handle type
 */
@Override
37+
public TableFunctionProcessorProvider getTableFunctionProcessorProvider(ConnectorTableFunctionHandle functionHandle)
38+
{
39+
if (functionHandle instanceof TableChangesFunctionHandle) {
40+
return tableChangesFunctionProcessorProvider;
41+
}
42+
43+
throw new UnsupportedOperationException("Unsupported function: " + functionHandle);
44+
}
45+
}

0 commit comments

Comments
 (0)