-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PIP-88] Replicate schemas across clusters #11441
Conversation
Here is the proposal: https://github.com/apache/pulsar/wiki/PIP-88%3A-Replicate-schemas-across-multiple For the implementation, we just need to set the correct SchemaInfo for the replicated message and using the AutoProduceByte schema for the producer of the replicator.
@@ -64,6 +64,7 @@ | |||
private ByteBuf payload; | |||
|
|||
private Schema<T> schema; | |||
private SchemaInfo schemaInfo; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am a little confused about the modification.
We can get schemaInfo from schema now.
Can we use public static Schema<?> getSchema(SchemaInfo schemaInfo)
in AutoConsumeSchema
to convert schemaInfo to Schema
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the schema replication, we don't need to convert schemaInfo to schema for the local topic and then convert the schema to the schemaInfo for the remote topic. Or we only need to pass the schemaInfo to the message which need to replicate to the remote cluster.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have a schema, but we added a schemaInfo, which will make other developers very confused. If we can't reuse existing attributes, we can at least do this:
- Make the new properties easier to understand, such as: naming modification, or adding comments, so that other developers can know schemaInfo only for replicator
- Independent get/set
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I second @315157973 concern.
We already have "getReaderSchema", why cannot we use that ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the change looks good, apart the fact that we are adding that SchemaInfo field from MessageImpl
@@ -64,6 +64,7 @@ | |||
private ByteBuf payload; | |||
|
|||
private Schema<T> schema; | |||
private SchemaInfo schemaInfo; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I second @315157973 concern.
We already have "getReaderSchema", why cannot we use that ?
@@ -418,14 +419,24 @@ private void ensureSchemaIsLoaded() { | |||
} | |||
} | |||
|
|||
private SchemaInfo getSchemaInfo() { | |||
public SchemaInfo getSchemaInfo() { | |||
if (schemaInfo != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is quite confusing, we should use what the internal Schema is reporting
@315157973 @eolivelli I have addressed your comments, PTAL. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm
Great work
* [PIP-88] Replicate schemas accross clusters Here is the proposal: https://github.com/apache/pulsar/wiki/PIP-88%3A-Replicate-schemas-across-multiple For the implementation, we just need to set the correct SchemaInfo for the replicated message and using the AutoProduceByte schema for the producer of the
* [PIP-88] Replicate schemas accross clusters Here is the proposal: https://github.com/apache/pulsar/wiki/PIP-88%3A-Replicate-schemas-across-multiple For the implementation, we just need to set the correct SchemaInfo for the replicated message and using the AutoProduceByte schema for the producer of the
### Motivation apache#11441 supports replicate schema to remote clusters. But there is a mistake that the returned schema state is incorrect. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770 Because the replicator used MessageImpl will not have the schema. And this will cause the producer to skip the schema upload. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149 We should remove https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768 To return the correct schema state. And then we should also provide the correct schema hash. If the message is used by the replicator, the schema hash should be based on the replicator schema. Otherwise, it should use based on the schema of the message. ### Modification - Fixed the incorrect returned schema state - Provide the method for getting schema hash for MessageImpl ### Verification Update the test only to create producer to one cluster. Because if create a producer for other clusters, the producer will upload the schema. This is the reason that why the test can get pass before.
### Motivation #11441 supports replicate schema to remote clusters. But there is a mistake that the returned schema state is incorrect. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770 Because the replicator used MessageImpl will not have the schema. And this will cause the producer to skip the schema upload. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149 We should remove https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768 To return the correct schema state. And then we should also provide the correct schema hash. If the message is used by the replicator, the schema hash should be based on the replicator schema. Otherwise, it should use based on the schema of the message. ### Modification - Fixed the incorrect returned schema state - Provide the method for getting schema hash for MessageImpl
### Motivation #11441 supports replicate schema to remote clusters. But there is a mistake that the returned schema state is incorrect. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770 Because the replicator used MessageImpl will not have the schema. And this will cause the producer to skip the schema upload. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149 We should remove https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768 To return the correct schema state. And then we should also provide the correct schema hash. If the message is used by the replicator, the schema hash should be based on the replicator schema. Otherwise, it should use based on the schema of the message. ### Modification - Fixed the incorrect returned schema state - Provide the method for getting schema hash for MessageImpl
Motivation
Here is the proposal: https://github.com/apache/pulsar/wiki/PIP-88%3A-Replicate-schemas-across-multiple
For the implementation, we just need to set the correct SchemaInfo for the replicated message and using
the AutoProduceByte schema for the producer of the replicator.
Verifying this change
Added new tests to verify the consumer can consume messages with schema from the remote cluster.
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changes