[Bug]: IcebergIO cannot write data into an hourly partitioned table #32865

Open
3 of 17 tasks
DanielMorales9 opened this issue Oct 18, 2024 · 0 comments
Labels
bug dataflow IcebergIO IcebergIO: can only be used through ManagedIO io java P2

What happened?

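Writing to an hourly partitioned Iceberg table fails whether the Beam schema declares the timestamp field as the standard DATETIME type or as the logical type SqlTypes.DATETIME.

Iceberg expects a long for the partition source field, but Beam passes a date-time object instead. The parameterized JUnit test below reproduces the failure for both schema variants.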

package com.experiment.beam;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;

import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class HourlyPartitionedIcebergTableTest {

    // Constants for testing
    private static final String WAREHOUSE_DIR =
            new File("src/test/resources/iceberg-test").getAbsolutePath();
    private static final String WAREHOUSE_PATH = String.format("file:///%s", WAREHOUSE_DIR);
    private static final Map<String, Object> CATALOG_CONFIG = new HashMap<String, Object>() {{
        put("warehouse", WAREHOUSE_PATH);
        put("type", "hadoop");
    }};
    private static final Map<String, Object> ICEBERG_CONFIG = new HashMap<String, Object>() {{
        put("catalog_name", "iceberg");
        put("catalog_properties", CATALOG_CONFIG);
    }};
    private static final Configuration CONF = new Configuration();
    private static final HadoopCatalog HADOOP_CATALOG = new HadoopCatalog(CONF, WAREHOUSE_PATH);

    private final String schemaName;
    private final String tableName;
    private final Schema beamSchema;
    private final List<Row> testData;
    private TableIdentifier tableIdentifier;


    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        Schema schemaWithSQLDatetime = Schema.builder()
                .addStringField("id")
                .addLogicalTypeField("event_time", SqlTypes.DATETIME)
                .build();
        Schema schemaWithDatetime = Schema.builder()
                .addStringField("id")
                .addDateTimeField("event_time")
                .build();
        return Arrays.asList(new Object[][]{
                {
                        "myschema",
                        "test1",
                        schemaWithDatetime,
                        Arrays.asList(
                                Row.withSchema(schemaWithDatetime)
                                        .addValues("record-1", Instant.now())
                                        .build(),
                                Row.withSchema(schemaWithDatetime)
                                        .addValues("record-2", Instant.now().minus(3600 * 1000))
                                        .build()
                        )
                },
                {
                        "myschema",
                        "test2",
                        schemaWithSQLDatetime,
                        Arrays.asList(
                                Row.withSchema(schemaWithSQLDatetime)
                                        .addValues("record-3", LocalDateTime.now())
                                        .build(),
                                Row.withSchema(schemaWithSQLDatetime)
                                        .addValues("record-4", LocalDateTime.now().minus(1, ChronoUnit.HOURS))
                                        .build()
                        )
                },
        });
    }

    public HourlyPartitionedIcebergTableTest(String schemaName, String tableName, Schema beamSchema,
                                             List<Row> hardcodedData) {
        this.schemaName = schemaName;
        this.tableName = tableName;
        this.beamSchema = beamSchema;
        this.testData = hardcodedData;
    }

    @Before
    public void setUp() {
        // Create table identifier for the test
        tableIdentifier = TableIdentifier.of(schemaName, tableName);

        // Define Iceberg schema
        org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema(
                Types.NestedField.required(1, "id", Types.StringType.get()),
                Types.NestedField.required(2, "event_time", Types.TimestampType.withZone())
        );

        PartitionSpec spec = PartitionSpec.builderFor(icebergSchema)
                .hour("event_time")
                .build();

        // Create the Iceberg table
        HADOOP_CATALOG.createTable(tableIdentifier, icebergSchema, spec);
    }

    @Test
    public void testPipeline() {
        // Define Beam pipeline options
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);

        ICEBERG_CONFIG.put("table", String.format("%s.%s", schemaName, tableName));

        // Build the pipeline using the hardcoded data
        pipeline
                .apply("CreateHardcodedData", Create.of(testData).withRowSchema(beamSchema))
                .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(ICEBERG_CONFIG));

        // Run the pipeline
        pipeline.run().waitUntilFinish();

        // Verify that the table exists after data is written
        assertTrue("Table should exist after writing data.", HADOOP_CATALOG.tableExists(tableIdentifier));
    }

    @After
    public void tearDown() {
        // Drop the table after the test
        if (HADOOP_CATALOG.tableExists(tableIdentifier)) {
            HADOOP_CATALOG.dropTable(tableIdentifier);
        }
    }
}

Test results (both parameterized cases fail with the same exception; only the timestamp in the message differs):

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T16:14:32.934Z

	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
	at com.experiment.beam.HourlyPartitionedIcebergTableTest.testPipeline(HourlyPartitionedIcebergTableTest.java:136)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runners.Suite.runChild(Suite.java:128)
	at org.junit.runners.Suite.runChild(Suite.java:27)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runners.Suite.runChild(Suite.java:128)
	at org.junit.runners.Suite.runChild(Suite.java:27)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T16:14:32.934Z
	at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
	at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
	at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
	at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:106)
	at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:136)
	at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:270)
	at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:243)

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T17:14:32.961579Z

	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
	at com.experiment.beam.HourlyPartitionedIcebergTableTest.testPipeline(HourlyPartitionedIcebergTableTest.java:136)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runners.Suite.runChild(Suite.java:128)
	at org.junit.runners.Suite.runChild(Suite.java:27)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runners.Suite.runChild(Suite.java:128)
	at org.junit.runners.Suite.runChild(Suite.java:27)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T17:14:32.961579Z
	at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
	at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
	at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
	at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:106)
	at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:136)
	at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:270)
	at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:243)
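
The message in both traces says the partitioning step asked for a java.lang.Long and received a date-time value. Iceberg represents timestamp values internally as a long count of microseconds since the Unix epoch, and its hour transform yields whole hours since the epoch. The sketch below uses only the JDK to illustrate that contract. The names TimestampPartitionSketch, typedGet, toEpochMicros, and hoursSinceEpoch are hypothetical; typedGet is a simplified stand-in for a typed accessor, not Iceberg's code; and treating a LocalDateTime as UTC is an assumption of this sketch. It is not presented as the fix Beam should adopt.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

public class TimestampPartitionSketch {

    private static final long MICROS_PER_HOUR = 3_600_000_000L;

    // Hypothetical stand-in for a typed accessor: returns the value only if it
    // already has the requested Java class, otherwise fails with a message
    // shaped like the one in the traces above.
    public static <T> T typedGet(Object value, Class<T> javaClass) {
        if (value == null || javaClass.isInstance(value)) {
            return javaClass.cast(value);
        }
        throw new IllegalStateException(
                "Not an instance of " + javaClass.getName() + ": " + value);
    }

    // Microseconds since 1970-01-01T00:00:00Z for an absolute point in time.
    public static long toEpochMicros(Instant instant) {
        return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
    }

    // Same conversion for a zone-less date-time.
    // Assumption of this sketch: the value is interpreted as UTC.
    public static long toEpochMicros(LocalDateTime dateTime) {
        return toEpochMicros(dateTime.toInstant(ZoneOffset.UTC));
    }

    // Whole hours since the epoch; floorDiv rounds toward negative infinity
    // so that pre-1970 values land in the correct hour.
    public static long hoursSinceEpoch(long epochMicros) {
        return Math.floorDiv(epochMicros, MICROS_PER_HOUR);
    }

    public static void main(String[] args) {
        Instant eventTime = Instant.parse("2024-10-18T16:14:32.934Z");

        // Passing the date-time object directly fails the type check.
        try {
            typedGet(eventTime, Long.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // Converting to epoch microseconds first satisfies it.
        long micros = typedGet(toEpochMicros(eventTime), Long.class);
        System.out.println("micros since epoch: " + micros);
        System.out.println("hours since epoch:  " + hoursSinceEpoch(micros));
    }
}
```

The point of the sketch is only that the value handed to the partitioning step has to be a long by the time the hour transform reads it; where that conversion belongs in IcebergIO is for the maintainers to decide.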

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner