[go schema] support go schema for pulsar-client-go #3904
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy Overall looks good. However, there is a license header error; I think some files are missing the license header.
This PR goes in a different direction from the schema implementations for Java and Python.
In this case it just takes a byte array and the schema definition as Avro JSON, but there's no validation that the data actually matches the schema.
I think we also need to add the serialization part, from a struct to a []byte, through a JSON or Avro serializer that is compliant with the schema.
There can be several ways to define a schema. For JSON, forcing users to pass an Avro definition would be a bad experience. Typically people define json:xyz annotations on the structs.
For Avro, we need to incorporate an Avro implementation to handle the serialization and deserialization.
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy overall looks much better now. left a few comments.
@merlimat can you please review this again.
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
run java8 tests
overall looks good. left one comment around float32 and float64.
@merlimat can you take a look at this again?
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
run java8 tests
@merlimat can you review this again?
Yes, will get to this today/tomorrow
ping @merlimat - can you review this again?
pulsar-client-go/pulsar/producer.go (outdated)
    //
    // The schema will be checked against the schema of the topic, and the
    // consumer creation will fail if it's not compatible.
    SchemaInfo
To keep consistency with consumer options, we should keep it as Schema SchemaInfo
Actually, SchemaInfo is just one part of the whole equation. It represents the schema definition but, since it's a struct in this case, it doesn't have any behavior associated with it.
When creating a producer/consumer we should pass a "Schema" interface that encapsulates the serde and is able to provide a SchemaInfo.
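To make the suggestion concrete, here is a minimal sketch of what such an interface could look like. Decode and GetSchemaInfo use the signatures quoted later in this thread; Encode, the SchemaInfo fields, and the jsonSchema type are assumptions made for illustration, not the API that was merged.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SchemaType and SchemaInfo mirror names used in this thread; the exact
// fields are assumptions made for this sketch.
type SchemaType int

const (
	NONE SchemaType = iota
	JSON
)

// SchemaInfo is plain data: it describes a schema but has no behavior.
type SchemaInfo struct {
	Name   string
	Schema string // Avro-style JSON definition of the type
	Type   SchemaType
}

// Schema bundles the serde with the SchemaInfo it corresponds to.
type Schema interface {
	Encode(v interface{}) ([]byte, error) // hypothetical method name
	Decode(data []byte, v interface{}) error
	GetSchemaInfo() *SchemaInfo
}

// jsonSchema is a hypothetical JSON-backed implementation of Schema.
type jsonSchema struct {
	info SchemaInfo
}

func (s *jsonSchema) Encode(v interface{}) ([]byte, error)    { return json.Marshal(v) }
func (s *jsonSchema) Decode(data []byte, v interface{}) error { return json.Unmarshal(data, v) }
func (s *jsonSchema) GetSchemaInfo() *SchemaInfo              { return &s.info }

type event struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// roundTrip encodes in through the schema and decodes the bytes into out.
func roundTrip(s Schema, in, out interface{}) error {
	data, err := s.Encode(in)
	if err != nil {
		return err
	}
	return s.Decode(data, out)
}

func main() {
	var s Schema = &jsonSchema{info: SchemaInfo{Name: "event", Type: JSON}}
	var got event
	if err := roundTrip(s, event{ID: 1, Name: "pulsar"}, &got); err != nil {
		panic(err)
	}
	fmt.Printf("%+v (schema %q)\n", got, s.GetSchemaInfo().Name)
}
```

With this shape, producer and consumer options only need to carry one Schema value, and the SchemaInfo sent to the broker always comes from the same object that performs the serialization.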
@@ -108,6 +108,14 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
         C.pulsar_producer_configuration_set_compression_type(conf, C.pulsar_compression_type(options.CompressionType))
     }
 
+    if options.SchemaInfo.Type != NONE {
+        C.pulsar_producer_configuration_set_schema_type(conf, C.pulsar_schema_type(options.SchemaInfo.Type),
+            C.CString(options.Name), C.CString(options.Schema))
the CString instances will need to be released with C.free()
Yes, otherwise it will cause a memory leak
        Name: "pulsar",
    })
    str := string(schema)
    schemaJsonInfo := NewSchemaInfo("jsonTopic", str, JSON)
In this case you're passing "schema" as both the "schemaInfo" and as the value.
The schema info JSON definition should actually be the AVRO schema representation of the type, rather than the payload.
We should be extracting the schema info automatically through Go introspection of the struct. From an API perspective, something like NewJsonSchema(testSchema)
Thanks @merlimat. Do I understand correctly that all schema info is managed by Avro, while the actual serialization and deserialization are handled by the corresponding format itself?
If so, the user can pass in the desired schema type, and Avro will do the corresponding compatibility check?
@wolfstudy Pulsar uses Avro for storing schema definitions for struct schemas (Avro, JSON, and Protobuf). Compatibility is verified by the Avro schema compatibility check. You can check Java's implementation.
@wolfstudy Yes, the API will really depend on the language ergonomics, though the main idea is to have a "type-safe" API (to the maximum extent possible) and allow the user to define the schema precisely.
In Java this is done through POJO definitions (and recently we added an option to override the schema definition directly).
https://pulsar.apache.org/docs/en/client-libraries-java/#schemas
In Python, since it's not possible to extract a meaningful schema from a class, we have added the concept of a Record base class that can be used to declare the schema: https://pulsar.apache.org/docs/en/client-libraries-python/#schema
In Go, I would expect that we should be able to convert the json:'adas' annotations from a struct into an Avro schema definition for the struct, or that the Avro library will be able to do that directly for Avro.
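As a rough illustration of the Go idea, the sketch below walks a flat struct with reflection and emits an Avro record definition named after the json tags. The function avroSchemaFor and the Go-to-Avro type mapping are assumptions made for this example; nested structs, slices, maps, and nullable fields are deliberately left out.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
)

// avroField and avroRecord model just enough of an Avro record definition
// for this sketch.
type avroField struct {
	Name string `json:"name"`
	Type string `json:"type"`
}

type avroRecord struct {
	Type   string      `json:"type"`
	Name   string      `json:"name"`
	Fields []avroField `json:"fields"`
}

// avroPrimitive maps a small subset of Go kinds to Avro primitive type names.
func avroPrimitive(k reflect.Kind) (string, bool) {
	switch k {
	case reflect.Bool:
		return "boolean", true
	case reflect.Int32:
		return "int", true
	case reflect.Int, reflect.Int64:
		return "long", true
	case reflect.Float32:
		return "float", true
	case reflect.Float64:
		return "double", true
	case reflect.String:
		return "string", true
	}
	return "", false
}

// avroSchemaFor (hypothetical helper) derives an Avro record definition from
// a flat struct, using the json tag of each exported field as the field name.
func avroSchemaFor(v interface{}) (string, error) {
	t := reflect.TypeOf(v)
	if t != nil && t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t == nil || t.Kind() != reflect.Struct {
		return "", fmt.Errorf("avroSchemaFor: expected a struct, got %T", v)
	}
	rec := avroRecord{Type: "record", Name: t.Name()}
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if f.PkgPath != "" {
			continue // unexported field, also ignored by encoding/json
		}
		name := strings.Split(f.Tag.Get("json"), ",")[0]
		if name == "-" {
			continue // explicitly excluded from JSON
		}
		if name == "" {
			name = f.Name
		}
		avroType, ok := avroPrimitive(f.Type.Kind())
		if !ok {
			return "", fmt.Errorf("avroSchemaFor: unsupported type %s for field %s", f.Type, f.Name)
		}
		rec.Fields = append(rec.Fields, avroField{Name: name, Type: avroType})
	}
	out, err := json.Marshal(rec)
	return string(out), err
}

type testSchema struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func main() {
	def, err := avroSchemaFor(testSchema{})
	if err != nil {
		panic(err)
	}
	fmt.Println(def)
}
```

A constructor along the lines of NewJsonSchema(testSchema{}) could call a helper like this internally, so users never write the Avro definition by hand.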
thanks @merlimat will fix it
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy good job! The current structure is now similar to Java's. Left a few comments, please address them.
@@ -40,6 +40,19 @@ pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
     return (pulsar_consumer_type)consumer_configuration->consumerConfiguration.getConsumerType();
 }
 
+void pulsar_consumer_configuration_set_schema_type(pulsar_consumer_configuration_t *consumer_configuration,
It seems that we never pass in properties for the schema. That means the properties users specify in Go are not passed to the C client and sent to the broker.
Also, wouldn't it be clearer to have set_schema_info and get_schema_info methods in consumer_configuration?
@@ -66,6 +66,18 @@ pulsar_compression_type pulsar_producer_configuration_get_compression_type(
     return (pulsar_compression_type)conf->conf.getCompressionType();
 }
 
+void pulsar_producer_configuration_set_schema_type(pulsar_producer_configuration_t *conf,
same comment as the one I made for consumer configuration.
    cSchema := C.CString(schema.GetSchemaInfo().Schema)
    defer C.free(unsafe.Pointer(cName))
    defer C.free(unsafe.Pointer(cSchema))
    C.pulsar_consumer_configuration_set_schema_type(conf, C.pulsar_schema_type(schema.GetSchemaInfo().Type),
Following the comment I had above, if you have a method pulsar_consumer_configuration_set_schema_info, you can just convert a Go SchemaInfo to a C++ SchemaInfo, correct? That way the properties will not be lost. Also, if we introduce new fields to SchemaInfo in the future, they can be handled correctly in one place, without touching the producer and consumer configuration in C++ again.
Yes, this is a good idea, but for now it is just an idea. The SchemaInfo struct may contain a variety of types, such as a map. C does not support maps natively; of course, we can implement a map ourselves or use std::map to implement it. But the biggest question is: how do we pass a Go map to C?
Go's built-in map is implemented in the runtime, and the implementation details are not exported. Even if they were exported, its memory is managed by the GC, so how do we ensure that it is not collected while the C code is using it?
So, the approach I implemented is:
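    for key, value := range schemainfo.Properties {
        cKey := C.CString(key)
        cValue := C.CString(value)
        // cMap is the C-side string map; "map" itself is a reserved word in Go.
        C.pulsar_string_map_put(cMap, cKey, cValue)
        // Release the temporary C strings once the entry has been stored.
        C.free(unsafe.Pointer(cKey))
        C.free(unsafe.Pointer(cValue))
    }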
In Go and C we each have a SchemaInfo struct, so how do we guarantee that the two structs stay consistent? If SchemaInfo in Go were an interface, we could make them consistent through type assertion, but SchemaInfo needs to be passed to C, so it can only be a struct.
So far I have not thought of a good way to deal with this problem. Although the current implementation is ugly, it is the one I could come up with: passing each field of SchemaInfo individually.
@@ -147,7 +168,12 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
     subName := C.CString(options.SubscriptionName)
     defer C.free(unsafe.Pointer(subName))
 
-    callbackPtr := savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback})
+    var callbackPtr unsafe.Pointer
+    if schema != nil {
nit: I am not a Go expert, but do we really need the if-else branches here? Can't you just pass a nil schema to the subscribeContext struct?
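For what it's worth, a nil interface value can be stored in a struct field and checked at the point of use, which is one way to avoid branching at construction time. The sketch below uses a simplified stand-in for subscribeContext; the decode method and errNoSchema are hypothetical names introduced for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Schema is reduced to the one method this sketch needs.
type Schema interface {
	Decode(data []byte, v interface{}) error
}

type jsonSchema struct{}

func (jsonSchema) Decode(data []byte, v interface{}) error { return json.Unmarshal(data, v) }

// subscribeContext is a simplified stand-in for the struct in the diff; only
// the schema field is modeled here.
type subscribeContext struct {
	schema Schema // nil when the consumer was created without a schema
}

var errNoSchema = errors.New("no schema configured for this consumer")

// decode checks for a missing schema where it is used, so callers can build
// the context the same way whether or not a schema was supplied.
func (c *subscribeContext) decode(data []byte, v interface{}) error {
	if c.schema == nil {
		return errNoSchema
	}
	return c.schema.Decode(data, v)
}

func main() {
	payload := []byte(`{"a":1}`)
	var out map[string]int

	plain := &subscribeContext{} // schema left nil
	fmt.Println(plain.decode(payload, &out))

	typed := &subscribeContext{schema: jsonSchema{}}
	fmt.Println(typed.decode(payload, &out), out)
}
```

One caveat: the nil check only works when the field holds a true nil interface. A nil pointer of a concrete type stored in the interface compares as non-nil, so the caller should pass a literal nil rather than a nil *T.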
@@ -97,8 +98,8 @@ func buildMessage(message ProducerMessage) *C.pulsar_message_t {
 
 ////////////// Message
 
-func newMessageWrapper(ptr *C.pulsar_message_t) Message {
-    msg := &message{ptr: ptr}
+func newMessageWrapper(schema Schema, ptr *C.pulsar_message_t) Message {
just need some clarification from you: is schema here a reference copy or a value copy?
Interfaces are always passed by ref. Go syntax is a bit confusing since you need to know that it's indeed an interface and not a struct
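A small experiment makes the practical effect visible. Strictly speaking, Go copies the interface value itself on every call, but when that value wraps a pointer, the copy still refers to the same underlying object, so no schema data is duplicated. The types below are hypothetical and exist only for this demonstration.

```go
package main

import "fmt"

// Schema here is a toy interface used only to observe call semantics.
type Schema interface {
	Name() string
	Rename(string)
}

// ptrSchema implements Schema with pointer receivers, so an interface value
// holding it wraps a *ptrSchema.
type ptrSchema struct{ name string }

func (p *ptrSchema) Name() string    { return p.name }
func (p *ptrSchema) Rename(n string) { p.name = n }

// rename receives a copy of the interface value, but that copy wraps the
// same pointer, so the caller observes the change.
func rename(s Schema, n string) { s.Rename(n) }

// valInfo is a plain struct passed by value for comparison.
type valInfo struct{ name string }

// renameInfo modifies only its local copy of the struct.
func renameInfo(i valInfo, n string) { i.name = n }

func main() {
	p := &ptrSchema{name: "a"}
	rename(p, "b")
	fmt.Println("interface wrapping a pointer:", p.Name())

	v := valInfo{name: "a"}
	renameInfo(v, "b")
	fmt.Println("struct passed by value:", v.name)
}
```

So for newMessageWrapper, passing schema as an interface argument is cheap and shares the underlying codec, provided the concrete schema types are used through pointers.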
pulsar-client-go/pulsar/schema.go (outdated)

    func NewJsonSchema(codec string) *JsonSchema {
        js := new(JsonSchema)
        schema, err := initCodec(codec)
-schema, err := initCodec(codec)
+avroCodec, err := initAvroCodec(jsonAvroSchemaDef)
pulsar-client-go/pulsar/schema.go (outdated)
    }

    func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo {
        jsonSchema := NewSchemaInfo("Proto", ps.SchemaDef.Schema(), PROTOBUF)
-jsonSchema := NewSchemaInfo("Proto", ps.SchemaDef.Schema(), PROTOBUF)
+protoSchema := NewSchemaInfo("Proto", ps.SchemaDef.Schema(), PROTOBUF)
+return protoSchema
pulsar-client-go/pulsar/schema.go (outdated)
        return as
    }

    func (as *AvroSchema) Serialize(data interface{}) ([]byte, error) {
This serialization doesn't seem correct to me. You don't need lines 188-197; you can simply call BinaryFromNative.
pulsar-client-go/pulsar/schema.go (outdated)
        return as.SchemaDef.BinaryFromNative(nil, native)
    }

    func (as *AvroSchema) UnSerialize(data []byte, v interface{}) error {
You don't need lines 207-217.
As you might expect, I used only that at first, but the value I got was nil, so we need lines 207-217.
pulsar-client-go/pulsar/schema.go (outdated)
    }

    type StringSchema struct {
        SchemaDefinition
why do you need this for StringSchema?
Yes, StringSchema doesn't need this. Will fix it.
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy LGTM. Left some nit comments and also I will let you and @merlimat decide the final api.
            return
        }
        msg.Payload = payLoad
    }
@wolfstudy it seems that this comment was not addressed. did you miss this one?
pulsar-client-go/pulsar/schema.go (outdated)
        GetSchemaInfo() *SchemaInfo
    }

    type SchemaDefinition struct {
nit: SchemaDefinition is not a good name here. I would rather just call it AvroCodec.
    type AvroCodec struct {
        Codec *goavro.Codec
    }
pulsar-client-go/pulsar/schema.go (outdated)
        SchemaDef *goavro.Codec
    }

    func NewSchemaDefinition(schema *goavro.Codec) *SchemaDefinition {
nit: NewAvroCodec
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@merlimat PTAL again, thanks
run java8 tests
Very nice work! I just have a couple of comments on the APIs
@@ -66,6 +69,9 @@ type Message interface {
 
     // Get the key of the message, if any
     Key() string
+
+    //Get the de-serialized value of the message, according the configured
+    GetValue(v interface{}) error
Shouldn't this be something like Value() (interface{}, error)?
Here I want to maintain my point of view, for the following reason: in the Schema interface, the decode method is defined as Decode(data []byte, v interface{}) error. Keeping GetValue consistent with it makes our processing more convenient. In addition, this approach is in line with Go conventions.
In Java, decode takes a byte[] as input and returns the object:
default T decode(byte[] bytes)
But in Go, the inputs are a []byte and an interface{}, and the output is an error:
Decode(data []byte, v interface{}) error
So I think the interface{} should be a parameter of Value, not a return value.
Got it. We don't necessarily need to be consistent with the Schema interface, though.
I just saw that the same pattern is used for JSON deserialization:
func Unmarshal(data []byte, v interface{}) error
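The trade-off between the two signatures can be seen with encoding/json alone. In the sketch below, the message type and its payload field are hypothetical stand-ins for the client's Message; GetValue follows the decode-into-pointer style, while Value returns an untyped result.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a hypothetical stand-in for the client's Message implementation.
type message struct {
	payload []byte
}

// GetValue decodes into a caller-supplied pointer, like json.Unmarshal, so
// the caller chooses the concrete type and gets typed fields back.
func (m *message) GetValue(v interface{}) error {
	return json.Unmarshal(m.payload, v)
}

// Value returns an untyped result. With no target type available,
// encoding/json produces map[string]interface{} for JSON objects and float64
// for JSON numbers, and the caller has to type-assert.
func (m *message) Value() (interface{}, error) {
	var v interface{}
	err := json.Unmarshal(m.payload, &v)
	return v, err
}

type event struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func main() {
	m := &message{payload: []byte(`{"id":1,"name":"pulsar"}`)}

	var e event
	if err := m.GetValue(&e); err != nil {
		panic(err)
	}
	fmt.Printf("typed: %+v\n", e)

	raw, err := m.Value()
	if err != nil {
		panic(err)
	}
	fmt.Printf("untyped: %T\n", raw)
}
```

Without generics, the pointer-parameter form is the only one of the two that lets the decoder know which Go type the caller expects.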
pulsar-client-go/pulsar/client.go (outdated)
@@ -103,16 +103,22 @@ type Client interface {
     // This method will block until the producer is created successfully
     CreateProducer(ProducerOptions) (Producer, error)
 
+    CreateTypedProducer(ProducerOptions, Schema) (Producer, error)
Could we follow the same convention across the 3 methods, using WithSchema()?
sure, good suggestion
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@merlimat have fixed it, PTAL again, thanks.
run java8 tests
👍
run java8 tests
Signed-off-by: xiaolong.ran ranxiaolong716@gmail.com
Master Issue: #3855
Motivation
support go schema for pulsar-client-go