Skip to content

Commit

Permalink
Flink: Migrate HadoopCatalog related tests (apache#10358)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomtongue authored Jul 1, 2024
1 parent 0e7aa84 commit a975a95
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 305 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.jupiter.api.extension.ExtensionContext;

public class HadoopTableExtension extends HadoopCatalogExtension {
private final Schema schema;
private final PartitionSpec partitionSpec;

private Table table;

public HadoopTableExtension(String database, String tableName, Schema schema) {
this(database, tableName, schema, null);
}

public HadoopTableExtension(
String database, String tableName, Schema schema, PartitionSpec partitionSpec) {
super(database, tableName);
this.schema = schema;
this.partitionSpec = partitionSpec;
}

@Override
public void beforeEach(ExtensionContext context) throws Exception {
super.beforeEach(context);
if (partitionSpec == null) {
this.table = catalog.createTable(TableIdentifier.of(database, tableName), schema);
} else {
this.table =
catalog.createTable(TableIdentifier.of(database, tableName), schema, partitionSpec);
}
tableLoader.open();
}

public Table table() {
return table;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink.sink;

import static org.apache.iceberg.flink.TestFixtures.DATABASE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.IOException;
Expand All @@ -29,77 +31,74 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.HadoopCatalogResource;
import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
MiniClusterResource.createWithClassloaderCheckDisabled();
@ExtendWith(ParameterizedTestExtension.class)
public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@RegisterExtension
public static MiniClusterExtension miniClusterResource =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();

@Rule
public final HadoopCatalogResource catalogResource =
new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
@RegisterExtension
private static final HadoopCatalogExtension catalogResource =
new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);

private TableLoader tableLoader;

private final FileFormat format;
private final int parallelism;
private final boolean partitioned;
@Parameter(index = 0)
private FileFormat format;

@Parameter(index = 1)
private int parallelism;

@Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
@Parameter(index = 2)
private boolean partitioned;

@Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
public static Object[][] parameters() {
return new Object[][] {
{"avro", 1, true},
{"avro", 1, false},
{"avro", 2, true},
{"avro", 2, false},
{"orc", 1, true},
{"orc", 1, false},
{"orc", 2, true},
{"orc", 2, false},
{"parquet", 1, true},
{"parquet", 1, false},
{"parquet", 2, true},
{"parquet", 2, false}
{FileFormat.AVRO, 1, true},
{FileFormat.AVRO, 1, false},
{FileFormat.AVRO, 2, true},
{FileFormat.AVRO, 2, false},
{FileFormat.ORC, 1, true},
{FileFormat.ORC, 1, false},
{FileFormat.ORC, 2, true},
{FileFormat.ORC, 2, false},
{FileFormat.PARQUET, 1, true},
{FileFormat.PARQUET, 1, false},
{FileFormat.PARQUET, 2, true},
{FileFormat.PARQUET, 2, false}
};
}

public TestFlinkIcebergSink(String format, int parallelism, boolean partitioned) {
this.format = FileFormat.fromString(format);
this.parallelism = parallelism;
this.partitioned = partitioned;
}

@Before
@BeforeEach
public void before() throws IOException {
table =
catalogResource
Expand All @@ -122,7 +121,7 @@ public void before() throws IOException {
tableLoader = catalogResource.tableLoader();
}

@Test
@TestTemplate
public void testWriteRowData() throws Exception {
List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
DataStream<RowData> dataStream =
Expand Down Expand Up @@ -165,17 +164,17 @@ private int partitionFiles(String partition) throws IOException {
return SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", partition)).size();
}

@Test
@TestTemplate
public void testWriteRow() throws Exception {
testWriteRow(null, DistributionMode.NONE);
}

@Test
@TestTemplate
public void testWriteRowWithTableSchema() throws Exception {
testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
}

@Test
@TestTemplate
public void testJobNoneDistributeMode() throws Exception {
table
.updateProperties()
Expand All @@ -187,12 +186,12 @@ public void testJobNoneDistributeMode() throws Exception {
if (parallelism > 1) {
if (partitioned) {
int files = partitionFiles("aaa") + partitionFiles("bbb") + partitionFiles("ccc");
Assert.assertTrue("Should have more than 3 files in iceberg table.", files > 3);
assertThat(files).isGreaterThan(3);
}
}
}

@Test
@TestTemplate
public void testJobHashDistributionMode() {
table
.updateProperties()
Expand All @@ -204,7 +203,7 @@ public void testJobHashDistributionMode() {
.hasMessage("Flink does not support 'range' write distribution mode now.");
}

@Test
@TestTemplate
public void testJobNullDistributionMode() throws Exception {
table
.updateProperties()
Expand All @@ -214,42 +213,33 @@ public void testJobNullDistributionMode() throws Exception {
testWriteRow(null, null);

if (partitioned) {
Assert.assertEquals(
"There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa"));
Assert.assertEquals(
"There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb"));
Assert.assertEquals(
"There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
assertThat(partitionFiles("aaa")).isEqualTo(1);
assertThat(partitionFiles("bbb")).isEqualTo(1);
assertThat(partitionFiles("ccc")).isEqualTo(1);
}
}

@Test
@TestTemplate
public void testPartitionWriteMode() throws Exception {
testWriteRow(null, DistributionMode.HASH);
if (partitioned) {
Assert.assertEquals(
"There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa"));
Assert.assertEquals(
"There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb"));
Assert.assertEquals(
"There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
assertThat(partitionFiles("aaa")).isEqualTo(1);
assertThat(partitionFiles("bbb")).isEqualTo(1);
assertThat(partitionFiles("ccc")).isEqualTo(1);
}
}

@Test
@TestTemplate
public void testShuffleByPartitionWithSchema() throws Exception {
testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);
if (partitioned) {
Assert.assertEquals(
"There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa"));
Assert.assertEquals(
"There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb"));
Assert.assertEquals(
"There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
assertThat(partitionFiles("aaa")).isEqualTo(1);
assertThat(partitionFiles("bbb")).isEqualTo(1);
assertThat(partitionFiles("ccc")).isEqualTo(1);
}
}

@Test
@TestTemplate
public void testTwoSinksInDisjointedDAG() throws Exception {
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());

Expand Down Expand Up @@ -323,16 +313,14 @@ public void testTwoSinksInDisjointedDAG() throws Exception {
SimpleDataUtil.assertTableRows(rightTable, convertToRowData(rightRows));

leftTable.refresh();
Assert.assertNull(leftTable.currentSnapshot().summary().get("flink.test"));
Assert.assertNull(leftTable.currentSnapshot().summary().get("direction"));
assertThat(leftTable.currentSnapshot().summary()).doesNotContainKeys("flink.test", "direction");
rightTable.refresh();
Assert.assertEquals(
TestFlinkIcebergSink.class.getName(),
rightTable.currentSnapshot().summary().get("flink.test"));
Assert.assertEquals("rightTable", rightTable.currentSnapshot().summary().get("direction"));
assertThat(rightTable.currentSnapshot().summary())
.containsEntry("flink.test", TestFlinkIcebergSink.class.getName())
.containsEntry("direction", "rightTable");
}

@Test
@TestTemplate
public void testOverrideWriteConfigWithUnknownDistributionMode() {
Map<String, String> newProps = Maps.newHashMap();
newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
Expand All @@ -352,7 +340,7 @@ public void testOverrideWriteConfigWithUnknownDistributionMode() {
.hasMessage("Invalid distribution mode: UNRECOGNIZED");
}

@Test
@TestTemplate
public void testOverrideWriteConfigWithUnknownFileFormat() {
Map<String, String> newProps = Maps.newHashMap();
newProps.put(FlinkWriteOptions.WRITE_FORMAT.key(), "UNRECOGNIZED");
Expand All @@ -372,7 +360,7 @@ public void testOverrideWriteConfigWithUnknownFileFormat() {
.hasMessage("Invalid file format: UNRECOGNIZED");
}

@Test
@TestTemplate
public void testWriteRowWithTableRefreshInterval() throws Exception {
List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
DataStream<RowData> dataStream =
Expand Down
Loading

0 comments on commit a975a95

Please sign in to comment.