Include Hive and GCS runtime dependencies in main Iceberg jar #32218

Closed
wants to merge 6 commits
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 4
"modification": 5
}
2 changes: 1 addition & 1 deletion .github/workflows/IO_Iceberg_Integration_Tests.yml
@@ -75,4 +75,4 @@ jobs:
- name: Run IcebergIO Integration Test
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:java:io:iceberg:catalogTests
gradle-command: :sdks:java:io:iceberg:integrationTest
27 changes: 17 additions & 10 deletions sdks/java/io/iceberg/build.gradle
@@ -38,6 +38,7 @@ def hadoopVersions = [
hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")}

def iceberg_version = "1.4.2"
def hive_version = "3.1.3"
def parquet_version = "1.12.0"
def orc_version = "1.9.2"

@@ -54,11 +55,22 @@ dependencies {
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation library.java.hadoop_common
// read/write to GCS
runtimeOnly library.java.bigdataoss_gcs_connector
runtimeOnly library.java.hadoop_client
// hive
runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
runtimeOnly project(path: ":sdks:java:io:iceberg:hive-exec", configuration: "shadow")

// *** hive tests
testImplementation "org.apache.iceberg:iceberg-common:$iceberg_version"
testImplementation ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
testImplementation project(path: ":sdks:java:io:iceberg:hive-exec", configuration: "shadow")
testRuntimeOnly ("org.apache.hive.hcatalog:hive-hcatalog-core:$hive_version") {
exclude group: "org.apache.hive", module: "hive-exec"
exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle"
}

testImplementation library.java.hadoop_client
testImplementation library.java.bigdataoss_gcsio
testImplementation library.java.bigdataoss_gcs_connector
testImplementation library.java.bigdataoss_util_hadoop
testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version"
testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
@@ -99,7 +111,7 @@ hadoopVersions.each { kv ->
task integrationTest(type: Test) {
group = "Verification"
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests'
def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests/iceberg-it'
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--project=${gcpProject}",
"--tempLocation=${gcpTempLocation}",
@@ -115,11 +127,6 @@ task integrationTest(type: Test) {
testClassesDirs = sourceSets.test.output.classesDirs
}

tasks.register('catalogTests') {
dependsOn integrationTest
dependsOn ":sdks:java:io:iceberg:hive:integrationTest"
}

task loadTest(type: Test) {
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests/temp-lt'
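For context, the integrationTest task above hands pipeline options to the test JVM as a JSON system property. Below is a minimal sketch of how a test consumes them; the class name is illustrative, and the option names match those set in the Gradle task. TestPipeline parses the beamTestPipelineOptions property itself.

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;

public class ReadTestOptions {
  public static void main(String[] args) {
    // TestPipeline.testingPipelineOptions() parses the "beamTestPipelineOptions"
    // JSON system property that the integrationTest task sets above.
    PipelineOptions options = TestPipeline.testingPipelineOptions();
    // The --tempLocation flag from the task surfaces here.
    System.out.println("tempLocation: " + options.getTempLocation());
  }
}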
sdks/java/io/iceberg/hive-exec/build.gradle
@@ -39,10 +39,10 @@ artifacts {
shadowJar {
zip64 true

// need to shade "com.google.guava" to avoid Guava conflict
relocate 'com.google.protobuf', getJavaRelocatedPath('com.google.protobuf')
relocate 'shaded.parquet', getJavaRelocatedPath('shaded.parquet')
relocate 'org.apache.parquet', getJavaRelocatedPath('org.apache.parquet')
// need to relocate some libraries to avoid conflict
relocate 'com.google.protobuf', getJavaRelocatedPath('hive-exec.com.google.protobuf')
relocate 'shaded.parquet', getJavaRelocatedPath('hive-exec.shaded.parquet')
relocate 'org.apache.parquet', getJavaRelocatedPath('hive-exec.org.apache.parquet')

version "3.1.3"
mergeServiceFiles()
@@ -54,5 +54,5 @@ shadowJar {
'com/sun/**'
)
}
description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: Hive :: Exec"
description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: Hive-Exec"
ext.summary = "A copy of the hive-exec dependency with some popular libraries relocated."
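The relocation matters because hive-exec is a fat jar that bundles its own copies of protobuf and Parquet, which would otherwise clash with the versions Beam ships. A speculative check follows, not part of the PR: the relocated prefix is produced by Beam's getJavaRelocatedPath helper, and the exact package name used below is an assumption for illustration only.

public class RelocationCheck {
  public static void main(String[] args) throws ClassNotFoundException {
    // If relocation worked, the shaded jar exposes protobuf only under the
    // relocated package (assumed prefix here), not under com.google.protobuf.
    Class.forName(
        "org.apache.beam.repackaged.hive_exec.com.google.protobuf.Message");
    System.out.println("relocated protobuf classes are on the classpath");
  }
}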
80 changes: 0 additions & 80 deletions sdks/java/io/iceberg/hive/build.gradle

This file was deleted.

This file was deleted.

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/HiveCatalogIT.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg.hive;
package org.apache.beam.sdk.io.iceberg;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -28,8 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.io.iceberg.hive.testutils.HiveMetastoreExtension;
import org.apache.beam.sdk.io.iceberg.hiveutils.TestHiveMetastore;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
@@ -39,9 +38,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
@@ -64,20 +60,18 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.thrift.TException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

/**
* Read and write test for {@link Managed} {@link org.apache.beam.sdk.io.iceberg.IcebergIO} using
* {@link HiveCatalog}.
* Read and write test for {@link Managed} {@link IcebergIO} using {@link HiveCatalog}.
*
* <p>Spins up a local Hive metastore to manage the Iceberg table. Warehouse path is set to a GCS
* bucket.
*/
public class IcebergHiveCatalogIT {
public class HiveCatalogIT {
private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
Schema.builder()
.addStringField("doubly_nested_str")
@@ -141,7 +135,7 @@ public Record apply(Row input) {
}
};

private static HiveMetastoreExtension hiveMetastoreExtension;
private static TestHiveMetastore testHiveMetastore;

@Rule public TestPipeline writePipeline = TestPipeline.create();

@@ -153,9 +147,9 @@ public Record apply(Row input) {
private static final String TEST_DB = "test_db_" + System.nanoTime();

@BeforeClass
public static void setUp() throws TException {
public static void setUp() throws Exception {
String warehousePath = TestPipeline.testingPipelineOptions().getTempLocation();
hiveMetastoreExtension = new HiveMetastoreExtension(warehousePath);
testHiveMetastore = new TestHiveMetastore(warehousePath, null);
catalog =
(HiveCatalog)
CatalogUtil.loadCatalog(
@@ -164,25 +158,23 @@ public static void setUp() throws TException {
ImmutableMap.of(
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
String.valueOf(TimeUnit.SECONDS.toMillis(10))),
hiveMetastoreExtension.hiveConf());
testHiveMetastore.hiveConf());

String dbPath = hiveMetastoreExtension.metastore().getDatabasePath(TEST_DB);
Database db = new Database(TEST_DB, "description", dbPath, Maps.newHashMap());
hiveMetastoreExtension.metastoreClient().createDatabase(db);
testHiveMetastore.createDatabase(TEST_DB);
}

@AfterClass
public static void cleanup() throws Exception {
hiveMetastoreExtension.cleanup();
if (testHiveMetastore != null) {
testHiveMetastore.close();
}
}

private Map<String, Object> getManagedIcebergConfig(TableIdentifier table) {
String metastoreUri = hiveMetastoreExtension.hiveConf().getVar(HiveConf.ConfVars.METASTOREURIS);
String metastoreUri = testHiveMetastore.metastoreUri();

Map<String, String> confProperties =
ImmutableMap.<String, String>builder()
.put(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri)
.build();
ImmutableMap.<String, String>builder().put("hive.metastore.uris", metastoreUri).build();

return ImmutableMap.<String, Object>builder()
.put("table", table.toString())
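Outside the test harness, the same catalog configuration works with the Managed API directly. A minimal write sketch follows, assuming a reachable Hive metastore and an existing Iceberg table; the thrift URI and table name are placeholders, and the config keys mirror getManagedIcebergConfig above (the "config_properties" key name is taken from IcebergIO's managed config and is worth verifying).

import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

public class ManagedIcebergHiveSketch {
  public static void write(PCollection<Row> rows) {
    Map<String, Object> config =
        ImmutableMap.<String, Object>builder()
            .put("table", "test_db.test_table")
            .put(
                "config_properties",
                ImmutableMap.of("hive.metastore.uris", "thrift://localhost:9083"))
            .build();
    // With this PR, the Hive and GCS runtime dependencies ride along in the main
    // Iceberg jar, so no separate hive module dependency is needed at run time.
    rows.apply(Managed.write(Managed.ICEBERG).withConfig(config));
  }
}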
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg.hive.testutils;
package org.apache.beam.sdk.io.iceberg.hiveutils;

import java.io.IOException;
import java.io.LineNumberReader;
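The body of the new TestHiveMetastore helper is truncated in this diff; from its call sites in HiveCatalogIT above, its surface can be inferred. The outline below is an inference only, not the actual implementation.

package org.apache.beam.sdk.io.iceberg.hiveutils;

import org.apache.hadoop.hive.conf.HiveConf;

// Inferred from usage: constructed as new TestHiveMetastore(warehousePath, null),
// where the second argument appears to be an optional configuration override.
interface TestHiveMetastoreSurface extends AutoCloseable {
  HiveConf hiveConf(); // configuration pointing at the embedded metastore
  String metastoreUri(); // thrift URI, used for hive.metastore.uris
  void createDatabase(String name) throws Exception; // creates a test database
  @Override
  void close() throws Exception; // tears the metastore down
}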