-
Notifications
You must be signed in to change notification settings - Fork 917
Generic Serdes API with Avro #787
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
examples/avro-cli.py
Outdated
if args.mode == "produce": | ||
produce(args.topic, conf) | ||
produce(args.topic, conf, avro_serializer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fwiw, when I read this my first thought was: 'why pass avro_serializer into the produce method?'.. so maybe it's better to call this produce_msgs
or something, but v. minor point, if there is a point here at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's fair I should qualify all kwargs as well
examples/avro-cli.py
Outdated
callback=lambda err, msg, obj=record: on_delivery(err, msg, obj)) | ||
producer.produce(topic=topic, key=user_id, | ||
value=record, | ||
on_delivery=lambda err, msg, obj=record: on_delivery(err, msg, obj)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sold on this approach now, but I think it would be really good if we could change the names: Message.key()
-> Message.key_bytes()
and Message.value()
-> Message.value_bytes()
, making the the API more immediately clear.
It seems likely the release with this PR in it will be called 2.0, so maybe that makes this more palatable. Would want to depreciate key()
and value()
rather than removing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is nothing actually preventing you from constructing a dr cb that deserializes the message contents. I'll update the example to just do that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My point is about making the API provided out of the box give the user what they most intuitively would expect. Not arguing you could override that with something that is better - point is you shouldn't need to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps, though this feels a bit heavy handed to me. This change does not have any effect on the overall behavior or result and yet it would break every existing message callback handler in use today. I think the costs outweigh the benefits on this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could consider making key()
, value()
throw an exception (probably 'not implemented yet') if the function was called with a non byte[]
like arg (but return bytes if they were, so there is no breaking change). also add key_bytes
and value_bytes
which always return the bytes. in the future (i.e. probably plan on never, but leave the option open) you could add a config which would allow the original value to be passed through if enabled.
i think you should do this, but it's (nearly?) the last i'll say of it...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What we could do is decorate any user supplied delivery callbacks on initialization and deserialize the contents prior to passing the message handle to the user's callback handler. We will want to be careful to catch any exceptions that may arise and set the msg error accordingly though. I'll wait for @edenhill to chime in before going that route though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went ahead and put a preliminary implementation in there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I strongly oppose having the delivery report perform deserialization: just pass the original object to the delivery report callback instead.
It seems like an anti-pattern to me to mix the serialized form/output with the internal program logic.
As for key() and value() changing meaning: yes, that is a problem, and it would be possible to change the semantics of key() and value() in the delivery report callback when using the SerializingProducer(), but then we're also always holding on to the passed object which has GC and lifetime issues which might not always be desired.
So I'd say we keep it simple for now and simply document that key() and value() are the serialized form iin the delivery report callback, and to use bound variables to pass the object as an opt-in if the original object is needed in the dr cb.
Avro record serialization/deserialization. | ||
|
||
""" | ||
def to_dict(self, record_obj): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
having this as a default implementation seems like it would do more harm than good to me. same with from_dict.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed
examples/avro-cli.py
Outdated
conf['schema.registry.basic.auth.user.info'] = args.userinfo | ||
registry_client.user_auth(args.userinfo.split(':')) | ||
|
||
avro_serializer = AvroSerializer(registry_client, record_schema, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the approach of passing the schema and converter to the constructor seems defensible, but...
My feeling is still to err on the side of simplicity. Primarily, I'm struggling to see the value of the Converter idea vs just requiring you need to pass in a "dict convertible like object" (i.e. one with a to_dict()
method) into the produce method otherwise it blows up. The latter seems easier to grok (one less abstraction), and pythonic. I'd would be convinced to change my opinion on this if there's precedent from other well used python libraries (i don't know, don't use them).
An argument against this is if people typically don't want to/can't have a to_dict
method on their types they want to produce. I feel most of the time this won't be a problem and if it is, it's a simple matter to provide the behavior in a sub class. Also, if it turns out people do do the subclass thing all the time, the Converter
concept can be added later. On the flip side of this, if we add it now, we can't take it away. Err on the side of smallest API is a good strategy.
Second, I was wondering if even the record_schema could be specified via a .schema()
method on the object passed into the produce method. That I think is not a good idea though for reasons of performance, and also what most closely matches the behavior in other languages is to specify it in the constructor (in other languages it's a static property of the type... but close).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes more sense to modify the behavior of the serializer, through the use of the converter, than it does to modify the behavior of the object being serialized. The object may be serialized as Avro today, Protobuf tomorrow and JSON the next. It could become quite messy if I had to add methods to satisfy the needs of each serializer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i remain unconvinced and we can argue further.... widespread precedent of use of abstract base classes in python would help the case against what I think here. but it seems to go against what I thought the language was about.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The JSON library takes a very similar approach with its custom encoder and decoder functions which allow you to customize how your object will go from its initial form to the json representation that comes out. We actually take advantage of this in the SchemaRegistryClient code.
Json:
https://github.com/python/cpython/blob/master/Lib/json/encoder.py#L96
Registryclient:
https://github.com/confluentinc/confluent-kafka-python/blob/FirstClassFastAvro/confluent_kafka/schema_registry/schema_registry_client.py#L41
Abstract Base Classes are widely used throughout python though.
Requests(mentioned previously):
https://github.com/psf/requests/blob/fd13816d015c4c90ee65297fa996caea6a094ed1/requests/adapters.py#L55
https://github.com/psf/requests/blob/d79024f246d1037c4196da6027c2c2be08719c8d/requests/auth.py#L72
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I strongly object to require modifications or sub-classing of the original object.
Having said that, I'm not that fond of Converts since it adds another level of abstraction and introduces a new concept.
Have we considered sub-classing the Serializer type instead, so:
class UserAvroSerializer(AvroSerializer):
def __init__(kwargs):
super(UserAvroSerializer, self).__init__(**kwargs)
def to_dict(user):
return { 'id': user.id, 'name': user.name, 'breadth': user.breadth }
def from_dict(data):
return User(**data)
fef1440
to
b4f2481
Compare
examples/avro-cli.py
Outdated
|
||
print("Consuming user records from topic {} with group {}. ^c to exit.".format(topic, conf["group.id"])) | ||
c = Consumer(conf, | ||
key_serializer=StringSerializer('utf_8'), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does a consumer need a serializer for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Each record is associated with a utf_8 encoded string
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't that be a Deserializer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack, also *Serializer has been renamed *Serde. There is some protobuf stuff mixed in the serializer tests so I'll push those changes soon
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed everything but ../avro/* and ../tests/*, left a comment or two.
@@ -0,0 +1,192 @@ | |||
import cProfile |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copyright header
tests/integration/java_fixutre.py
Outdated
@@ -0,0 +1,222 @@ | |||
#!/usr/bin/env python |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
filename is mipselled.
@@ -0,0 +1,5 @@ | |||
#Generated by Maven |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment has little to do with this file, but why are there a bunhc of class and jar files checked in?
And do we need maven or can we rely on trivup's KAFKA_PATH?
|
||
|
||
@pytest.mark.parametrize("serializer, data", | ||
[(DoubleSerializer(), random.uniform(-32768.0, 32768.0)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use random tests inputs since it makes the tests unreproducible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think random number generation is appropriate here. The deserialized content must always match the serialized content so its constant within the scope of the test.
36e5690
to
32063ae
Compare
b4f2481
to
903fa30
Compare
903fa30
to
c1b89e1
Compare
SASL_USERNAME: None, | ||
SASL_PASSWORD: None, | ||
SASL_MECHANISM: 'GSSAPI', | ||
REST_CLIENT: DefaultRestClient |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this need to be configurable?
also, the stuff we're making is a bit tricky, in that we have things that are configuration like that are programmatic (serdes, subject name strategies), but i think an API that explicitly separates the two is going to be better. i don't know exactly what this looks like in python.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the idea was to make it easy for me to inject mocks without hard coding it into the signature as it was previously.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can get the coverage you want with integration tests alone (and calls to SR are quick) and that is better than mucking up the API... and this is python - everything is publicly available if you want it, you should be able to set something up that doesn't compromise on API for testing.
return self[URL] | ||
|
||
@property | ||
def SchemaRegistry(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is upside down, thing that makes an SR, whatever that is in python, should accept a config.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm, probably wrongly, using the config as a vehicle of dependency injection. It's less for direct instantiation and more for when it's embedded in something like a serializer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DI has it's place, but not in a library like this. also note: you'll find many top engineers who hate it and refuse to use it outright.
""" | ||
|
||
def __init__(self, conf): | ||
self.rest_client = conf.RestClient |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rest client should probably be completely encapsulated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So provide the API to the restclient as opposed to the rest client to the api? I considered that as well actually I wasn't sure which was better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is all very good now, we're almost there!
@@ -1,9 +1,29 @@ | |||
__all__ = ['cimpl', 'admin', 'avro', 'kafkatest'] | |||
from .cimpl import (Consumer, # noqa | |||
#!/usr/bin/env python |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Defining the interpreter is good practice, it is not the same thing as being executable (file mode).
__all__ = ['cimpl', 'admin', 'avro', 'kafkatest'] | ||
from .cimpl import (Consumer, # noqa | ||
#!/usr/bin/env python | ||
# -*- coding: utf-8 -*- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since all of this file is ascii-7 there is no need for coding here, but it doesn't hurt.
778e1ed
to
49dc980
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except can you change the ordering of parameters in subject name strategies to have ctx first?
oh and some checks are failing. probably need to be fixed.
b936017
to
40fe906
Compare
- if [[ -n $TRAVIS_TAG && $TRAVIS_OS_NAME == osx && -n $CIBW_BEFORE_BUILD ]]; then tools/test-osx.sh; fi | ||
- if [[ $MK_DOCS == y ]]; then make docs ; fi | ||
- if [[ $MK_DOCS == y ]]; then make docs; fi |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this file has a lot of unnecessary cosmetic changes I strongly recommend you create a tag and generate wheels and upload to testing today, Friday, so we are not bit by surprises on release next week.
|
||
if len(conf_copy) > 0: | ||
raise ValueError("Unrecognized property(ies) {}" | ||
.format(conf_copy.keys())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's pretty ugly though, what we want is:
"Unrecognized configuration properties: ab.cc.dd, foo.bar, session.timeout.ms"
But that can be fixed later.
981ba8f
to
2d1b742
Compare
2d1b742
to
4d318bc
Compare
Hi! can we know when we can expect the resolution for schema registry subject naming functionality? |
No description provided.