First version of the Iceberg sink for Apache Beam #1972
@Fokko, thanks for working on this! This looks like it may not be complete. Do you think it should be a draft?
@rdblue @HeartSaVioR Thanks for the review. This is indeed still a draft. I'll add some more tests soon 👍
Able to read the results with Spark:
private static final PipelineOptions options = TestPipeline.testingPipelineOptions();

private static final String stringSchema = "{\n" +
I'm not sure why this would result in readable data files. Iceberg requires either a schema in the Avro file with field IDs or a name mapping in the Iceberg table to get the field IDs from Avro field names. How did you read the table produced by this test?
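For reference, a sketch of the name-mapping option mentioned above (assuming the table is already loaded from a catalog): storing a default name mapping as a table property lets Iceberg resolve fields by name in data files that were written without field IDs.

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;

class NameMappingExample {
  // Attach a default name mapping so plain Avro files (no field IDs) can still be read.
  static void addNameMapping(Table table) {
    NameMapping mapping = MappingUtil.create(table.schema());
    table.updateProperties()
        .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
        .commit();
  }
}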
Yes, I've started by implementing Avro files, because this is our use case. But with the Beam API, Parquet and ORC are also supported.
For the actual interface, we only use the Iceberg schema. The IcebergIO only accepts org.apache.iceberg.Schema. For the test we convert the Avro schema to Iceberg using AvroSchemaUtil.toIceberg(avroSchema).
I need to clean the test up a bit.
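For reference, a minimal sketch of that conversion step (the record schema here is made up for illustration):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.iceberg.avro.AvroSchemaUtil;

class SchemaConversionExample {
  public static void main(String[] args) {
    // A hypothetical Avro record schema standing in for the test schema.
    Schema avroSchema = SchemaBuilder.record("event")
        .fields()
        .requiredLong("id")
        .optionalString("name")
        .endRecord();

    // Convert to an Iceberg schema; Iceberg assigns fresh field IDs during this conversion.
    org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
    System.out.println(icebergSchema);
  }
}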
Okay, so the initial conversion to Iceberg assigns some IDs, then you pass that Iceberg schema into the source. The IDs are reassigned to ensure consistency when a table is created, and you're probably converting that table schema back to Avro to configure the Avro writer? Or maybe you're just getting lucky and the IDs are assigned the same way in the tests.
final FileIO.Write<Void, GenericRecord> avroFileIO = FileIO.<GenericRecord>write()
    .via(AvroIO.sink(avroSchema))
    .to("gs://.../../")
    .withSuffix(".avro");
This would work if the Avro schema passed to the sink were created from the Iceberg table schema because that sets the field IDs correctly. But I think that's not very obvious to users, so I think it would be better to eventually have an Iceberg sink that uses the Iceberg schema directly like Spark and Flink do.
What is the implication of doing this the other way around? In the current setup we have the Avro schema, and distill the Iceberg schema from it.
The problem with converting Avro to Iceberg is that there is no guarantee that the field IDs are assigned consistently (see the Field IDs section for Avro). If the ID annotations aren't present, they are reassigned but that depends on the structure of the schema.
To safely write Avro for an Iceberg table, you should always convert from the table schema to an Avro schema so that the IDs are correct and match the table.
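A minimal sketch of that direction, assuming the table is already created and loaded: the Avro write schema is derived from the Iceberg table schema so the field IDs embedded in the data files match the table. The output path and record name are placeholders.

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileIO;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.AvroSchemaUtil;

class TableSchemaSink {
  static FileIO.Write<Void, GenericRecord> sinkFor(Table table, String outputPath) {
    // Derive the Avro schema from the Iceberg table schema so the field IDs line up.
    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(table.schema(), "record");
    return FileIO.<GenericRecord>write()
        .via(AvroIO.sink(avroSchema))
        .to(outputPath)
        .withSuffix(".avro");
  }
}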
// Append the new files
final AppendFiles app = table.newAppend();
// We need to get the statistics, not easy to get them through Beam
There is a PR to aggregate the stats while writing an Avro file. You would get them for free using an Iceberg-based file writer instead of the generic Avro one. You'd also get correct record counts and file sizes.
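As an illustration of the Iceberg-based writer path (a sketch, assuming Iceberg generic Records rather than Avro GenericRecords): the appender exposes metrics() after close, so the record count, and with the linked stats PR the column stats, come back without a separate pass over the file.

import java.io.IOException;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;

class IcebergAvroWriteExample {
  // Write records with Iceberg's Avro appender and return the collected metrics.
  static Metrics writeWithMetrics(OutputFile outputFile, Schema schema,
                                  Iterable<Record> records) throws IOException {
    FileAppender<Record> appender = Avro.write(outputFile)
        .schema(schema)
        .createWriterFunc(DataWriter::create)
        .build();
    try (FileAppender<Record> toClose = appender) {
      toClose.addAll(records);
    }
    // metrics() must be called after close; the record count is always populated.
    return appender.metrics();
  }
}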
Thanks for pointing that out. I'll track PR https://github.com/apache/iceberg/pull/1963/files. I think we have to make a decision here. I do like the fact that we fully reuse the writers from Beam, so we don't have to maintain that part.
Iceberg provides common write classes in the core module that we extend for the Flink and Spark object models, so if you do go with Iceberg writers, it would probably not be too much code. For example, take a look at the SparkPartitionedWriter.
Using Iceberg writers would help with stats as well as writing delete files if you later want to support CDC or other row-level use cases.
But, one challenge is integrating those writers with an object model. Spark and Flink have row classes that are directly supported in the integration. It looks like you're using Avro generics for Avro, and using an object model specific to a data format is going to be a little difficult because Iceberg needs additional metadata, like field IDs. It sounds difficult to make sure that each format writer is producing the right information for Iceberg tables.
Avro is a first-class citizen in Beam; it is more or less the equivalent of Spark's InternalRow. We could replace the AvroIO with an IcebergIO, but this feels like reinventing the wheel to me. I've noticed that quite some effort has gone into the Beam writers, for example tuning the number and size of the output buffers. I would like to reuse that logic.
Apart from the writers themselves, we still need the current logic for doing the commits. So I think we can keep this open for now.
Looks like a good start, @Fokko! I'd definitely recommend relying a bit more on Iceberg helpers to do things like write data files, but this does demonstrate the basic elements of writing to an Iceberg table.
@rdblue Thanks for the comments, appreciate it.
Both the batch and streaming tests are running. There is only one issue, with filenames: by default, Beam encodes the window interval into the filename, including the time. This conflicts with the Hadoop file reader, as it doesn't support colons (:) in paths:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, 192.168.1.113, executor driver): java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: .1970-01-01T00-00-00.000+00:00-1970-01-01T00-01-00.000+00:00-00000-of-00001.avro.crc
at org.apache.hadoop.fs.Path.initialize(Path.java:205)
at org.apache.hadoop.fs.Path.<init>(Path.java:171)
at org.apache.hadoop.fs.Path.<init>(Path.java:93)
at org.apache.hadoop.fs.ChecksumFileSystem.getChecksumFile(ChecksumFileSystem.java:90)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:145)
at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:157)
at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101)
at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:95)
at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:85)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.URISyntaxException: Relative path in absolute URI: .1970-01-01T00-00-00.000+00:00-1970-01-01T00-01-00.000+00:00-00000-of-00001.avro.crc
at java.net.URI.checkPath(URI.java:1823)
at java.net.URI.<init>(URI.java:745)
at org.apache.hadoop.fs.Path.initialize(Path.java:202)
... 32 more
Avro tools:
MacBook-Pro-van-Fokko:incubator-iceberg fokkodriesprong$ avro-tools tojson /tmp/fokko/1970-01-01T00-00-00.000+00:00-1970-01-01T00-01-00.000+00:00-00000-of-00001.avro
20/12/30 12:55:18 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: .1970-01-01T00-00-00.000+00:00-1970-01-01T00-01-00.000+00:00-00000-of-00001.avro.crc
at org.apache.hadoop.fs.Path.initialize(Path.java:263)
at org.apache.hadoop.fs.Path.<init>(Path.java:221)
at org.apache.hadoop.fs.Path.<init>(Path.java:129)
at org.apache.hadoop.fs.ChecksumFileSystem.getChecksumFile(ChecksumFileSystem.java:94)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:149)
at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:347)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:899)
at org.apache.avro.tool.Util.openFromFS(Util.java:88)
at org.apache.avro.tool.Util.fileOrStdin(Util.java:64)
at org.apache.avro.tool.DataFileReadTool.run(DataFileReadTool.java:87)
at org.apache.avro.tool.Main.run(Main.java:67)
at org.apache.avro.tool.Main.main(Main.java:56)
Caused by: java.net.URISyntaxException: Relative path in absolute URI: .1970-01-01T00-00-00.000+00:00-1970-01-01T00-01-00.000+00:00-00000-of-00001.avro.crc
at java.net.URI.checkPath(URI.java:1823)
at java.net.URI.<init>(URI.java:745)
at org.apache.hadoop.fs.Path.initialize(Path.java:260)
... 11 more
I'll add some asserts to the tests to make sure that everything goes well.
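Until the sink generates its own file names, one way around the colon problem is a custom file naming on the Beam side. A hedged sketch (ColonFreeNaming is a made-up name; it assumes the FileIO.Write.FileNaming interface) that encodes the window bounds as epoch millis instead of ISO timestamps:

import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;

class ColonFreeNaming implements FileIO.Write.FileNaming {
  // Build a filename without colons so Hadoop's Path/URI parsing accepts it.
  @Override
  public String getFilename(BoundedWindow window, PaneInfo pane, int numShards,
                            int shardIndex, Compression compression) {
    String windowPart = window instanceof IntervalWindow
        ? ((IntervalWindow) window).start().getMillis() + "-" + ((IntervalWindow) window).end().getMillis()
        : "global";
    return String.format("%s-%05d-of-%05d.avro", windowPart, shardIndex, numShards);
  }
}

It would be plugged in with .withNaming(new ColonFreeNaming()) on the FileIO write, replacing the default naming (withSuffix is then not needed, since the naming already appends .avro).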
@Fokko, in the interest of making Beam more object-store friendly, don't hesitate to bring ideas/discussion to the Beam community. I think we need to improve in this area. Does this have an associated JIRA on the Iceberg side? I want to link it to BEAM-10160. Also, do you plan to add the read part too?
@iemejia I would like to sort this out before opening a new discussion :) But I think it is an important thing to address, as it slows down the throughput of the pipeline and increases the costs (because of the increased number of operations). Iceberg doesn't rely on renames, as it is built with object stores in mind. I'm happy to add the read part later on, but if you feel like it, feel free to pick it up. I have quite a backlog.
Please go ahead with the Read, @Fokko, I was just curious. I have too much in my backlog myself to risk adding more stuff. Let's sync on the Read part, because I have a WIP branch for Delta and I suppose both can end up being similar (saying this with my poor knowledge of both Delta and Iceberg). We can also discuss, once you have finished the write (and/or read), how to integrate it with Beam's SQL; it is quite straightforward.
Thanks for the great work, @Fokko! We have a person who is also interested in a Beam sink for Iceberg. It would be great to collaborate. @RussellSpitzer and I can help with reviews. I feel like this is not far from being ready. A couple of questions from me (I have just a basic understanding of Beam):
The first two points are minor but the last one can be a big deal at scale. For example, if we don't distribute and order data on write, we will either produce a lot of small files or keep a lot of files open at the same time.
}
try {
  lastSeenWindow = window;
  writer.write(element);
What if we get records for multiple partitions? I think PartitionedWriter will fail with a runtime exception.
@madhubabubits, let us know if you have any other comments. I know you are giving this PR a try.
// We use a combiner, to combine all the files to a single commit in
// the Iceberg log
final IcebergDataFileCommitter combiner = new IcebergDataFileCommitter(table, hiveMetastoreUrl, properties);
final Combine.Globally<DataFile, Snapshot> combined = Combine.globally(combiner).withoutDefaults();
@Fokko - firstly, great work with the implementation.
Do you see any use for returning the snapshot here? Instead, could we implement something along the lines of the FileIO sink? That way the user would have full control over windowing and dynamic table names (based on the Avro schema of each GenericRecord in the PCollection), and both bounded and unbounded collections would be supported.
Also, do you see value in providing separate windowing for creating files and for committing (for example, create files on a 5-minute window, but make the final commit on a 1-hour window)? That might be a use case for unbounded collections.
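For what it's worth, a hypothetical sketch of that two-level windowing idea; WriteDataFiles and CommitToIceberg here are placeholder transforms, not the API of this PR. The point is that the DataFile collection can be re-windowed before the commit step so files and commits use different window sizes.

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.iceberg.DataFile;
import org.joda.time.Duration;

class TwoLevelWindowing {
  // Write files on a 5-minute window, but commit to the Iceberg table on a 1-hour window.
  static <T> void expand(PCollection<T> records,
                         PTransform<PCollection<T>, PCollection<DataFile>> writeFiles,
                         PTransform<PCollection<DataFile>, PDone> commitFiles) {
    PCollection<DataFile> dataFiles = records
        .apply("FiveMinuteFileWindows", Window.<T>into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("WriteDataFiles", writeFiles);

    dataFiles
        .apply("OneHourCommitWindows", Window.<DataFile>into(FixedWindows.of(Duration.standardHours(1))))
        .apply("CommitToIceberg", commitFiles);
  }
}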
@@ -28,7 +29,7 @@
 /**
  * Identifies a table in iceberg catalog.
  */
-public class TableIdentifier {
+public class TableIdentifier implements Serializable {
For some of these classes that implement Serializable, should we consider adding a serialVersionUID?
I have many users (specifically for Flink) who encounter subtle issues when a Serializable class does not specify serialVersionUID and the implicitly generated serialVersionUID changes. Very often, these users need to allow some or all of their jobs' existing state to be discarded when this happens.
I'm not sure whether TableIdentifier gets placed into state anywhere ourselves, but for classes that are part of the public API, is there an argument against including one?
Additionally, does anybody have any arguments for or against including a serialVersionUID on the interfaces that also extend Serializable? For example, DataFile stands out to me as something that could potentially cause issues. I could imagine that something along the lines of Class<DataFile> clazz = DataFile.class could get serialized and cause issues when the interface changes.
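For illustration, a hypothetical, simplified version of what pinning the UID would look like (not the real TableIdentifier fields; the value is arbitrary but must stay fixed once released):

import java.io.Serializable;

class TableIdentifierExample implements Serializable {
  // Pin the serialVersionUID so the serialized form stays compatible as the class evolves.
  private static final long serialVersionUID = 1L;

  private final String namespace;
  private final String name;

  TableIdentifierExample(String namespace, String name) {
    this.namespace = namespace;
    this.name = name;
  }
}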
@@ -26,6 +26,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Nit: This seems to be unused.
@Fokko I see this has been open for a while. Wondering when you plan to merge it?
Leaving a small comment as I see there's been recent interest in this PR.
Given the age of this PR, it can't be merged without at least rebasing onto the master branch, and likely updating the Gradle build to match the new setup. So it probably can't be merged that quickly - I personally wouldn't be able to return to a relatively old unfinished task without some added time to refamiliarize myself with it and to find time to work on it alongside other work priorities, and I wouldn't expect anything different of @Fokko (amazing as he is!).
Outside of the recent interest shown by @yushuyao, I know there are at least a few heavy Iceberg users who would greatly benefit from an Iceberg Beam sink. I'm not a huge Beam user myself, but I understand it and have used it enough to be able to help where needed (or at least try to). And Beam has decent support in its APIs for including Hadoop and the like (I'm not ready to comment much on this PR itself yet).
My experience is mostly with the Beam Flink runner, but my understanding is that in open source it's more or less the standard runner. Someone please correct me if I'm way off base.
If we're going to prioritize having a Beam sink, we should probably put it on the roadmap and give it a relative priority, like we are doing now with other larger tasks (to find dedicated reviewers and authors and ensure the work has the support it needs to be successful).
To answer the question of when this will be merged: given the relative age of this PR, I would estimate probably not in the next few days 🙂. But if there's interest (which I believe there is), it would be great if @Fokko wants to lead this charge. Otherwise, I would be happy to help find people to work on it as it's prioritized on the roadmap (including possibly working on it myself, but there are likely people better suited than me). I do happen to know there are some folks interested in an OSS Beam Iceberg connector, and I'm sure they would be happy to help review at the least.
Likely this should be discussed at the next community sync up or on the dev list. I'll put it on the agenda for the next sync-up.
TLDR for @Fokko: Please let us know if you're interested in personally seeing this work through to the finish line, or if you'd like to find a co-author. Of course, timing depends on your own commitments and the relative priority the community decides this should have on the roadmap (if any, though I think there's definitely interest, as the recent comments show).
TLDR for @yushuyao: Given the age of this PR, it likely won't be merged very soon. Given that this is a relatively major addition (a new framework to support), it very likely needs to be added to the roadmap and assigned a relative priority. We do this now to ensure that larger projects have people who can review them and so we know what's being worked on. However, you're not the only person interested in an Iceberg Beam connector, and Fokko has already done a large portion of the initial work, so stay tuned. Likely we (the community) will discuss this at the monthly community sync-up. If you don't already get the invite, you can find instructions for joining the Google group on the Apache Iceberg website under Community. It would be great if you could make it to the next one and voice your interest; I'll add it to the agenda, but it would be great to hear from the people who are most interested in it. We're very friendly, and it's usually maybe 20-30 people that show up, so it's low pressure 🙂. Hope to see you there!
package org.apache.iceberg.beam;

import java.util.Map;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
Nit: Now that we have our own shaded Guava, can we use that instead, or do you need the shaded Beam Guava for some reason?
Commenting as there has been some recent interest in this PR, and I happen to know there is a group of people who would greatly appreciate an open source Apache Beam connector.
Bit out of date :)
To the extent we on the Beam team can help, please let me (us) know. We'd love to see this completed and merged.
A first attempt at adding a sink to write data to a DFS using Beam.
Fixes #693