Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 5
"modification": 4
}
1 change: 1 addition & 0 deletions .github/workflows/IO_Iceberg_Unit_Tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ jobs:
arguments: |
-PdisableSpotlessCheck=true \
-PdisableCheckStyle=true \
--info
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
* [Python] Prism runner now auto-enabled for some Python pipelines using the direct runner ([#34921](https://github.com/apache/beam/pull/34921)).
* [YAML] WriteToTFRecord and ReadFromTFRecord Beam YAML support
* Python: Added JupyterLab 4.x extension compatibility for enhanced notebook integration ([#34495](https://github.com/apache/beam/pull/34495)).
* [IcebergIO] Create tables with a specified partition spec ([#34966](https://github.com/apache/beam/pull/34966))
* [IcebergIO] Dynamically create namespaces if needed ([#35228](https://github.com/apache/beam/pull/35228))

## Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.beam.sdk.io.iceberg;

import com.google.auto.value.AutoValue;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;

@AutoValue
Expand All @@ -31,18 +33,27 @@ public abstract class IcebergTableCreateConfig {

/** Partition spec of the destination table, in the event that it must be dynamically created. */
@Pure
public abstract PartitionSpec getPartitionSpec();
public @Nullable PartitionSpec getPartitionSpec() {
@Nullable List<String> fields = getPartitionFields();
if (fields == null || fields.isEmpty()) {
return null;
}
return PartitionUtils.toPartitionSpec(fields, getSchema());
}

@Pure
public abstract @Nullable List<String> getPartitionFields();

@Pure
public Builder builder() {
public static Builder builder() {
return new AutoValue_IcebergTableCreateConfig.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setSchema(Schema schema);

public abstract Builder setPartitionSpec(PartitionSpec partitionSpec);
public abstract Builder setPartitionFields(@Nullable List<String> partitionFields);

@Pure
public abstract IcebergTableCreateConfig build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ public static Builder builder() {
+ "Is mutually exclusive with 'keep' and 'drop'.")
public abstract @Nullable String getOnly();

@SchemaFieldDescription(
"Fields used to create a partition spec that is applied when tables are created. For a field 'foo', "
+ "the available partition transforms are:\n\n"
+ "- `foo`\n"
+ "- `truncate(foo, N)`\n"
+ "- `bucket(foo, N)`\n"
+ "- `hour(foo)`\n"
+ "- `day(foo)`\n"
+ "- `month(foo)`\n"
+ "- `year(foo)`\n"
+ "- `void(foo)`\n\n"
+ "For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms.")
public abstract @Nullable List<String> getPartitionFields();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTable(String table);
Expand All @@ -128,6 +142,8 @@ public abstract static class Builder {

public abstract Builder setOnly(String only);

public abstract Builder setPartitionFields(List<String> partitionFields);

public abstract Configuration build();
}

Expand Down Expand Up @@ -192,6 +208,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
configuration.getTable(),
FileFormat.PARQUET.toString(),
rows.getSchema(),
configuration.getPartitionFields(),
configuration.getDrop(),
configuration.getKeep(),
configuration.getOnly()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;

/**
 * Helpers for turning user-supplied partition field strings into an Iceberg {@link PartitionSpec}.
 *
 * <p>Each string is either a bare column name (identity partitioning) or one of the transform
 * expressions {@code hour(col)}, {@code day(col)}, {@code month(col)}, {@code year(col)}, {@code
 * truncate(col, N)}, {@code bucket(col, N)} or {@code void(col)}.
 */
final class PartitionUtils {
  private static final Pattern HOUR = Pattern.compile("^hour\\(([a-zA-Z0-9_-]+)\\)$");
  private static final Pattern DAY = Pattern.compile("^day\\(([a-zA-Z0-9_-]+)\\)$");
  private static final Pattern MONTH = Pattern.compile("^month\\(([a-zA-Z0-9_-]+)\\)$");
  private static final Pattern YEAR = Pattern.compile("^year\\(([a-zA-Z0-9_-]+)\\)$");
  private static final Pattern TRUNCATE =
      Pattern.compile("^truncate\\(([a-zA-Z0-9_-]+),\\s*(\\d+)\\)$");
  private static final Pattern BUCKET =
      Pattern.compile("^bucket\\(([a-zA-Z0-9_-]+),\\s*(\\d+)\\)$");
  // NOTE(review): unlike the sibling patterns, this one accepts any characters other than ')'
  // as the column name -- confirm that the looser match is intentional.
  private static final Pattern VOID = Pattern.compile("^void\\(([^)]+)\\)$");
  private static final Pattern IDENTITY = Pattern.compile("^([a-zA-Z0-9_-]+)$");

  /**
   * Dispatch table from a partition-expression pattern to the builder call it stands for. Group 1
   * of every pattern is the column name; group 2 (truncate and bucket only) is the integer
   * argument.
   */
  private static final Map<
          Pattern, BiFunction<PartitionSpec.Builder, Matcher, PartitionSpec.Builder>>
      TRANSFORMATIONS =
          ImmutableMap.of(
              HOUR, (builder, matcher) -> builder.hour(checkStateNotNull(matcher.group(1))),
              DAY, (builder, matcher) -> builder.day(checkStateNotNull(matcher.group(1))),
              MONTH, (builder, matcher) -> builder.month(checkStateNotNull(matcher.group(1))),
              YEAR, (builder, matcher) -> builder.year(checkStateNotNull(matcher.group(1))),
              TRUNCATE,
                  (builder, matcher) ->
                      builder.truncate(
                          checkStateNotNull(matcher.group(1)), parseIntArgument(matcher)),
              BUCKET,
                  (builder, matcher) ->
                      builder.bucket(
                          checkStateNotNull(matcher.group(1)), parseIntArgument(matcher)),
              VOID, (builder, matcher) -> builder.alwaysNull(checkStateNotNull(matcher.group(1))),
              IDENTITY,
                  (builder, matcher) -> builder.identity(checkStateNotNull(matcher.group(1))));

  /** Static utility holder; not meant to be instantiated. */
  private PartitionUtils() {}

  /**
   * Builds a {@link PartitionSpec} by applying one partition transform per entry of {@code
   * fields}, in list order, against the Iceberg schema converted from {@code beamSchema}.
   *
   * @param fields partition expressions, e.g. {@code "foo"}, {@code "day(ts)"}, {@code
   *     "bucket(id, 16)"}
   * @param beamSchema Beam schema that is converted to the Iceberg schema the spec is built for
   * @return the partition spec produced by the builder
   * @throws IllegalArgumentException if an entry matches none of the supported expressions, or if
   *     the integer argument of a truncate/bucket expression does not fit in an {@code int}
   */
  static PartitionSpec toPartitionSpec(
      List<String> fields, org.apache.beam.sdk.schemas.Schema beamSchema) {
    Schema schema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);
    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);

    for (String field : fields) {
      builder = applyTransform(builder, field);
    }

    return builder.build();
  }

  /** Applies the first transform whose pattern matches {@code field}, or fails if none does. */
  private static PartitionSpec.Builder applyTransform(PartitionSpec.Builder builder, String field) {
    for (Map.Entry<Pattern, BiFunction<PartitionSpec.Builder, Matcher, PartitionSpec.Builder>>
        entry : TRANSFORMATIONS.entrySet()) {
      // Every pattern is anchored with ^ and $, so find() only succeeds on a whole-string match.
      Matcher matcher = entry.getKey().matcher(field);
      if (matcher.find()) {
        return entry.getValue().apply(builder, matcher);
      }
    }
    throw new IllegalArgumentException(
        "Could not find a partition transform for '" + field + "'.");
  }

  /**
   * Parses group 2 of a truncate/bucket match as an {@code int}. The pattern only admits digits,
   * so a parse failure means the value is too large; report it together with the offending
   * expression instead of letting a bare {@link NumberFormatException} escape.
   */
  private static int parseIntArgument(Matcher matcher) {
    String digits = checkStateNotNull(matcher.group(2));
    try {
      return Integer.parseInt(digits);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(
          "Partition transform '"
              + matcher.group()
              + "' has an integer argument that is out of range: "
              + digits,
          e);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@ class PortableIcebergDestinations implements DynamicDestinations {
private final RowStringInterpolator interpolator;
private final String fileFormat;

private final @Nullable List<String> partitionFields;

public PortableIcebergDestinations(
String destinationTemplate,
String fileFormat,
Schema inputSchema,
@Nullable List<String> partitionFields,
@Nullable List<String> fieldsToDrop,
@Nullable List<String> fieldsToKeep,
@Nullable String onlyField) {
interpolator = new RowStringInterpolator(destinationTemplate, inputSchema);
this.interpolator = new RowStringInterpolator(destinationTemplate, inputSchema);
this.partitionFields = partitionFields;
RowFilter rf = new RowFilter(inputSchema);

if (fieldsToDrop != null) {
Expand All @@ -51,7 +55,7 @@ public PortableIcebergDestinations(
if (onlyField != null) {
rf = rf.only(onlyField);
}
rowFilter = rf;
this.rowFilter = rf;
this.fileFormat = fileFormat;
}

Expand All @@ -74,7 +78,11 @@ public String getTableStringIdentifier(ValueInSingleWindow<Row> element) {
public IcebergDestination instantiateDestination(String dest) {
return IcebergDestination.builder()
.setTableIdentifier(TableIdentifier.parse(dest))
.setTableCreateConfig(null)
.setTableCreateConfig(
IcebergTableCreateConfig.builder()
.setSchema(getDataSchema())
.setPartitionFields(partitionFields)
.build())
.setFileFormat(FileFormat.fromString(fileFormat))
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,23 @@ static String getPartitionDataPath(
* implementation. Although it is expected, some implementations may not support creating a table
* using the Iceberg API.
*/
private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) {
Namespace namespace = identifier.namespace();
private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
TableIdentifier identifier = destination.getTableIdentifier();
@Nullable Table table = TABLE_CACHE.getIfPresent(identifier);
if (table != null) {
// If fetching from cache, refresh the table to avoid working with stale metadata
// (e.g. partition spec)
table.refresh();
return table;
}

Namespace namespace = identifier.namespace();
@Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig();
PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
if (createConfig != null && createConfig.getPartitionSpec() != null) {
partitionSpec = createConfig.getPartitionSpec();
}

synchronized (TABLE_CACHE) {
// Create namespace if it does not exist yet
if (catalog instanceof SupportsNamespaces) {
Expand All @@ -309,9 +318,12 @@ private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) {
} catch (NoSuchTableException e) { // Otherwise, create the table
org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
try {
// TODO(ahmedabu98): support creating a table with a specified partition spec
table = catalog.createTable(identifier, tableSchema);
LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema);
table = catalog.createTable(identifier, tableSchema, partitionSpec);
LOG.info(
"Created Iceberg table '{}' with schema: {}\n, partition spec: {}",
identifier,
tableSchema,
partitionSpec);
} catch (AlreadyExistsException ignored) {
// race condition: another worker already created this table
table = catalog.loadTable(identifier);
Expand All @@ -335,9 +347,9 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row r
destinations.computeIfAbsent(
icebergDestination,
destination -> {
TableIdentifier identifier = destination.getValue().getTableIdentifier();
Table table = getOrCreateTable(identifier, row.getSchema());
return new DestinationState(destination.getValue(), table);
IcebergDestination dest = destination.getValue();
Table table = getOrCreateTable(dest, row.getSchema());
return new DestinationState(dest, table);
});

Record icebergRecord = IcebergUtils.beamRowToIcebergRecord(destinationState.schema, row);
Expand Down
Loading
Loading