First version of the Iceberg sink for Apache Beam #1972

Closed
wants to merge 46 commits

Conversation

@Fokko (Contributor) commented Dec 21, 2020

A first attempt at adding a sink to write data to a DFS using Beam.

Fixes #693

@rdblue (Contributor) commented Dec 22, 2020

@Fokko, thanks for working on this! This looks like it may not be complete. Do you think it should be a draft?

@Fokko marked this pull request as draft December 23, 2020 22:01
@Fokko (Contributor, Author) commented Dec 23, 2020

@rdblue @HeartSaVioR Thanks for the review. This is indeed still a draft. I'll add some more tests soon 👍

github-actions bot added the docs label Dec 23, 2020
@Fokko (Contributor, Author) commented Dec 26, 2020

Able to read the results with Spark:

MacBook-Pro-van-Fokko:incubator-iceberg fokkodriesprong$ spark-sql --packages org.apache.iceberg:iceberg-spark3:0.10.0 --conf "spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog" --conf "spark.sql.catalog.hive_prod.type=hive" --conf "spark.sql.catalog.hive_prod.uri=thrift://localhost:9083"
spark-sql> SELECT * FROM hive_prod.default.test;
20/12/26 22:07:46 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: file:/tmp/test/metadata/00004-ed964aeb-8924-492b-8225-eb2102c22f03.metadata.json
20/12/26 22:07:46 INFO BaseMetastoreCatalog: Table loaded by catalog: hive_prod.default.test
20/12/26 22:07:47 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 315.8 KiB, free 366.0 MiB)
20/12/26 22:07:48 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 27.4 KiB, free 366.0 MiB)
20/12/26 22:07:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.113:56438 (size: 27.4 KiB, free: 366.3 MiB)
20/12/26 22:07:48 INFO SparkContext: Created broadcast 0 from broadcast at SparkScanBuilder.java:138
20/12/26 22:07:48 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 40.0 B, free 366.0 MiB)
20/12/26 22:07:48 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 116.0 B, free 366.0 MiB)
20/12/26 22:07:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.113:56438 (size: 116.0 B, free: 366.3 MiB)
20/12/26 22:07:48 INFO SparkContext: Created broadcast 1 from broadcast at SparkScanBuilder.java:139
20/12/26 22:07:48 INFO V2ScanRelationPushDown: 
Pushing operators to hive_prod.default.test
Pushed Filters: 
Post-Scan Filters: 
Output: word#0
         
20/12/26 22:07:48 INFO TableScan: Scanning table hive_prod.default.test snapshot 4784653135298349855 created at 2020-12-26 22:07:38.530 with filter true
20/12/26 22:07:49 INFO CodeGenerator: Code generated in 226.493784 ms
20/12/26 22:07:49 INFO SparkContext: Starting job: main at NativeMethodAccessorImpl.java:0
20/12/26 22:07:49 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 1 output partitions
20/12/26 22:07:49 INFO DAGScheduler: Final stage: ResultStage 0 (main at NativeMethodAccessorImpl.java:0)
20/12/26 22:07:49 INFO DAGScheduler: Parents of final stage: List()
20/12/26 22:07:49 INFO DAGScheduler: Missing parents: List()
20/12/26 22:07:49 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/12/26 22:07:49 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 7.1 KiB, free 366.0 MiB)
20/12/26 22:07:49 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.5 KiB, free 366.0 MiB)
20/12/26 22:07:49 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.113:56438 (size: 3.5 KiB, free: 366.3 MiB)
20/12/26 22:07:49 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1223
20/12/26 22:07:49 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
20/12/26 22:07:49 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
20/12/26 22:07:49 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.1.113, executor driver, partition 0, PROCESS_LOCAL, 11598 bytes)
20/12/26 22:07:49 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
...
20/12/26 22:07:50 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1580 bytes result sent to driver
20/12/26 22:07:50 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1132 ms on 192.168.1.113 (executor driver) (1/1)
20/12/26 22:07:50 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
20/12/26 22:07:50 INFO DAGScheduler: ResultStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 1,235 s
20/12/26 22:07:50 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
20/12/26 22:07:50 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
20/12/26 22:07:50 INFO DAGScheduler: Job 0 finished: main at NativeMethodAccessorImpl.java:0, took 1,274538 s
hands
yes
my
go
are
typing
words
Time taken: 4.501 seconds, Fetched 7 row(s)


private static final PipelineOptions options = TestPipeline.testingPipelineOptions();

private static final String stringSchema = "{\n" +
Contributor:

I'm not sure why this would result in readable data files. Iceberg requires either a schema in the Avro file with field IDs or a name mapping in the Iceberg table to get the field IDs from Avro field names. How did you read the table produced by this test?

Contributor Author:

Yes, I've started by implementing Avro files, because that is our use case. But with the Beam API, Parquet and ORC are also supported.

For the actual interface, we only use the Iceberg schema; the IcebergIO only accepts org.apache.iceberg.Schema. For the test we convert the Avro schema to Iceberg using AvroSchemaUtil.toIceberg(avroSchema).

[screenshot: Avro schema]

translates into:

[screenshot: Iceberg schema]

I need to clean the test up a bit.
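
For reference, a minimal sketch of the conversion mentioned above, assuming the stringSchema JSON string from the test:

// Parse the Avro schema from its JSON definition
org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(stringSchema);
// Convert it into an Iceberg schema; field IDs are assigned during the conversion
org.apache.iceberg.Schema icebergSchema = org.apache.iceberg.avro.AvroSchemaUtil.toIceberg(avroSchema);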

@rdblue (Contributor) commented Dec 30, 2020:

Okay, so the initial conversion to Iceberg assigns some IDs, then you pass that Iceberg schema into the source. The IDs are reassigned to ensure consistency when a table is created, and you're probably converting that table schema back to Iceberg to configure the Avro writer? Or maybe you're just getting lucky and the IDs are assigned the same way in the tests.

final FileIO.Write<Void, GenericRecord> avroFileIO = FileIO.<GenericRecord>write()
.via(AvroIO.sink(avroSchema))
.to("gs://.../../")
.withSuffix(".avro");
Contributor:

This would work if the Avro schema passed to the sink were created from the Iceberg table schema because that sets the field IDs correctly. But I think that's not very obvious to users, so I think it would be better to eventually have an Iceberg sink that uses the Iceberg schema directly like Spark and Flink do.

Contributor Author:

What is the implication of doing this the other way around? In the current setup we have the Avro schema, and distill the Iceberg schema from it.

Contributor:

The problem with converting Avro to Iceberg is that there is no guarantee that the field IDs are assigned consistently (see the Field IDs section for Avro). If the ID annotations aren't present, they are reassigned but that depends on the structure of the schema.

To safely write Avro for an Iceberg table, you should always convert from the table schema to an Avro schema so that the IDs are correct and match the table.
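
A rough sketch of that direction, assuming a loaded Table named table (the record name and output path are illustrative):

// Derive the Avro write schema from the table schema so the Iceberg field IDs are embedded in it
org.apache.avro.Schema writeSchema = AvroSchemaUtil.convert(table.schema(), "test");

// Pass that schema to the Beam Avro sink instead of a hand-written one
final FileIO.Write<Void, GenericRecord> avroSink = FileIO.<GenericRecord>write()
    .via(AvroIO.sink(writeSchema))
    .to("/tmp/test/data")
    .withSuffix(".avro");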


// Append the new files
final AppendFiles app = table.newAppend();
// We need to get the statistics, not easy to get them through Beam
Contributor:

There is a PR to aggregate the stats while writing an Avro file. You would get them for free using an Iceberg-based file writer instead of the generic Avro one. You'd also get correct record counts and file sizes.

Contributor Author:

Thanks for pointing that out. I'll track PR https://github.com/apache/iceberg/pull/1963/files. I think we have to make a decision here. I do like the fact that we fully reuse the writers from Beam, so we don't have to maintain that part.
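
For context, the generic-writer path being discussed commits files roughly like this (a sketch; the path, file size, and record count are illustrative and are exactly the statistics that are hard to obtain from Beam):

// Describe a file that Beam has already written; without Iceberg writers the metrics must be gathered separately
DataFile dataFile = DataFiles.builder(table.spec())
    .withPath("/tmp/test/data/part-00000.avro")  // illustrative path
    .withFormat(FileFormat.AVRO)
    .withFileSizeInBytes(1024L)                   // illustrative size
    .withRecordCount(7L)                          // illustrative count
    .build();

// Append and commit the new file to the table in a single snapshot
table.newAppend()
    .appendFile(dataFile)
    .commit();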

Contributor:

Iceberg provides common write classes in the core module that we extend for Flink and Spark object models, so if you do go with Iceberg writers, it would probably not be too much code. For example, take a look at the SparkPartitionedWriter.

Using Iceberg writers would help with stats as well as writing delete files if you later want to support CDC or other row-level use cases.

But, one challenge is integrating those writers with an object model. Spark and Flink have row classes that are directly supported in the integration. It looks like you're using Avro generics for Avro, and using an object model specific to a data format is going to be a little difficult because Iceberg needs additional metadata, like field IDs. It sounds difficult to make sure that each format writer is producing the right information for Iceberg tables.

@Fokko (Contributor, Author) commented Jan 3, 2021:

Avro is a first-class citizen in Beam; it is more or less the equivalent of Spark's InternalRow. We could replace the AvroIO with an IcebergIO, but that feels like reinventing the wheel to me. I've noticed that quite some effort has gone into the Beam writers, for example tuning the number and size of the output buffers, and I would like to reuse that logic.

Apart from the writers themselves, we still need the current logic for doing the commits. So I think we can keep this open for now.

@rdblue (Contributor) commented Dec 28, 2020

Looks like a good start, @Fokko! I'd definitely recommend relying a bit more on Iceberg helpers to do things like write data files, but this does demonstrate the basic elements of writing to an Iceberg table.

@Fokko (Contributor, Author) left a comment

@rdblue Thanks for the comments, appreciate it.

Both the batch and streaming tests are running. There is only one issue, with filenames: by default Beam encodes the window interval, including the time, into the filename. This conflicts with the Hadoop file reader, which doesn't support colons (:) in paths.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, 192.168.1.113, executor driver): java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: .1970-01-01T00-00-00.000+00:00-1970-01-01T00-01-00.000+00:00-00000-of-00001.avro.crc
	at org.apache.hadoop.fs.Path.initialize(Path.java:205)
	at org.apache.hadoop.fs.Path.<init>(Path.java:171)
	at org.apache.hadoop.fs.Path.<init>(Path.java:93)
	at org.apache.hadoop.fs.ChecksumFileSystem.getChecksumFile(ChecksumFileSystem.java:90)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:145)
	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
	at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:157)
	at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101)
	at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:95)
	at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:85)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.URISyntaxException: Relative path in absolute URI: .1970-01-01T00-00-00.000+00:00-1970-01-01T00-01-00.000+00:00-00000-of-00001.avro.crc
	at java.net.URI.checkPath(URI.java:1823)
	at java.net.URI.<init>(URI.java:745)
	at org.apache.hadoop.fs.Path.initialize(Path.java:202)
	... 32 more

Avro tools:

MacBook-Pro-van-Fokko:incubator-iceberg fokkodriesprong$ avro-tools tojson /tmp/fokko/1970-01-01T00-00-00.000+00:00-1970-01-01T00-01-00.000+00:00-00000-of-00001.avro
20/12/30 12:55:18 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: .1970-01-01T00-00-00.000+00:00-1970-01-01T00-01-00.000+00:00-00000-of-00001.avro.crc
	at org.apache.hadoop.fs.Path.initialize(Path.java:263)
	at org.apache.hadoop.fs.Path.<init>(Path.java:221)
	at org.apache.hadoop.fs.Path.<init>(Path.java:129)
	at org.apache.hadoop.fs.ChecksumFileSystem.getChecksumFile(ChecksumFileSystem.java:94)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:149)
	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:347)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:899)
	at org.apache.avro.tool.Util.openFromFS(Util.java:88)
	at org.apache.avro.tool.Util.fileOrStdin(Util.java:64)
	at org.apache.avro.tool.DataFileReadTool.run(DataFileReadTool.java:87)
	at org.apache.avro.tool.Main.run(Main.java:67)
	at org.apache.avro.tool.Main.main(Main.java:56)
Caused by: java.net.URISyntaxException: Relative path in absolute URI: .1970-01-01T00-00-00.000+00:00-1970-01-01T00-01-00.000+00:00-00000-of-00001.avro.crc
	at java.net.URI.checkPath(URI.java:1823)
	at java.net.URI.<init>(URI.java:745)
	at org.apache.hadoop.fs.Path.initialize(Path.java:260)
	... 11 more

I'll add some asserts to the tests to make sure that everything goes well.
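
One possible workaround, assuming Beam's FileIO.Write.withNaming and its FileNaming interface work as I remember them, is to name files explicitly with a colon-free timestamp instead of relying on the default window-based naming (a sketch; the output path is illustrative):

// Hypothetical naming function that avoids characters Hadoop paths reject, such as ':'
FileIO.Write.FileNaming safeNaming = (window, pane, numShards, shardIndex, compression) ->
    String.format("%s-%05d-of-%05d.avro",
        window.maxTimestamp().toString().replace(":", "-"),  // strip colons from the window timestamp
        shardIndex,
        numShards);

final FileIO.Write<Void, GenericRecord> avroFileIO = FileIO.<GenericRecord>write()
    .via(AvroIO.sink(avroSchema))
    .to("/tmp/fokko")
    .withNaming(safeNaming);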



@iemejia (Member) commented Jan 27, 2021

@Fokko, in the interest of making Beam more object-store friendly, don't hesitate to bring ideas and discussion into the Beam community. I think we need to improve in this area. Does this have an associated JIRA on the Iceberg side? I want to link it to BEAM-10160.

Also, do you plan to add the read part too?

@Fokko (Contributor, Author) commented Jan 27, 2021

@iemejia I would like to sort this out before opening a new discussion :) But I think it is an important thing to address, as it slows down the throughput of the pipeline and increases costs (because of the increased number of operations). Iceberg doesn't rely on renames, as it is built with object stores in mind.

I'm happy to add the read part later on, but if you feel like it, feel free to pick it up. I've got quite a backlog.

@iemejia (Member) commented Jan 27, 2021

Please go ahead with the read part, @Fokko, I was just curious. I have too much in my own backlog to risk adding more. Let's sync on the read part, because I have a WIP branch for Delta and I suppose both can end up being similar (said with my limited knowledge of both Delta and Iceberg).

Once you have finished the write (and/or read) side, we can also discuss how to integrate it with Beam's SQL; it is quite straightforward.

@aokolnychyi (Contributor) commented Mar 10, 2021

Thanks for the great work, @Fokko!

We have a person who is also interested in a Beam sink for Iceberg. It would be great to collaborate. @RussellSpitzer and I can help with reviews. I feel like this is not far from being ready.

A couple of questions from me (I have just a basic understanding of Beam):

  • Is it correct that this PR supports only fixed windows in Beam?
  • I heard it is pretty common in Beam to write to multiple tables or to determine the target table dynamically. Is it worth supporting such cases in the Iceberg sink? It does not have to be done in the first version, of course.
  • Does Beam have any way to control the distribution and ordering of data that is passed for write? For example, we have added a way to ask Spark to distribute and order records according to the partition spec and sort order, and a similar effort is in progress for Flink. The main reason is to reduce the number of produced files and to order records so they benefit from min/max skipping and better compression. Can people shuffle and sort data manually? Can we do that automatically?

The first two points are minor but the last one can be a big deal at scale. For example, if we don't distribute and order data on write, we will either produce a lot of small files or keep a lot of files open at the same time.

}
try {
lastSeenWindow = window;
writer.write(element);
Contributor:

What if we get records for multiple partitions? I think PartitionedWriter will fail with a runtime exception.

@aokolnychyi (Contributor) commented Mar 10, 2021

@madhubabubits, let us know if you have any other comments. I know you are giving this PR a try.

// We use a combiner, to combine all the files to a single commit in
// the Iceberg log
final IcebergDataFileCommitter combiner = new IcebergDataFileCommitter(table, hiveMetastoreUrl, properties);
final Combine.Globally<DataFile, Snapshot> combined = Combine.globally(combiner).withoutDefaults();


@Fokko - firstly, great work with the implementation.

Do you see any use for returning the snapshot here? Instead, can we implement something along the lines of the FileIO sink? That way the user would have full control over windowing and dynamic table names (based on the Avro schema of each GenericRecord in the PCollection), and we could support both bounded and unbounded collections.

Also, do you see value in providing separate windowing for creating files and for committing (for example, create files based on a 5-minute window, but make the final commit based on a 1-hour window)? This might be a use case for unbounded collections.

@@ -28,7 +29,7 @@
/**
* Identifies a table in iceberg catalog.
*/
public class TableIdentifier {
public class TableIdentifier implements Serializable {
Contributor:

For some of these classes that implement Serializable, should we consider adding a serialVersionUID?

I have many users (specifically on Flink) who encounter subtle issues when a Serializable class does not specify a serialVersionUID and the compiler changes the generated serialVersionUID. Very often, these users then need to allow some or all of their jobs' existing state to be discarded.

I'm not sure whether TableIdentifier gets placed into state anywhere ourselves, but for classes that are part of the public API, is there an argument against including one?

Additionally, does anybody have arguments for or against including a serialVersionUID on the interfaces that also extend Serializable? For example, DataFile stands out to me as something that could potentially cause issues. I could imagine that something along the lines of Class<DataFile> clazz = DataFile.class could get serialized and cause problems when the interface changes.
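
A simplified sketch of what adding the field would look like (the real TableIdentifier has more members; the UID value is illustrative):

import java.io.Serializable;

public class TableIdentifier implements Serializable {
  // Pinning the UID keeps the serialized form stable when the class is recompiled or evolves compatibly
  private static final long serialVersionUID = 1L;

  private final String namespace;
  private final String name;

  public TableIdentifier(String namespace, String name) {
    this.namespace = namespace;
    this.name = name;
  }
}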

@@ -26,6 +26,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Contributor:

Nit: This seems to be unused.

@yushuyao commented

@Fokko I see this has been open for a while. Wondering when you plan to merge it?

@kbendick (Contributor) left a comment

Leaving a small comment as I see there's been recent interest in this PR.

Given the age of this PR, it can't be merged without at least rebasing onto the master branch, and likely updating the Gradle build to match the new setup. So it probably can't be merged quickly in any case. At least, I personally wouldn't be able to return to a relatively old unfinished task without some added time to refamiliarize myself with it, and without finding time to work on it alongside other priorities, so I wouldn't expect otherwise of @Fokko (amazing as he is!).

Outside of the recent interest shown by @yushuyao, I know there are at least a few heavy Iceberg users who would greatly benefit from an Iceberg Beam sink. I'm not a huge Beam user myself, but I understand it and have used it enough to help where needed (or at least try to), and Beam's APIs have decent support for bringing in Hadoop and related dependencies (I'm not ready to comment much on this PR itself yet).

My experience is mostly with the Beam Flink runner, but my understanding is that in open source that's more or less the standard runner. Someone please correct me if I'm way off base.

If we're going to prioritize having a Beam sink, we should probably put it on the roadmap and give it a relative priority, like we are doing now with other larger tasks (to find dedicated reviewers and authors and ensure the work has the support it needs to be successful).

To answer the question of when this will be merged: given the relative age of this PR, I would estimate probably not in the next few days 🙂. But if there's interest (which I believe there is) and @Fokko wants to lead this charge, that would be great. Otherwise, I would be happy to help find people to work on it as it's prioritized on the roadmap (possibly including working on it myself, though there are likely people better suited). I happen to know there are some folks interested in an OSS Beam Iceberg connector, and I'm sure they would be happy to at least help review.

Likely this should be discussed at the next community sync up or on the dev list. I'll put it on the agenda for the next sync-up.

TLDR for Fokko: Please let us know whether you're interested in personally seeing this work through to the finish line, or whether you'd like to find a co-author, @Fokko. Of course, timing depends on your own commitments and on the relative priority the community decides this should have on the roadmap (if any, though I think there's definitely interest, and it recently saw some).

TLDR for @yushuyao: Given the age of this PR, it likely won't be merged very soon. Because this is a relatively major addition (a new framework to support), it very likely needs to be added to the roadmap and assigned a relative priority; we do this now to ensure that larger projects have people who can review them and so we know what's being worked on. However, you're not the only person interested in an Iceberg Beam connector, and it does seem that Fokko has done a large portion of the initial work, so stay tuned. The community will likely discuss this at the monthly community sync-up. If you don't already get the invite, you can find instructions for joining the Google group on the Apache Iceberg website under Community. It would be great if you could make it to the next one and voice your interest in this. I'll add it to the agenda, but it would be great to hear from the people who are most interested in it. We're very friendly and usually maybe 20-30 people show up, so it's low pressure 🙂. Hope to see you there!

package org.apache.iceberg.beam;

import java.util.Map;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
Contributor:

Nit: Now that we have our own shaded Guava, can we use that instead, or do you need Beam's shaded Guava for some reason?

Commenting as there has been some recent interest in this PR, and I happen to know there is a group of people that would greatly appreciate an open-source Apache Beam connector.

@Fokko closed this May 13, 2022
@Fokko (Contributor, Author) commented May 13, 2022

Bit out of date :)

@sachinag commented
To the extent we on the Beam team can help, please let me (us) know. We'd love to see this completed and merged.


Successfully merging this pull request may close these issues.

Support for Apache Beam I/O
9 participants