Add custom offset committing interface for consumer/runner #122
Hi @bwdeng, I think you can solve your problem with the current feature set. If you switch to:

```js
{
  eachBatchAutoResolve: false,
  autoCommitThreshold: 1,
  eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
    let lastOffset
    for (let message of batch.messages) {
      if (aggregate(message)) {
        // still collecting events of the same kind
        lastOffset = message.offset
        await heartbeat()
      } else {
        // the aggregation is complete: resolve up to the last
        // aggregated message and commit
        await resolveOffset(lastOffset)
        await commitOffsetsIfNecessary()
      }
    }
  }
}
```

This is pseudo code and I'm assuming a lot, but let's go through it. You get the whole batch and can detect if you have the full or the partial set. You can force the commit with `commitOffsetsIfNecessary`.
@tulios Thanks for the reply. I like the solution you proposed, but I think it doesn't work well if the events of the same kind are fetched across multiple batches. For example:

```
Batch 1: ..., W2, W3, X0, X1, X2
Batch 2: X0, X1, X2, X3, Y0, ...
```

The last offset resolved in Batch 1 is W3. When we fetch the next batch, the new batch actually starts from X0, not X3, so we receive X0, X1, and X2 twice here. Those duplicate events can be filtered out with a little bit of performance cost, but the problem still exists if there's a full batch of "intermediate" events, for instance:

```
Batch 1: ..., W2, W3, X0, X1, X2
Batch 2: X0, X1, X2, X3, X4, ... (nothing but X events)
```

In such a case, we don't resolve any offset in Batch 2, so the consumer will keep fetching the same batch over and over again, never making any actual progress. I think the fundamental problem is that there's a mismatch between resolved offsets and "aggregation" offsets: the resolved offset is a kind of reading progress, while the "aggregation" offset is more like a processing progress. For some tasks, we need to read ahead in order to process. Having an interface for custom offset committing would enable us to decouple the updates of these two kinds of progress.
I see, but are you planning to change the offset outside of the consumer loop? You can call `commitOffsetsIfNecessary` at any point inside the `eachBatch` callback.
Exactly, I was hoping that there's a way to commit the offset of a message outside of its original batch callback. At first, I tried updating the offset through the admin interface, but got an error because the admin client can only set offsets while the consumer group is empty. Currently, I have a flaky patch which exposes the underlying `ConsumerGroup` used by the `Runner`, and uses it to update offsets:

```ts
public async commit(topic: string, partition: number, offset: string): Promise<boolean> {
  const { groupId, generationId, memberId, coordinator } = this.consumerGroup; // the underlying consumer group
  if (coordinator && coordinator.isConnected()) {
    // send an OffsetCommit request directly to the group coordinator
    await coordinator.offsetCommit({
      groupId,
      memberId,
      groupGenerationId: generationId,
      topics: [
        {
          topic,
          partitions: [{ partition, offset }],
        },
      ],
    });
    return true;
  }
  return false;
}
```

But I don't think this is the proper way to do it. Also, I had to patch the consumer internals to expose the `ConsumerGroup` in the first place.

So do you suggest that we should use something like:

```js
async function commit(offset) {
  const currentOffset = getResolvedOffset() // back up the currently resolved offset
  await resolveOffset(offset)               // resolve the target offset for committing only
  await commitOffsetsIfNecessary()          // commit it
  await resolveOffset(currentOffset)        // restore the original resolved offset
}
```

Thanks!
You have an interesting use case. The admin client can change offsets outside of the consumer group, but the group must be empty, which doesn't help you much. Can you try the approach we've discussed first? I think you can go a long way with `resolveOffset` and `commitOffsetsIfNecessary`.
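For reference, the admin route looks roughly like this (a sketch based on the kafkajs admin API; it only works while no consumers in the group are connected, and the group/topic names here are made up):

```js
const admin = kafka.admin()
await admin.connect()

// overwrite the committed offset for one partition of the group;
// this fails if the consumer group still has connected members
await admin.setOffsets({
  groupId: "test-group",
  topic: "test-topic",
  partitions: [{ partition: 0, offset: "42" }],
})

await admin.disconnect()
```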
@tulios I implemented a custom offset commit function based on the approach we discussed:

```ts
import Long from "long";
import { Kafka, PartitionAssigners, Batch } from "kafkajs";

// resolved offsets and commit functions, keyed by topic and partition
const resolvedOffsets: { [topic: string]: string[] } = {};
const commitFns: { [topic: string]: Array<(offset: string) => Promise<void>> } = {};

async function commit(topic: string, partition: number, offset: string): Promise<boolean> {
  const commitFn = commitFns[topic] && commitFns[topic][partition];
  if (commitFn) {
    await commitFn(offset);
    return true;
  }
  return false;
}

function updateCommitFn(
  batch: Batch,
  resolveOffset: (offset: string) => void,
  commitOffsetsIfNecessary: () => Promise<void>,
) {
  const { topic, partition } = batch;
  resolvedOffsets[topic] = resolvedOffsets[topic] || [];
  commitFns[topic] = commitFns[topic] || [];
  // For an empty batch, firstOffset() will return null, and lastOffset()
  // will return HIGH_WATERMARK - 1
  const currentOffset = batch.firstOffset() || batch.lastOffset();
  resolvedOffsets[topic][partition] = currentOffset;
  commitFns[topic][partition] = async (offset: string) => {
    // resolve the target offset and commit it; the committed offset is
    // the resolved offset + 1, hence the sub(1)
    resolveOffset(
      Long.fromValue(offset)
        .sub(1)
        .toString(),
    );
    await commitOffsetsIfNecessary();
    // restore the previously resolved offset
    resolveOffset(resolvedOffsets[topic][partition]);
  };
}

async function main() {
  const kafka = new Kafka({ brokers: ["kafka:9092"], clientId: "test-client" });
  const consumer = kafka.consumer({ groupId: "test-group", partitionAssigners: [PartitionAssigners.roundRobin] });
  await consumer.connect();
  await consumer.subscribe({ topic: "test-topic", fromBeginning: true });
  await consumer.run({
    autoCommitThreshold: 1,
    eachBatchAutoResolve: false,
    eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
      const { topic, partition, messages } = batch;
      // update the commit function for this topic/partition
      updateCommitFn(batch, resolveOffset, commitOffsetsIfNecessary);
      // consume messages; process() is the application's message handler
      // (defined elsewhere) and could call commit() with a custom offset
      for (const { offset, key, value, timestamp } of messages) {
        await process({ topic, partition, offset, key, value, timestamp });
        await heartbeat();
      }
      // resolve the last offset manually
      const offset = batch.lastOffset();
      resolveOffset(offset);
      resolvedOffsets[topic][partition] = offset;
    },
  });
}
```

Also, I need to patch `ConsumerGroup` to disable kafkajs's own offset committing:

```js
// Patch kafkajs to disable auto offset committing
const consumerGroup = require("kafkajs/src/consumer/consumerGroup");
consumerGroup.prototype.commitOffsets = async () => undefined;
```

It works, sort of. Frankly, I still think it would be nice to have an interface which can commit custom offsets directly. What do you think?
Yeah, that's a lot of work. Bear with me while I fully understand the problem 😄 Another note, I'm going on vacation for a week and a half, so I'll be a bit silent here. @Nevon can take it from here. I'm still not sure about a generic commit interface; have you considered keeping your aggregation progress outside of Kafka?
@tulios Thanks again for replying. Yes, we thought about saving offsets outside Kafka. For us, reducing dependencies is the main reason for preferring Kafka's offset management: we don't need to maintain another service or storage medium for the offsets. Besides, it would probably take extra effort to handle the rebalancing cases. We can pick up the discussion later. Have a nice vacation!
I have a different use case for which it would be useful to be able to manually commit offsets. I have a few services that consume a mixture of compacted topics and regular topics. I am using the compacted topics to recover the state of the service, meaning those topics should always be read from the beginning and I do not want to commit any offsets for them at all, whereas with the other topics I want to use normal offset commits for checkpointing. An example to illustrate the use case:

- customers (compacted): holds the service state, re-read from the beginning on every start
- impressions (regular): processed from the last committed offset

So I would want to commit offsets for 'impressions', but not for 'customers'.
Hi @bwdeng and @sklose, I just came back from vacation. I will make some suggestions soon. @sklose I think you can achieve what you want in a different way: 1) you could use a different group id whenever you need to read the topic from the beginning, something like a random group id per service instance, as sketched below. I don't see the use case for manual commits on the consumer here.
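A sketch of that first option (assuming Node's built-in crypto.randomUUID; the topic and group names are made up):

```js
const { randomUUID } = require("crypto")

// a fresh, throwaway group id per instance: no offsets from previous
// runs exist, so the compacted topic is always read from the beginning
const stateConsumer = kafka.consumer({ groupId: `customers-state-${randomUUID()}` })
await stateConsumer.connect()
await stateConsumer.subscribe({ topic: "customers", fromBeginning: true })
```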
Unfortunately all three options have some downsides I'd like to avoid. I feel like not committing the offsets in the first place would be the cleanest solution and would avoid any unwanted side effects.
You are right. In Python, the client lets me commit offsets myself, which makes this very easy to implement.
@Pingze-github the pull API usually adds a lot of overhead to userland code; if you have all primitives as individual methods you will have to cater for rebalances, etc. The code I usually see around follows the same pattern: it processes messages as fast as it can, committing successful offsets on a certain threshold; on errors, the offset is not committed and the whole process is retried. KafkaJS implements this pattern with the `eachMessage` handler and auto-committing. If you need more flexibility you can always use `eachBatch`. I find it extremely valuable to understand the use cases and I'm always open to change or add new APIs, so if you have the time I would love to know why this API doesn't work for you and how we can improve it. Thanks.
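The threshold pattern described above looks something like this (a minimal sketch using the documented run options; `handleMessage` is a made-up application handler):

```js
await consumer.run({
  // commit resolved offsets after every 100 messages...
  autoCommitThreshold: 100,
  // ...or at least every 5 seconds, whichever comes first
  autoCommitInterval: 5000,
  eachMessage: async ({ topic, partition, message }) => {
    // throwing here keeps the offset uncommitted, so the
    // message will be processed again on retry
    await handleMessage(topic, partition, message)
  },
})
```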
I want to create a function that returns a certain number of messages. It should also update the offset, to make sure that next time I can start where I left off.
Fixed by #436 |
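For anyone finding this later: a sketch of manual committing with the API that eventually landed, based on the current KafkaJS docs (whether this matches the exact shape in #436 is an assumption, and `handle` stands in for the application's processing):

```js
await consumer.run({
  // take full control over what gets committed
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    await handle(message)
    // commit an arbitrary offset (the next offset to read)
    await consumer.commitOffsets([
      { topic, partition, offset: (Number(message.offset) + 1).toString() },
    ])
  },
})
```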
Feature
Currently, the consumer only supports committing the resolved offset. It would be nice to support custom offset committing.
Use case
We are trying to do aggregation based on kafkajs, but the size of the working set is not fixed. For example, an input stream would look like this:

```
X0, X1, X2, X3, Y0, Y1, Z0, Z1, Z2, ...
```

We try to aggregate events of the same kind (agg([X0, X1, X2, X3]), agg([Y0, Y1]), agg([Z0, Z1, Z2, ...])), and only commit the offset once the aggregated result has been saved. The problem is that until we see a Y event, we don't know whether all the X events have arrived. However, by the time we see Y0, the resolved offset has already passed X3.
Having the ability to commit custom offsets would allow us to properly track the working progress. Thanks!