Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP-88] Replicate schemas across clusters #11441

Merged
merged 4 commits into from
Aug 8, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Fix comments
  • Loading branch information
codelipenghui committed Aug 8, 2021
commit 4d5bdc10d59712bfedf5af0fd8b3aa335d19ee9c
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
headersAndPayload.retain();

getSchemaInfo(msg).thenAccept(schemaInfo -> {
msg.setSchemaInfo(schemaInfo);
msg.setSchemaInfoForReplicator(schemaInfo);
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
}).exceptionally(ex -> {
log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class MessageImpl<T> implements Message<T> {
private ByteBuf payload;

private Schema<T> schema;
private SchemaInfo schemaInfo;
private SchemaInfo schemaInfoForReplicator;
private SchemaState schemaState = SchemaState.None;
private Optional<EncryptionContext> encryptionCtx = Optional.empty();

Expand Down Expand Up @@ -420,9 +420,6 @@ private void ensureSchemaIsLoaded() {
}

public SchemaInfo getSchemaInfo() {
if (schemaInfo != null) {
return schemaInfo;
}
if (schema == null) {
return null;
}
Expand All @@ -433,8 +430,16 @@ public SchemaInfo getSchemaInfo() {
return schema.getSchemaInfo();
}

public void setSchemaInfo(SchemaInfo schemaInfo) {
this.schemaInfo = schemaInfo;
public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) {
if (msgMetadata.hasReplicatedFrom()) {
this.schemaInfoForReplicator = schemaInfo;
} else {
throw new IllegalArgumentException("Only allowed to set schemaInfoForReplicator for a replicated message.");
}
}

public SchemaInfo getSchemaInfoForReplicator() {
return msgMetadata.hasReplicatedFrom() ? this.schemaInfoForReplicator : null;
}

@Override
Expand Down Expand Up @@ -682,6 +687,10 @@ public List<String> getReplicateTo() {
return msgMetadata.getReplicateTosList();
}

public boolean hasReplicateFrom() {
return msgMetadata.hasReplicatedFrom();
}

void setMessageId(MessageIdImpl messageId) {
this.messageId = messageId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,8 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call
if (!changeToRegisteringSchemaState()) {
return;
}
SchemaInfo schemaInfo = Optional.ofNullable(msg.getSchemaInfo())
SchemaInfo schemaInfo = msg.hasReplicateFrom() ? msg.getSchemaInfoForReplicator() : msg.getSchemaInfo();
schemaInfo = Optional.ofNullable(schemaInfo)
.filter(si -> si.getType().getValue() > 0)
.orElse(Schema.BYTES.getSchemaInfo());
getOrCreateSchemaAsync(cnx, schemaInfo).handle((v, ex) -> {
Expand Down