[go schema] support go schema for pulsar-client-go #3904

Merged
merged 11 commits into from
Apr 23, 2019

Conversation

wolfstudy
Member

Signed-off-by: xiaolong.ran ranxiaolong716@gmail.com

Master Issue: #3855

Motivation

support go schema for pulsar-client-go

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@sijie sijie requested review from merlimat and sijie March 27, 2019 05:19
@sijie sijie added area/client type/feature The PR added a new feature or issue requested a new feature component/schemaregistry labels Mar 27, 2019
@sijie sijie added this to the 2.4.0 milestone Mar 27, 2019
@sijie
Member

sijie commented Mar 27, 2019

@wolfstudy Overall looks good. However, there is a license header error; I think some files are missing the license header.

Contributor

@merlimat merlimat left a comment

This PR goes in a different direction from the schema implementation for Java and Python.

In this case it just takes a byte array and the schema definition as Avro JSON, but there's no validation that the data is actually what it's supposed to be.

I think we also need to add the serialization part, from a struct to a byte[], through a JSON or Avro serializer that is compliant with the schema.

There can be several ways to define a schema. For JSON, forcing users to pass an Avro definition would be a bad experience. Typically people define json:xyz annotations on the structs too.

For Avro, we need to incorporate an Avro implementation to handle the serialization and deserialization.

@wolfstudy
Member Author

Thanks @sijie and @merlimat for the help; I will fix it in the next commit.

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Member

@sijie sijie left a comment

@wolfstudy overall looks much better now. left a few comments.

@merlimat can you please review this again.

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy
Member Author

Thanks @merlimat and @sijie for the help, PTAL again.

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy
Member Author

run java8 tests

Member

@sijie sijie left a comment

overall looks good. left one comment around float32 and float64.

@sijie
Member

sijie commented Apr 2, 2019

@merlimat can you take a look at this again?

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy
Member Author

run java8 tests

@sijie
Member

sijie commented Apr 3, 2019

@merlimat can you review this again?

@merlimat
Contributor

merlimat commented Apr 3, 2019

Yes, will get to this today/tomorrow

@sijie
Member

sijie commented Apr 8, 2019

ping @merlimat - can you review this again?

//
// The schema will be checked against the schema of the topic, and the
// consumer creation will fail if it's not compatible.
SchemaInfo
Contributor

To keep consistency with consumer options, we should keep it as Schema SchemaInfo

Contributor

Actually, SchemaInfo is just one part of the whole equation. It represents the schema definition but, since in this case it's a struct, it doesn't have any behavior associated with it.

When creating a producer/consumer we should pass a "Schema" interface that encapsulates the serde and is also able to provide a SchemaInfo.
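
A minimal sketch of the idea in this comment, assuming a hypothetical Schema interface with Encode, Decode, and GetSchemaInfo methods. The type and function names below are illustrative and are not taken from the merged code; the schema definition string is a hand-written Avro record definition.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SchemaInfo is a plain data holder: it describes the schema but has no behavior.
type SchemaInfo struct {
	Name   string
	Schema string // Avro-style JSON definition (hand-written in this sketch)
	Type   string
}

// Schema is a hypothetical interface that bundles the serde with the SchemaInfo.
type Schema interface {
	Encode(v interface{}) ([]byte, error)
	Decode(data []byte, v interface{}) error
	GetSchemaInfo() *SchemaInfo
}

// jsonSchema is an illustrative JSON-backed implementation of Schema.
type jsonSchema struct {
	info SchemaInfo
}

func newJSONSchema(name, def string) Schema {
	return &jsonSchema{info: SchemaInfo{Name: name, Schema: def, Type: "JSON"}}
}

func (s *jsonSchema) Encode(v interface{}) ([]byte, error)    { return json.Marshal(v) }
func (s *jsonSchema) Decode(data []byte, v interface{}) error { return json.Unmarshal(data, v) }
func (s *jsonSchema) GetSchemaInfo() *SchemaInfo              { return &s.info }

// user is an example payload type.
type user struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func main() {
	def := `{"type":"record","name":"user","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]}`
	s := newJSONSchema("user", def)

	// The producer side encodes a struct; the consumer side decodes into one.
	payload, err := s.Encode(user{ID: 1, Name: "pulsar"})
	if err != nil {
		panic(err)
	}
	var out user
	if err := s.Decode(payload, &out); err != nil {
		panic(err)
	}
	fmt.Println(string(payload), out.Name, s.GetSchemaInfo().Type)
}
```

With this shape, producer and consumer options only need to carry one value, and the SchemaInfo sent to the broker comes from the same object that does the encoding.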

@@ -108,6 +108,14 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
C.pulsar_producer_configuration_set_compression_type(conf, C.pulsar_compression_type(options.CompressionType))
}

if options.SchemaInfo.Type != NONE {
C.pulsar_producer_configuration_set_schema_type(conf, C.pulsar_schema_type(options.SchemaInfo.Type),
C.CString(options.Name), C.CString(options.Schema))
Contributor

the CString instances will need to be released with C.free()

Member Author

Yes, otherwise it will cause a memory leak

Name: "pulsar",
})
str := string(schema)
schemaJsonInfo := NewSchemaInfo("jsonTopic", str, JSON)
Contributor

In this case you're passing "schema" as both the "schemaInfo" and as the value.

The schema info JSON definition should actually be the AVRO schema representation of the type, rather than the payload.

We should be extracting the schema info automatically through Go introspection of the struct. From an API perspective, something like NewJsonSchema(testSchema).

Member Author

Thanks @merlimat. Do I understand correctly that all schema info is managed by Avro, while the actual serialization and deserialization are handled by the corresponding format itself?

Member Author

@wolfstudy wolfstudy Apr 9, 2019

If so, the user can pass in the desired schema type, and Avro will do the corresponding compatibility check?

Member

@wolfstudy Pulsar uses Avro to store schema definitions for struct schemas (Avro, JSON, and Protobuf). Compatibility is verified by the Avro schema compatibility check. You can check Java's implementation.

Contributor

@wolfstudy Yes, the API will really depend on the language ergonomics, though the main idea is to have a "type-safe" API (to the maximum extent) and allow users to precisely define the schema.

In Java this is done through POJO definitions (and recently we added an option to override the schema definition directly).
https://pulsar.apache.org/docs/en/client-libraries-java/#schemas

In Python, since it's not possible to extract a meaningful schema from a class, we have added the concept of a Record base class that can be used to declare the schema: https://pulsar.apache.org/docs/en/client-libraries-python/#schema

In Go, I would expect that we should be able to convert the json:'adas' annotations from a struct into an Avro schema definition for the struct, or that the Avro library will be able to do that directly for Avro.
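
The Go suggestion above can be sketched with the standard reflect package. This is only an illustration under stated assumptions: avroSchemaFor, avroTypeOf, and the Go-to-Avro type mapping (for example Go int mapped to Avro long) are hypothetical choices for this example, not the approach the PR ended up using, and only flat structs with primitive fields are handled.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
)

// avroField and avroRecord model the subset of an Avro record definition used here.
type avroField struct {
	Name string `json:"name"`
	Type string `json:"type"`
}

type avroRecord struct {
	Type   string      `json:"type"`
	Name   string      `json:"name"`
	Fields []avroField `json:"fields"`
}

// avroTypeOf maps a few Go kinds to Avro primitive type names (assumed mapping).
func avroTypeOf(k reflect.Kind) (string, error) {
	switch k {
	case reflect.Bool:
		return "boolean", nil
	case reflect.Int32:
		return "int", nil
	case reflect.Int, reflect.Int64:
		return "long", nil
	case reflect.Float32:
		return "float", nil
	case reflect.Float64:
		return "double", nil
	case reflect.String:
		return "string", nil
	}
	return "", fmt.Errorf("unsupported kind %s", k)
}

// avroSchemaFor builds an Avro record definition from a struct's json tags.
func avroSchemaFor(v interface{}) (string, error) {
	t := reflect.TypeOf(v)
	if t == nil {
		return "", fmt.Errorf("expected struct, got nil")
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return "", fmt.Errorf("expected struct, got %s", t.Kind())
	}
	rec := avroRecord{Type: "record", Name: t.Name()}
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if f.PkgPath != "" {
			continue // skip unexported fields
		}
		// Use the name part of the json tag, falling back to the Go field name.
		name := strings.Split(f.Tag.Get("json"), ",")[0]
		if name == "-" {
			continue
		}
		if name == "" {
			name = f.Name
		}
		typ, err := avroTypeOf(f.Type.Kind())
		if err != nil {
			return "", fmt.Errorf("field %s: %w", f.Name, err)
		}
		rec.Fields = append(rec.Fields, avroField{Name: name, Type: typ})
	}
	out, err := json.Marshal(rec)
	return string(out), err
}

// testJSON is an example payload type.
type testJSON struct {
	ID   int32  `json:"id"`
	Name string `json:"name"`
}

func main() {
	def, err := avroSchemaFor(testJSON{})
	if err != nil {
		panic(err)
	}
	fmt.Println(def)
}
```

A real implementation would also need nested records, slices, maps, and nullable fields, which is one reason to delegate to an Avro library where possible.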

Member Author

thanks @merlimat will fix it

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy
Member Author

@merlimat @sijie Already fixed, PTAL again. In the next pull request, I will add integration tests to verify the logic.

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Member

@sijie sijie left a comment

@wolfstudy good job! the current structure is similar to Java now. left a few comments, please address them.

@@ -40,6 +40,19 @@ pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
return (pulsar_consumer_type)consumer_configuration->consumerConfiguration.getConsumerType();
}

void pulsar_consumer_configuration_set_schema_type(pulsar_consumer_configuration_t *consumer_configuration,
Member

@wolfstudy

it seems that we never pass in the properties for the schema. That means the properties users specify in Go are not passed to the C client and sent to the broker.

Also, wouldn't it be clearer to have set_schema_info and get_schema_info methods in consumer_configuration?

@@ -66,6 +66,18 @@ pulsar_compression_type pulsar_producer_configuration_get_compression_type(
return (pulsar_compression_type)conf->conf.getCompressionType();
}

void pulsar_producer_configuration_set_schema_type(pulsar_producer_configuration_t *conf,
Member

same comment as the one I made for consumer configuration.

cSchema := C.CString(schema.GetSchemaInfo().Schema)
defer C.free(unsafe.Pointer(cName))
defer C.free(unsafe.Pointer(cSchema))
C.pulsar_consumer_configuration_set_schema_type(conf, C.pulsar_schema_type(schema.GetSchemaInfo().Type),
Member

Following my comment above: if you have a method pulsar_consumer_configuration_set_schema_info, you can just convert a Go SchemaInfo to a C++ SchemaInfo, correct? That way the properties will not be lost. Also, if we introduce new fields to SchemaInfo in the future, they can be handled in one place, without touching the producer and consumer configuration in C++ again.

Member Author

@wolfstudy wolfstudy Apr 17, 2019

Yes, this is a good idea, but so far only an idea. The SchemaInfo struct may hold a variety of field types, such as a map. C has no built-in map type; of course we can implement one ourselves or use std::map. But the biggest question is: how do we pass a Go map to C?

Go's built-in map is implemented in the runtime, and its implementation details are not exported. Even if they were exported, its memory is managed by the GC. How do we ensure that it is not collected while the C code is still using it?

So the approach I implemented is:

for key, value := range schemainfo.Properties {
	cKey := C.CString(key)
	cValue := C.CString(value)

	// cMap is the C-side string map ("map" itself is a reserved word in Go)
	C.pulsar_string_map_put(cMap, cKey, cValue)

	C.free(unsafe.Pointer(cKey))
	C.free(unsafe.Pointer(cValue))
}

Both Go and C have a SchemaInfo struct, so how do we guarantee that the two structs stay consistent? If SchemaInfo in Go were an interface, we could keep them consistent through type assertion, but SchemaInfo needs to be passed to C, so it can only be a struct.

So far I have not thought of a good way to deal with this problem. Although the current implementation is ugly, it is the one I can think of: passing each field of SchemaInfo individually.

@@ -147,7 +168,12 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
subName := C.CString(options.SubscriptionName)
defer C.free(unsafe.Pointer(subName))

callbackPtr := savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback})
var callbackPtr unsafe.Pointer
if schema != nil {
Member

nit: I am not a go expert. but do we really need the if-else branches here? can't you just pass a nil schema to the subscribeContext struct?

@@ -97,8 +98,8 @@ func buildMessage(message ProducerMessage) *C.pulsar_message_t {

////////////// Message

func newMessageWrapper(ptr *C.pulsar_message_t) Message {
msg := &message{ptr: ptr}
func newMessageWrapper(schema Schema, ptr *C.pulsar_message_t) Message {
Member

just need some clarifications from you: is schema here a reference copy or a value copy?

Contributor

Interfaces are always passed by ref. Go syntax is a bit confusing since you need to know that it's indeed an interface and not a struct
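
To make the answer concrete: strictly speaking, Go copies an interface value when it is passed (a small type-and-pointer pair), but when that value wraps a pointer, the copy still refers to the same underlying object. The sketch below illustrates this; the Schema methods, jsonSchema, and wrapper are hypothetical stand-ins and are not the types from this PR.

```go
package main

import "fmt"

// Schema is a stand-in interface for this illustration.
type Schema interface {
	Name() string
	Rename(string)
}

// jsonSchema implements Schema with pointer receivers.
type jsonSchema struct{ name string }

func (s *jsonSchema) Name() string    { return s.name }
func (s *jsonSchema) Rename(n string) { s.name = n }

// wrapper stores the interface value it receives, much as a message
// wrapper would keep the schema it was constructed with.
type wrapper struct{ schema Schema }

func newWrapper(schema Schema) *wrapper { return &wrapper{schema: schema} }

func main() {
	s := &jsonSchema{name: "before"}
	w := newWrapper(s) // the interface value is copied; the pointer inside is shared

	s.Rename("after")
	fmt.Println(w.schema.Name()) // the wrapper observes the change
}
```

If jsonSchema were stored in the interface by value instead (value receivers and a non-pointer argument), the wrapper would hold its own copy and would not see later changes.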


func NewJsonSchema(codec string) *JsonSchema {
js := new(JsonSchema)
schema, err := initCodec(codec)
Member

Suggested change
schema, err := initCodec(codec)
avroCodec, err := initAvroCodec(jsonAvroSchemaDef)

}

func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo {
jsonSchema := NewSchemaInfo("Proto", ps.SchemaDef.Schema(), PROTOBUF)
Member

Suggested change
jsonSchema := NewSchemaInfo("Proto", ps.SchemaDef.Schema(), PROTOBUF)
protoSchema := NewSchemaInfo("Proto", ps.SchemaDef.Schema(), PROTOBUF)
return protoSchema

return as
}

func (as *AvroSchema) Serialize(data interface{}) ([]byte, error) {
Member

this serialization doesn't seem correct to me. you don't need lines 188-197. you can simply call BinaryFromNative.

return as.SchemaDef.BinaryFromNative(nil, native)
}

func (as *AvroSchema) UnSerialize(data []byte, v interface{}) error {
Member

you don't need lines 207-217.

Member Author

As you suggest, I only used this at first, but the value I got was nil, so we need lines 207-217.

}

type StringSchema struct {
SchemaDefinition
Member

why do you need this for StringSchema?

Member Author

Yes, StringSchema doesn't need this; will fix it.

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy
Member Author

wolfstudy commented Apr 18, 2019

Added properties for SchemaInfo. @sijie @merlimat PTAL again

Member

@sijie sijie left a comment

@wolfstudy LGTM. Left some nit comments and also I will let you and @merlimat decide the final api.

return
}
msg.Payload = payLoad
}
Member

@wolfstudy it seems that this comment was not addressed. did you miss this one?

GetSchemaInfo() *SchemaInfo
}

type SchemaDefinition struct {
Member

nit: SchemaDefinition is not a good name here.

I would rather just call it AvroCodec.

type AvroCodec struct {
    Codec *goavro.Codec
}

SchemaDef *goavro.Codec
}

func NewSchemaDefinition(schema *goavro.Codec) *SchemaDefinition {
Member

nit: NewAvroCodec

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy
Member Author

@merlimat PTAL again, thanks

@wolfstudy
Member Author

run java8 tests
run integration tests

Contributor

@merlimat merlimat left a comment

Very nice work! I just have a couple of comments on the APIs

@@ -66,6 +69,9 @@ type Message interface {

// Get the key of the message, if any
Key() string

//Get the de-serialized value of the message, according the configured
GetValue(v interface{}) error
Contributor

Shouldn't this be something like Value() (interface{}, error)?

Member Author

@wolfstudy wolfstudy Apr 22, 2019

Here I would like to keep my approach, for the following reasons. In the Schema interface, the decode method is defined as Decode(data []byte, v interface{}) error; keeping GetValue consistent with it makes processing more convenient. In addition, this approach is in line with Go conventions.

In Java, decode takes a byte[] as input and returns the decoded value:

default T decode(byte[] bytes)

But in Go, the inputs are a []byte and an interface{}, and the output is an error:

Decode(data []byte, v interface{}) error

So I think the interface{} should be a parameter of Value, not a return value.

Contributor

Got it. We don't necessarily need to be consistent with the Schema interface, though.

I just saw the same pattern is used for JSON deserialize:

func Unmarshal(data []byte, v interface{}) error
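
The two API shapes discussed in this thread can be compared side by side. The sketch below is illustrative only: the message and event types are hypothetical, and encoding/json stands in for whatever decoder the configured schema would use.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a stand-in for a received message with a raw payload.
type message struct{ payload []byte }

// GetValue decodes into v, mirroring the json.Unmarshal signature.
// The caller supplies a pointer to the concrete type it expects.
func (m *message) GetValue(v interface{}) error {
	return json.Unmarshal(m.payload, v)
}

// Value is the alternative shape: it returns an untyped value, so the
// caller has to type-assert before using it.
func (m *message) Value() (interface{}, error) {
	var v interface{}
	err := json.Unmarshal(m.payload, &v)
	return v, err
}

// event is an example payload type.
type event struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func main() {
	m := &message{payload: []byte(`{"id":7,"name":"pulsar"}`)}

	// Decode-into-pointer: the result is already typed.
	var e event
	if err := m.GetValue(&e); err != nil {
		panic(err)
	}
	fmt.Println(e.ID, e.Name)

	// Return-a-value: the result is a generic map that needs assertions.
	raw, err := m.Value()
	if err != nil {
		panic(err)
	}
	fields := raw.(map[string]interface{})
	fmt.Println(fields["id"], fields["name"])
}
```

Without generics, a method returning interface{} cannot know the caller's target type, which is why the decode-into-pointer form tends to be more convenient for struct payloads.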

@@ -103,16 +103,22 @@ type Client interface {
// This method will block until the producer is created successfully
CreateProducer(ProducerOptions) (Producer, error)

CreateTypedProducer(ProducerOptions, Schema) (Producer, error)
Contributor

Could we follow same convention across the 3 methods, using WithSchema() ?

Member Author

sure, good suggestion

Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@wolfstudy
Member Author

wolfstudy commented Apr 22, 2019

@merlimat have fixed it, PTAL again, thanks.

@wolfstudy
Member Author

run java8 tests

Contributor

@merlimat merlimat left a comment

👍

@merlimat
Contributor

run java8 tests

@jiazhai jiazhai merged commit d5036ea into apache:master Apr 23, 2019
Labels
area/client type/feature The PR added a new feature or issue requested a new feature
4 participants