Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 4
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 4
"modification": 5
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_SQL.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 2
}
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [IcebergIO] Now available with Beam SQL! ([#34799](https://github.com/apache/beam/pull/34799))
* [IcebergIO] Support reading with column pruning ([#34856](https://github.com/apache/beam/pull/34856))
* [IcebergIO] Support reading with pushdown filtering ([#34827](https://github.com/apache/beam/pull/34827))

## New Features / Improvements
* [Beam SQL] Introducing Beam Catalogs ([#35223](https://github.com/apache/beam/pull/35223))
* Adding Google Storage Requests Pays feature (Golang)([#30747](https://github.com/apache/beam/issues/30747)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Python] Prism runner now auto-enabled for some Python pipelines using the direct runner ([#34921](https://github.com/apache/beam/pull/34921)).
Expand Down
4 changes: 4 additions & 0 deletions sdks/java/extensions/sql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ dependencies {
fmppTask "org.freemarker:freemarker:2.3.31"
fmppTemplates library.java.vendored_calcite_1_28_0
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:managed")
implementation project(":sdks:java:io:iceberg")
runtimeOnly project(":sdks:java:io:iceberg:bqms")
runtimeOnly project(":sdks:java:io:iceberg:hive")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:join-library")
permitUnusedDeclared project(":sdks:java:extensions:join-library") // BEAM-11761
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,16 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
if (i > 0) {
writer.keyword(",");
}
properties.get(i).unparse(writer, leftPrec, rightPrec);
}

for (int i = 0; i < properties.size(); i += 2) {
if (i > 0) {
writer.keyword(",");
}
properties.get(i).unparse(writer, leftPrec, rightPrec); // key
SqlNode property = properties.get(i);
checkState(
property instanceof SqlNodeList,
String.format(
"Unexpected properties entry '%s' of class '%s'", property, property.getClass()));
SqlNodeList kv = ((SqlNodeList) property);

kv.get(0).unparse(writer, leftPrec, rightPrec); // key
writer.keyword("=");
properties.get(i + 1).unparse(writer, leftPrec, rightPrec); // value
kv.get(1).unparse(writer, leftPrec, rightPrec); // value
}
writer.keyword(")");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package org.apache.beam.sdk.extensions.sql.meta.catalog;

import java.util.Map;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;

/**
* Represents a named and configurable container for managing tables. Is defined with a type and
* configuration properties. Uses an underlying {@link MetaStore} to manage tables and table
* providers.
*/
@Internal
public interface Catalog {
/** A type that defines this catalog. */
String type();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.sql.meta.catalog;

import java.util.Map;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -32,6 +33,7 @@
* <p>When {@link #registerTableProvider(String, TableProvider)} is called, the provider should
* become available for all catalogs.
*/
@Internal
public interface CatalogManager {
/** Creates and stores a catalog of a particular type. */
void createCatalog(String name, String type, Map<String, String> properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.catalog;

import org.apache.beam.sdk.annotations.Internal;

/**
* Over-arching registrar to capture available {@link Catalog}s. Implementations should be marked
* with {@link com.google.auto.service.AutoService} to be available to {@link
* java.util.ServiceLoader}s.
*/
@Internal
public interface CatalogRegistrar {
Iterable<Class<? extends Catalog>> getCatalogs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.catalog;

import com.google.auto.service.AutoService;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
import org.apache.beam.sdk.util.Preconditions;

@AutoService(Catalog.class)
public class InMemoryCatalog implements Catalog {
private final String name;
private final Map<String, String> properties;
private final InMemoryMetaStore metaStore = new InMemoryMetaStore();
protected final InMemoryMetaStore metaStore = new InMemoryMetaStore();

public InMemoryCatalog(String name, Map<String, String> properties) {
this.name = name;
Expand All @@ -41,7 +39,8 @@ public String type() {

@Override
public String name() {
return Preconditions.checkStateNotNull(name, "InMemoryCatalog has not been initialized");
return Preconditions.checkStateNotNull(
name, getClass().getSimpleName() + " has not been initialized");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
package org.apache.beam.sdk.extensions.sql.meta.catalog;

import com.google.auto.service.AutoService;
import org.apache.beam.sdk.extensions.sql.meta.provider.iceberg.IcebergCatalog;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

@AutoService(CatalogRegistrar.class)
public class InMemoryCatalogRegistrar implements CatalogRegistrar {
@Override
public Iterable<Class<? extends Catalog>> getCatalogs() {
return ImmutableList.of(InMemoryCatalog.class);
return ImmutableList.<Class<? extends Catalog>>builder()
.add(InMemoryCatalog.class)
.add(IcebergCatalog.class)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;

import java.util.Map;
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog;

public class IcebergCatalog extends InMemoryCatalog {
public IcebergCatalog(String name, Map<String, String> properties) {
super(name, properties);
metaStore.registerProvider(new IcebergTableProvider(name, properties));
}

@Override
public String type() {
return "iceberg";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;

import static org.apache.beam.sdk.io.iceberg.FilterUtils.SUPPORTED_OPS;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind.AND;
import static org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind.OR;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.commons.lang3.tuple.Pair;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;

public class IcebergFilter implements BeamSqlTableFilter {
private @Nullable List<RexNode> supported;
private @Nullable List<RexNode> unsupported;
private final List<RexNode> predicateCNF;

public IcebergFilter(List<RexNode> predicateCNF) {
this.predicateCNF = predicateCNF;
}

private void maybeInitialize() {
if (supported != null && unsupported != null) {
return;
}
ImmutableList.Builder<RexNode> supportedBuilder = ImmutableList.builder();
ImmutableList.Builder<RexNode> unsupportedBuilder = ImmutableList.builder();
for (RexNode node : predicateCNF) {
if (!node.getType().getSqlTypeName().equals(SqlTypeName.BOOLEAN)) {
throw new IllegalArgumentException(
"Predicate node '"
+ node.getClass().getSimpleName()
+ "' should be a boolean expression, but was: "
+ node.getType().getSqlTypeName());
}

if (isSupported(node).getLeft()) {
supportedBuilder.add(node);
} else {
unsupportedBuilder.add(node);
}
}
supported = supportedBuilder.build();
unsupported = unsupportedBuilder.build();
}

@Override
public List<RexNode> getNotSupported() {
maybeInitialize();
return checkStateNotNull(unsupported);
}

@Override
public int numSupported() {
maybeInitialize();
return BeamSqlTableFilter.expressionsInFilter(checkStateNotNull(supported));
}

public List<RexNode> getSupported() {
maybeInitialize();
return checkStateNotNull(supported);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(IcebergFilter.class)
.add(
"supported",
checkStateNotNull(supported).stream()
.map(RexNode::toString)
.collect(Collectors.joining()))
.add(
"unsupported",
checkStateNotNull(unsupported).stream()
.map(RexNode::toString)
.collect(Collectors.joining()))
.toString();
}

/**
* Check whether a {@code RexNode} is supported. As of right now Iceberg supports: 1. Complex
* predicates (both conjunction and disjunction). 2. Comparison between a column and a literal.
*
* @param node A node to check for predicate push-down support.
* @return A pair containing a boolean whether an expression is supported and the number of input
* references used by the expression.
*/
private Pair<Boolean, Integer> isSupported(RexNode node) {
int numberOfInputRefs = 0;
boolean isSupported = true;

if (node instanceof RexCall) {
RexCall compositeNode = (RexCall) node;
if (!SUPPORTED_OPS.contains(node.getKind())) {
isSupported = false;
} else {
for (RexNode operand : compositeNode.getOperands()) {
// All operands must be supported for a parent node to be supported.
Pair<Boolean, Integer> childSupported = isSupported(operand);
if (!node.getKind().belongsTo(ImmutableSet.of(AND, OR))) {
numberOfInputRefs += childSupported.getRight();
}
// Predicate functions with multiple columns are unsupported.
isSupported = numberOfInputRefs < 2 && childSupported.getLeft();
}
}
} else if (node instanceof RexInputRef) {
numberOfInputRefs = 1;
} else if (node instanceof RexLiteral) {
// RexLiterals are expected, but no action is needed.
} else {
throw new UnsupportedOperationException(
"Encountered an unexpected node type: " + node.getClass().getSimpleName());
}

return Pair.of(isSupported, numberOfInputRefs);
}
}
Loading
Loading