
Kafka Connect: Initial project setup and event data structures #8701

Merged
merged 9 commits into apache:main on Jan 10, 2024

Conversation

bryanck
Contributor

bryanck commented Oct 2, 2023

We (Tabular) would like to submit our Iceberg Kafka Connect sink connector to the Iceberg project. Kafka Connect is a popular, efficient, and easy-to-use framework for reading from and writing to Kafka. This sink gives Iceberg users another option for landing data from Kafka into Iceberg tables. Having the backing of the Iceberg community will help it evolve and improve over time.

The sink codebase is on the larger side, so the thought was to break the submission into different PRs to make it easier to review. This initial PR includes the starting build setup and the project for the Avro event data structures.

The original repo can be found at https://github.com/tabular-io/iceberg-kafka-connect. Some design docs can be found in the docs directory, including an explanation of what the events are used for and why Avro was chosen for serialization.

The events were put in a separate project so the library can be used independently to read messages from the control topic outside of the connector, for debugging or notification purposes.

@ajantha-bhat
Member

Very happy to see this contribution 👍
I have recently tested this project with the Nessie catalog and liked it.

I just skimmed the files. I will definitely go through the design docs and review again.

@@ -200,3 +202,6 @@ if (JavaVersion.current() == JavaVersion.VERSION_1_8) {
}
}

include ":iceberg-kafka-connect:kafka-connect-events"
Contributor

Spark/Flink/Hive require that we support multiple versions. Is the KC API stable enough that we don't have to worry about supporting different major/minor versions? I see the 3.x line goes back to 2021, but there are six minor releases. Just wondering if we should structure the project with versioning in mind from the start.

Contributor Author

For the most part, the API has been very stable, so I was thinking of not doing this to start; it might be overkill.

Contributor

Do we need so many modules? If we're just creating a single jar in the end, I wonder if it is helpful to break it up.

Contributor Author

The main reason I have events as a separate module is so that it can be used independently from the sink to read and deserialize events from the control topic. This can be used, for example, to trigger workflows from the control topic rather than having to poll the table metadata. With that, the plan was to have two modules (events, core) plus a runtime module.
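As a rough sketch of that standalone use (the control topic name and the Event.decode entry point below are assumptions for illustration, not the actual API in this PR):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ControlTopicWatcher {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "control-topic-watcher");
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("control-iceberg")); // assumed topic name
      while (true) {
        for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
          // Event event = Event.decode(record.value()); // hypothetical deserialization helper
          // ...react to events here, e.g. trigger a downstream workflow on commit complete
          System.out.printf("control record: %d bytes%n", record.value().length);
        }
      }
    }
  }
}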

private Long vtts;
private final Schema avroSchema;

private static final Schema AVRO_SCHEMA =
Contributor

We typically prefer to define Iceberg schemas with field IDs and convert them to Avro schemas. I think those schemas are easier to read and it ensures that we assign field IDs.

Member

+1 for using the Iceberg schema definition with field IDs and converting them to Avro.
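For context, a minimal sketch of that approach (field names and IDs here are illustrative, not the actual event schema):

import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.types.Types;

// Define the schema with explicit field IDs using Iceberg types,
// then convert it to an Avro schema for serialization.
private static final Schema AVRO_SCHEMA =
    AvroSchemaUtil.convert(
        new org.apache.iceberg.Schema(
            Types.NestedField.required(1, "commit_id", Types.UUIDType.get()),
            Types.NestedField.optional(2, "vtts", Types.TimestampType.withZone())),
        "commit_complete");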

Contributor Author

I do agree it is more readable. However, this will require changes to the Avro encoder. For example, both GenericDataFile and GenericDeleteFile use the same Iceberg schema. The current schema converter only allows a one-to-one mapping of struct type to class name, so you can't include both data files and delete files in the same event.

Also, it becomes cumbersome to redefine the same struct-to-class mapping for every schema, e.g. the TopicPartitionOffset mapping must be defined both for the payloads that use it and for the event container, when converting the schema to Avro.

Another downside is the extra overhead of doing the conversion. Schemas are sometimes constructed for each event, as the schema changes depending on the table and partitioning. Though that is more minor and could be solved with caching.

I did update the field IDs so they aren't all -1.

import org.apache.avro.specific.SpecificData.SchemaConstructable;
import org.apache.iceberg.avro.AvroSchemaUtil;

public interface Element extends IndexedRecord, SchemaConstructable {
Contributor

Rather than using SchemaConstructable, Iceberg just checks for a Schema constructor first.
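In other words, a sketch of the suggested pattern (the class and field here are illustrative, not code from this PR):

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

public class ExamplePayload implements IndexedRecord {
  private final Schema avroSchema;
  private Object value;

  // Iceberg's Avro readers check for a constructor that accepts the record Schema,
  // so implementing the SchemaConstructable marker interface isn't required.
  public ExamplePayload(Schema avroSchema) {
    this.avroSchema = avroSchema;
  }

  @Override
  public void put(int i, Object v) {
    this.value = v;
  }

  @Override
  public Object get(int i) {
    return value;
  }

  @Override
  public Schema getSchema() {
    return avroSchema;
  }
}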

Contributor Author

Thanks, I removed SchemaConstructable.

@dungdm93
Contributor

dungdm93 commented Oct 6, 2023

I'm developing the alluvial project, which is used to stream change logs from Kafka in Debezium format to Iceberg tables.
I can't wait until this PR gets merged, and I'd like to contribute to it too.

public class EventTestUtil {
public static DataFile createDataFile() {
Ctor<DataFile> ctor =
DynConstructors.builder(DataFile.class)
Contributor

Can't this use DataFiles to build a data file instead of using reflection?
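For reference, a sketch of what that could look like (path and counts are placeholder values):

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;

// Build a DataFile through the public builder instead of reflection.
DataFile dataFile =
    DataFiles.builder(PartitionSpec.unpartitioned())
        .withPath("/path/to/data-1.parquet")
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(100L)
        .withRecordCount(5L)
        .build();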

Contributor Author

this is done

return commitId;
}

public Long vtts() {
Member

Can you please add a comment explaining this field? I can tell it is some kind of timestamp, but the abbreviation isn't clear.

Member

I think we can also add the events.md doc with this PR now.
https://github.com/tabular-io/iceberg-kafka-connect/blob/main/docs/events.md

Member

We can add this info as Javadoc:

The VTTS (valid-through timestamp) property indicates the timestamp through which records have been fully processed, i.e. all records processed from then on will have a timestamp greater than the VTTS. This is calculated by taking the maximum timestamp of records processed from each topic partition and then taking the minimum of these. If any partitions were not processed as part of the commit, then the VTTS is not set.

https://github.com/tabular-io/iceberg-kafka-connect/blob/main/docs/design.md#snapshot-properties
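A sketch of that calculation as it might appear in code (names are illustrative; the input is assumed to be the maximum record timestamp observed per topic partition in the commit):

import java.util.Collection;

// Returns the valid-through timestamp, or null if any partition was not
// processed as part of the commit.
static Long computeVtts(Collection<Long> maxTimestampPerPartition, boolean allPartitionsProcessed) {
  if (!allPartitionsProcessed || maxTimestampPerPartition.isEmpty()) {
    return null;
  }
  // minimum across partitions of each partition's maximum record timestamp
  return maxTimestampPerPartition.stream().mapToLong(Long::longValue).min().getAsLong();
}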

Contributor Author

I added a Javadoc for this.

import org.apache.iceberg.types.Types.StructType;
import org.junit.jupiter.api.Test;

public class EventSerializationTest {
Member

nit: The majority of the test cases in Iceberg start with the Test prefix, so maybe we can rename it.

Contributor Author

It is pretty mixed actually, and I much prefer the specificity of putting the class name first.

ByteBuffer.wrap(new byte[] {0}));
}

private EventTestUtil() {}
Member

nit: Can we move the constructor up?

Contributor Author

done


/**
* A control event payload for events sent by a coordinator that indicates it has completed a commit
* cycle. Events with this payload are not consumed by the sink, they * are informational and can be
Contributor

Suggested change
* cycle. Events with this payload are not consumed by the sink, they * are informational and can be
* cycle. Events with this payload are not consumed by the sink, they are informational and can be

Contributor Author

bryanck Oct 27, 2023

Thanks for catching this, I fixed it.

public class CommitCompletePayload implements Payload {

private UUID commitId;
private Long vtts;
Contributor

The name is somewhat cryptic; would it make sense to rename this to validThroughTimestamp?

Contributor Author

Sure, I updated this to validThroughTs

@danielcweeks self-requested a review October 27, 2023 23:00
@danielcweeks
Contributor

I'm a +1 on moving forward with this. I think there might still be an open question about Iceberg/Avro Schema definitions, but I'm fine with either resolution.

@bryanck force-pushed the kafka-connect-pt1 branch 2 times, most recently from 1a3fde1 to 9fed273 on November 12, 2023 17:06
Member

ajantha-bhat left a comment

LGTM.

Can we take this forward?

super((id, struct) -> names.get(struct));
}

Map<Type, Schema> getConversionMap() {
Member

Suggested change
Map<Type, Schema> getConversionMap() {
Map<Type, Schema> conversionMap() {

@nastra added this to the Iceberg 1.5.0 milestone Jan 5, 2024
@Fokko
Contributor

Fokko commented Jan 10, 2024

Since there are no further comments, I'll go ahead and merge this. I would like to express my gratitude to @bryanck for working on this, since this will help so many people in the Kafka community get their data into Iceberg in a fast and reliable way! 🙏 Thanks @ajantha-bhat, @danielcweeks, @rdblue, @jbonofre and @nastra for the review 🚀

@Fokko merged commit d1a3c10 into apache:main Jan 10, 2024
41 checks passed
@jbonofre
Member

@Fokko awesome, thanks!

@bryanck
Contributor Author

bryanck commented Jan 10, 2024

Awesome! Thanks all for the feedback and guidance. I'll follow up with PRs for the actual sink portion.
