Skip to content

Added Consumer.store_offsets() API #245

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 15, 2017

Conversation

ctrochalakis
Copy link
Contributor

Hello all,

This PR implements the Cosumer.store_offsets() API. The signature is similar to the .commit() method.

Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great stuff!
miniscule changes needed

c_offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(
c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
m->partition)->offset =m->offset + 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space after =

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


err = rd_kafka_offsets_store(self->rk, c_offsets);

if (c_offsets)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is always true

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, fixed!

Guard rd_kafka_offsets_store() inside version check (implicit-function-declaration)
@@ -341,7 +341,7 @@ static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,
"buildtime 0x%x)",
rd_kafka_version(), RD_KAFKA_VERSION);
return NULL;
#endif
#else
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

@@ -341,7 +341,7 @@ static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,
"buildtime 0x%x)",
rd_kafka_version(), RD_KAFKA_VERSION);
return NULL;
#endif
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this #else guard the variables defined above will be unused and produce warnings.
I suggest moving the variable decls down to the #else block.

Avoid declaring unused variables & relavant warnings
@edenhill
Copy link
Contributor

The thread test failure is not your fault, ignore it.
Waiting for clogged up travis osx workers for final verification.

@edenhill edenhill merged commit d788937 into confluentinc:master Sep 15, 2017
@edenhill
Copy link
Contributor

Thanks for this!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants