
Kafka Connect: Commit coordination #10351

Merged: 4 commits merged into apache:main on Jul 11, 2024

Conversation

@bryanck (Contributor) commented on May 18, 2024:

This PR is the next stage in submitting the Kafka Connect Iceberg sink connector, and is a follow-up to #8701, #9466, and #9641. It includes the commit coordinator and related tests.

The integration tests, distribution build, and docs are still not included for the sink; they will be added in follow-up PRs. For reference, the current sink implementation can be found at https://github.com/tabular-io/iceberg-kafka-connect, and existing docs are at https://github.com/tabular-io/iceberg-kafka-connect/tree/main/docs.

public void put(Collection<SinkRecord> sinkRecords) {
  if (committer != null) {
    committer.save(sinkRecords);
  }
Member:

Should this throw an exception when committer is null? Otherwise the producer will assume the records have been put.

@bryanck (Contributor Author), Jun 9, 2024:

This should never happen, so I changed this to a precondition check. I'll revisit this if needed when I add the integration tests.
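
For reference, a minimal sketch of what the precondition-check approach could look like (assuming the Guava-style Preconditions class already used elsewhere in the sink; the exact message is illustrative):

import java.util.Collection;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.kafka.connect.sink.SinkRecord;

@Override
public void put(Collection<SinkRecord> sinkRecords) {
  // fail fast instead of silently dropping records; the Kafka Connect runtime
  // will retry the task after the thrown exception
  Preconditions.checkState(committer != null, "Committer has not been initialized");
  committer.save(sinkRecords);
}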

Contributor:

I can tell you that it does happen and there's nothing we can do about it other than throw an error and have the task retry. Precondition check makes sense 👍

.filter(distinctByKey(dataFile -> dataFile.path().toString()))
.collect(Collectors.toList());

List<DeleteFile> deleteFiles =
Member:

Since we are only supporting append, do we need this code?

Member:

Similar comment for the RowDelta block below.

@bryanck (Contributor Author):

I wanted to keep the coordinator capable of handling delete files, so when we do add in delta support, it should be fairly straightforward, and won't require changes to the control message data model.

Member:

ACK
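
As an aside, the distinctByKey helper used in the excerpt above is typically implemented as a stateful predicate; a common sketch (the PR's actual implementation may differ):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
  // remembers keys seen so far; filter() keeps only the first element per key
  Set<Object> seen = ConcurrentHashMap.newKeySet();
  return element -> seen.add(keyExtractor.apply(element));
}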

}

@Test
public void testCommitDelta() {
Member:

Maybe the delete file tests can be added when the delete writers feature is added.

@bryanck (Contributor Author):

See my comment above, I'd prefer to add this into the coordinator now, rather than later.

Member:

Ack

@ajantha-bhat added this to the Iceberg 1.6.0 milestone on May 21, 2024
Comment on lines 87 to 94
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
  committer.save(null);
}
Contributor:

Why are we overriding the flush method when we don't have any flush-specific code path in Committer?

The put method will be called on a regular basis (potentially with an empty collection of sink records) so this feels redundant.

Also, I'm fairly certain that this flush method will never actually be called since we are overriding the preCommit method. The default preCommit implementation is the only place where flush is called by the Kafka Connect runtime. Unless you call flush yourself in the preCommit method you've defined (or anywhere else), flush will never actually be called.

Overall, I would recommend you just omit this flush method definition from this class.

@bryanck (Contributor Author):

This is here in case future committer implementations want to perform an action on flush. The current committer doesn't do anything when the record collection is null.

Contributor:

This is here in case future committer implementations want to perform an action on flush.

What I'm trying to say is that flush will never be called since you're overriding the default preCommit implementation: https://github.com/apache/kafka/blob/c97d4ce026fdb75f80760bcce00b239db951a481/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L139

So no committer implementation will ever be able to "perform an action on flush" unless you call flush yourself somewhere.

@bryanck (Contributor Author):

I felt it is safer to implement this, given it is a public method, rather than rely on default behavior.
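
For context on this exchange, the default Kafka Connect behavior being discussed is roughly the following (paraphrased from org.apache.kafka.connect.sink.SinkTask); since preCommit is the only caller of flush, overriding preCommit without calling flush means flush is never invoked by the runtime:

import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// default implementation in SinkTask (paraphrased)
public Map<TopicPartition, OffsetAndMetadata> preCommit(
    Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
  flush(currentOffsets);
  return currentOffsets;
}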

Comment on lines +57 to +82
@Override
public void close(Collection<TopicPartition> partitions) {
  close();
}

private void close() {
  if (committer != null) {
    committer.stop();
    committer = null;
  }

  if (catalog != null) {
    if (catalog instanceof AutoCloseable) {
      try {
        ((AutoCloseable) catalog).close();
      } catch (Exception e) {
        LOG.warn("An error occurred closing catalog instance, ignoring...", e);
      }
    }
    catalog = null;
  }
}
Contributor:

nit: can you move these close methods so they're after preCommit but before stop? Just so these methods are arranged in life-cycle order.

@bryanck (Contributor Author):

I'm not sure I follow your suggestion. When the KC close() or stop() lifecycle methods are called, we close the committer and the catalog.

Member:

I think he means to just rearrange the code: keep the close methods just before stop.

@bryanck (Contributor Author):

I have this order to prevent a running committer from having a reference to a closed catalog.


@Override
public void open(Collection<TopicPartition> partitions) {
catalog = CatalogUtils.loadCatalog(config);
Contributor:

The open method is called with only the newly assigned partitions. Is there a strong reason to pass just the newly assigned partitions to the Committer.start method when the Committer can retrieve all partitions assigned to this task via context.assignment anyway?

I'm also worried we might have a bug here. The Committer implementation uses this partitions argument to check if partition 0 of the first topic is assigned to this task and if so, it spawns a Coordinator process. I'm worried that if there was a rebalance where the partition 0 of the first topic doesn't move between tasks, then it would not be included in the partitions argument for any Task and thus we could potentially end up with a Connector that doesn't have any Coordinator process running on any Task. Thoughts?

@bryanck (Contributor Author):

That's not my understanding, open() will be called with the new assignment, i.e. all assigned topic partitions. See the javadoc: "The list of partitions that are now assigned to the task (may include partitions previously assigned to the task)"

Contributor:

I did read the javadoc and interpreted it differently but I can totally see how you reached your interpretation as well.

The reason I'm inclined towards my interpretation is because I know SinkTask.open is ultimately called by the ConsumerRebalanceListener.onPartitionsAssigned method (here in the Kafka source code) and if you look at the javadocs for that method, it says The list of partitions that are now assigned to the consumer (previously owned partitions will NOT be included, i.e. this list will only include newly added partitions).

I could still be wrong so feel free to test things out or ask in the kafka community.

@bryanck (Contributor Author):

Thanks, it does seem like you're right. I changed this to get the consumer assignment instead.
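
A rough sketch of the direction described, using SinkTaskContext.assignment() to obtain the full current assignment (the committer start call is illustrative, not the PR's exact API):

import java.util.Collection;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

@Override
public void open(Collection<TopicPartition> partitions) {
  catalog = CatalogUtils.loadCatalog(config);
  // open() may only receive newly assigned partitions, so ask the task
  // context for the complete current assignment instead
  Set<TopicPartition> fullAssignment = context.assignment();
  // start the committer with the full assignment so coordinator election
  // sees every partition owned by this task (method shape is illustrative)
  committer.start(fullAssignment);
}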


private void routeRecordDynamically(SinkRecord record) {
String routeField = config.tablesRouteField();
Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");
Contributor:

This should really be checked at config parsing time or at SinkWriter construction time, instead of on every record. To be clear, I'm not worried about this from a performance perspective (I'm confident the JVM will optimize it away), but it just seems awkward.

@bryanck (Contributor Author):

We already check this in the config, so I'll just remove this.
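
A sketch of what such a config-time check might look like, assuming a validation hook inside IcebergSinkConfig (names and placement are illustrative):

import org.apache.kafka.common.config.ConfigException;

// run once when the config is parsed, so the per-record null check can go away
private void validate() {
  if (dynamicTablesEnabled() && tablesRouteField() == null) {
    throw new ConfigException(
        "A route field must be configured when dynamic table routing is enabled");
  }
}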

new TopicPartition(record.topic(), record.kafkaPartition()),
new Offset(record.kafkaOffset() + 1, timestamp));

if (config.dynamicTablesEnabled()) {
Contributor:

Same here; we really don't need to check this on every record.

It feels like we're missing an abstraction here, something like the concept of a Router with a StaticRouter implementation and a DynamicRouter implementation, where only one of them is constructed for the lifetime of a SinkWriter based on the config.dynamicTablesEnabled() setting.

Member:

Agree. But IMO we should avoid too much internal refactoring, which makes extracting code from the Tabular implementation harder.

We can always refactor once the complete code is in.

@bryanck (Contributor Author):

I think this is a good suggestion, and I looked into adding an abstraction here. Ultimately I felt it added complexity, so I agree we should leave it as-is for now.
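
For reference, a bare-bones sketch of the Router abstraction being discussed (this is the reviewer's suggestion, not what the PR implements; names are illustrative):

import org.apache.kafka.connect.sink.SinkRecord;

// chosen once at SinkWriter construction time based on config.dynamicTablesEnabled(),
// instead of branching on every record
interface RecordRouter {
  void route(SinkRecord record);
}

class DynamicRecordRouter implements RecordRouter {
  @Override
  public void route(SinkRecord record) {
    // resolve the target table from the record's route field, then write
  }
}

class StaticRecordRouter implements RecordRouter {
  @Override
  public void route(SinkRecord record) {
    // match the configured tables' route regexes against the record, then write
  }
}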

Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");

String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
Contributor:

IMO we should throw an error instead of dropping the record.
Users can filter out messages easily using an SMT to get the same behaviour, if necessary.
In the future, I imagine we can allow users to supply custom exception-handler implementations which could also allow users to drop records on error.

@bryanck (Contributor Author):

I feel skipping is better than getting into a state where the sink can no longer progress. When we add DLQ support then we could route to that.

Member:

+1, we can add a TODO about DLQ here or an issue for tracking it.

Comment on lines +120 to +124
String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
  String tableName = routeValue.toLowerCase();
  writerForTable(tableName, record, true).write(record);
}
@fqaiser94 (Contributor), May 30, 2024:

Can we support writing to multiple tables if this is a comma-separated list of table names, like we do in "static" mode?

Unless there are other concerns, I think this is an important building block for more advanced functionality e.g. we can remove route-value-regex-table-routing-mode as it could be implemented entirely as an SMT + dynamic mode.

@bryanck (Contributor Author):

I think that could be a useful feature, but I don't think it should hold up this PR. If you wanted to open an issue that would be great.
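
A sketch of the suggested extension (not part of this PR), building on the dynamic-routing code shown above:

String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
  // split on commas so a single record can fan out to several tables
  for (String tableName : routeValue.split(",")) {
    writerForTable(tableName.trim().toLowerCase(), record, true).write(record);
  }
}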

Comment on lines +101 to +112
String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
  config
      .tables()
      .forEach(
          tableName -> {
            Pattern regex = config.tableConfig(tableName).routeRegex();
            if (regex != null && regex.matcher(routeValue).matches()) {
              writerForTable(tableName, record, false).write(record);
            }
          });
}
Contributor:

nit: is this really "static" routing? I guess it's a static list of tables, but the table/s each message is written to is determined dynamically based on the route value ...

More importantly, is this an important enough use-case to support within the connector? I would strongly prefer if we didn't support this within the connector itself (users can easily implement this by writing an SMT + dynamic mode).

@bryanck (Contributor Author):

"Static" in the sense that the list of tables is fixed and doesn't change, rather than being derived from the record. This feature is in use by some users.

@bryanck force-pushed the kc-coord branch 3 times, most recently from 9242cd1 to 731fdb8 on June 11, 2024 14:55
@ajantha-bhat (Member) left a comment:

LGTM.

The required basic functionality of the commit coordinator looks good to me.

@bryanck (Contributor Author) commented on Jun 16, 2024:

@ajantha-bhat @fqaiser94 Thanks for the reviews! I think I have addressed comments in case you want to take another look. @Fokko @nastra If either of you have time to take a look, that would be very appreciated.

@jbonofre (Member) left a comment:

LGTM, thanks!

@ajantha-bhat (Member):

@rdblue, @danielcweeks, @Fokko, @nastra: Please take a look. It is needed for the 1.6.0 milestone.

@fqaiser94 (Contributor) left a comment:

Sorry for the delay; this has been on my radar for a couple of weeks, but I don't have a lot of time for reviews currently.

Comment on lines 222 to 233
AppendFiles appendOp = table.newAppend();
if (branch != null) {
  appendOp.toBranch(branch);
}
appendOp.set(snapshotOffsetsProp, offsetsJson);
appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
if (vtts != null) {
  appendOp.set(VTTS_SNAPSHOT_PROP, vtts.toString());
}
dataFiles.forEach(appendOp::appendFile);
appendOp.commit();
Contributor:

I think you forgot to port over the code for handling multiple partition specs:
tabular-io/iceberg-kafka-connect@cdd54f3

Otherwise, connectors can become hard-stuck when users evolve the partition spec whilst the connector is running, which I don't think is an entirely unreasonable expectation.

Contributor:

Alternatively, we could try getting this PR into iceberg-core: #9860
That would remove the need to do anything special here.

@bryanck (Contributor Author):

I feel this should be addressed in Iceberg core.

Comment on lines +194 to +199
.filter(
    envelope -> {
      Long minOffset = committedOffsets.get(envelope.partition());
      return minOffset == null || envelope.offset() >= minOffset;
    })
Contributor:

Just a note for posterity's sake (not requesting any changes):
While this is a good check, this does not guarantee truly exactly-once.
There is an unlikely edge-case where if we have two coordinator zombies running, even this check will be insufficient to prevent duplicates.
Unless we add support for conditional commits to iceberg, there's not much that can be done about it unfortunately (well, nothing simple anyway).

consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
Contributor:

Suggested change (remove this line):
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());

I would avoid randomly generating client ID config since they're used for rate limiting in Kafka.
I would just remove this line entirely (if users wish to set a ClientID they can do so via the connector kafka configs).

@bryanck (Contributor Author):

I changed this to putIfAbsent so it can be overridden.
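
The change described amounts to something like this (sketch), so that a user-supplied client.id from the connector's Kafka configs takes precedence:

// only generate a random client ID if the user didn't configure one
consumerProps.putIfAbsent(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());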

Producer<String, byte[]> createProducer(String transactionalId) {
Map<String, Object> producerProps = Maps.newHashMap(kafkaProps);
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
Contributor:

Suggested change (remove this line):
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());

Same here

@bryanck (Contributor Author):

Changed to putIfAbsent here also.

// pass transient consumer group ID to which we never commit offsets
super(
    "worker",
    IcebergSinkConfig.DEFAULT_CONTROL_GROUP_PREFIX + UUID.randomUUID(),
Contributor:

Can we put the connect consumer group id in here somewhere?
Otherwise, it's impossible for users to map a random UUID back to a specific connector if necessary.
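
One way to make the transient group traceable (illustrative only; the thread does not show the final form adopted):

// embed the connector's consumer group id so operators can map the transient
// control group back to a specific connector
String transientGroupId =
    IcebergSinkConfig.DEFAULT_CONTROL_GROUP_PREFIX
        + config.connectGroupId()
        + "-"
        + UUID.randomUUID();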

events.add(readyEvent);

send(events, results.sourceOffsets());
context.requestCommit();
Contributor:

Suggested change (remove this line):
context.requestCommit();

Why are we doing this at all?
The previous line already commits the offsets for the connector consumer group.
Plus, Kafka Connect runtime won't commit anything anyway since IcebergSinkTask.preCommit returns an empty map.

@bryanck (Contributor Author):

I removed this, it is no longer necessary.

this.snapshotOffsetsProp =
    String.format(
        "kafka.connect.offsets.%s.%s", config.controlTopic(), config.connectGroupId());
this.exec = ThreadPools.newWorkerPool("iceberg-committer", config.commitThreads());
Contributor:

We never shut this thread pool down. Is that an issue?

@bryanck (Contributor Author), Jul 6, 2024:

Thanks, I added the explicit shutdown call to ensure tasks are terminated before starting a new coordinator.
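
The explicit shutdown described is presumably along these lines (sketch; the timeout value is an assumption):

import java.util.concurrent.TimeUnit;

// stop the committer's worker pool so in-flight commit tasks cannot outlive
// this coordinator instance
exec.shutdown();
try {
  if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
    exec.shutdownNow();
  }
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
  exec.shutdownNow();
}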

}

if (System.currentTimeMillis() - startTime > config.commitTimeoutMs()) {
LOG.info("Commit timeout reached");
Contributor:

Please interpolate the commit id in here as well.

@bryanck (Contributor Author):

Added
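
The interpolated log line presumably looks something like (sketch):

// include the commit id so a timed-out commit can be correlated across workers
LOG.info("Commit timeout reached for commit {}", commitState.currentCommitId());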

recordList.forEach(producer::send);
if (!sourceOffsets.isEmpty()) {
producer.sendOffsetsToTransaction(
offsetsToCommit, KafkaUtils.consumerGroupMetadata(context, connectGroupId));
Contributor:

Suggested change:
- offsetsToCommit, KafkaUtils.consumerGroupMetadata(context, connectGroupId));
+ offsetsToCommit, this.consumerGroupMetadata);

We only need to extract consumer-group-metadata once, we should just do that in the constructor and make it a class field.

The consumer-group-metadata contains the consumer-generation which is used by Kafka brokers for zombie fencing. I'm probably over-thinking this massively but I'm worried that the internal KafkaConsumer might rebalance, get the new generation, and then a zombie task could end up successfully committing a transaction resulting in duplicates.

Regardless, I would move this to a class field as that is definitely safe.

@bryanck (Contributor Author):

We need to update the metadata to get the latest generation ID.

DynFields.builder().hiddenImpl(CONTEXT_CLASS_NAME, "consumer").build(context).get())
.groupMetadata();
}
return new ConsumerGroupMetadata(connectGroupId);
@fqaiser94 (Contributor), Jul 3, 2024:

Are we sure we want to fall back to the non-zombie-fencing consumer-group-metadata?
IMO we should just hard fail; this shouldn't happen, and if it does, we're risking duplicates.
If not, we should at least log a warning if we hit this case.

@bryanck (Contributor Author):

I removed the fallback, so it hard fails now.
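
A sketch of the hard-fail behavior described (the exact message is illustrative):

// without the real consumer's group metadata (and its generation id),
// transactional zombie fencing cannot work, so fail the task instead of
// falling back to a plain ConsumerGroupMetadata
throw new IllegalStateException(
    "Unable to obtain consumer group metadata from the Kafka Connect context");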

@bryanck force-pushed the kc-coord branch 3 times, most recently from a025e04 to f3ba4c0 on July 8, 2024 15:34
@Fokko requested a review from fqaiser94 on July 10, 2024 10:41
@danielcweeks (Contributor) left a comment:

Some minor nits and questions, but overall LGTM.

I think there are still a couple of open questions from others (particularly the one on IcebergSinkTask.open()), which we probably want to address.

I did see that we renamed a few classes and reduced the visibility in others. I'm ok with that for now since this isn't really deployable, but we should probably avoid that going forward.

@bryanck (Contributor Author) commented on Jul 11, 2024:

I think I've addressed the latest feedback, @fqaiser94 @danielcweeks. LMK if there is anything else.

@danielcweeks (Contributor):

Thanks @bryanck and @fqaiser94. It's really great to get this one in.

@danielcweeks merged commit d8e7642 into apache:main on Jul 11, 2024
49 checks passed
jasonf20 pushed a commit to jasonf20/iceberg that referenced this pull request Aug 4, 2024
* Kafka Connect: Commit coordination

* PR feedback

* Get partition assignment from consumer

* test fix