Skip to content

Commit

Permalink
Merge branch '7.6.x' into 7.7.x by rayokota
Browse files Browse the repository at this point in the history
  • Loading branch information
ConfluentSemaphore committed Oct 28, 2024
2 parents 6b31dee + 5b11dc4 commit d443720
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package io.confluent.kafka.schemaregistry;

import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -86,4 +88,45 @@ private void resolveReferences(Schema schema, Map<String, String> schemas, Set<S
}
}
}

// Parking this method and the following ones here instead of in ParsedSchema as interfaces can't
// have private methods in Java 8. Move these to ParsedSchema in 8.0.x
protected static boolean canLookupIgnoringVersion(
    ParsedSchema current, ParsedSchema prev) {
  String currentVersion = getConfluentVersion(current.metadata());
  String previousVersion = getConfluentVersion(prev.metadata());
  boolean onlyPrevHasVersion = currentVersion == null && previousVersion != null;
  if (!onlyPrevHasVersion) {
    return current.deepEquals(prev);
  }
  // This handles the case where a schema is sent without confluent:version:
  // strip the version from the previous schema before the deep comparison.
  ParsedSchema normalizedCurrent = current;
  if (current.metadata() == null) {
    normalizedCurrent = current.copy(new Metadata(null, null, null), current.ruleSet());
  }
  ParsedSchema normalizedPrev =
      prev.copy(Metadata.removeConfluentVersion(prev.metadata()), prev.ruleSet());
  return normalizedCurrent.deepEquals(normalizedPrev);
}

protected static boolean hasLatestVersion(List<SchemaReference> refs) {
  // A reference version of -1 is the placeholder for "latest version of the subject".
  for (SchemaReference ref : refs) {
    if (ref.getVersion() == -1) {
      return true;
    }
  }
  return false;
}

protected static List<SchemaReference> replaceLatestVersion(
    List<SchemaReference> refs, SchemaVersionFetcher fetcher) {
  // Resolve every placeholder version of -1 to the concrete latest version number,
  // leaving references with explicit versions untouched.
  List<SchemaReference> resolved = new ArrayList<>(refs.size());
  for (SchemaReference ref : refs) {
    if (ref.getVersion() != -1) {
      resolved.add(ref);
      continue;
    }
    Schema latest = fetcher.getByVersion(ref.getSubject(), -1, false);
    resolved.add(new SchemaReference(ref.getName(), ref.getSubject(), latest.getVersion()));
  }
  return resolved;
}

protected static String getConfluentVersion(Metadata metadata) {
  // Null-safe accessor: a schema without metadata has no confluent:version.
  if (metadata == null) {
    return null;
  }
  return metadata.getConfluentVersion();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@

package io.confluent.kafka.schemaregistry;

import static io.confluent.kafka.schemaregistry.AbstractSchemaProvider.canLookupIgnoringVersion;
import static io.confluent.kafka.schemaregistry.AbstractSchemaProvider.getConfluentVersion;
import static io.confluent.kafka.schemaregistry.AbstractSchemaProvider.hasLatestVersion;
import static io.confluent.kafka.schemaregistry.AbstractSchemaProvider.replaceLatestVersion;

import com.fasterxml.jackson.databind.JsonNode;
import io.confluent.kafka.schemaregistry.client.SchemaVersionFetcher;
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaEntity;
Expand Down Expand Up @@ -245,7 +251,7 @@ default boolean hasTopLevelField(String field) {
}

/**
* Returns whether the underlying raw representations are equal.
* Returns whether the underlying raw representations are equal, ignoring references.
*
* @return whether the underlying raw representations are equal
*/
Expand Down Expand Up @@ -282,4 +288,37 @@ default Set<String> getReservedFields() {
.filter(field -> !field.isEmpty())
.collect(Collectors.toSet());
}

/**
 * Returns whether this schema can be used to look up the specified schema.
 *
 * @param prev the previously registered schema to look up
 * @param fetcher used to resolve reference versions of -1 to the latest registered version
 * @return whether this schema can be used to look up the specified schema
 */
default boolean canLookup(ParsedSchema prev, SchemaVersionFetcher fetcher) {
  // Case 1: this schema was sent with all references resolved (which can happen
  // with Avro schemas) while the previous schema declares references; the schemas
  // match except for a confluent:version that only the previous schema may carry.
  boolean onlyPrevHasRefs = references().isEmpty() && !prev.references().isEmpty();
  if (onlyPrevHasRefs && canLookupIgnoringVersion(this, prev)) {
    return true;
  }
  // Case 2: the two schemas have matching references once every version of -1 is
  // replaced by the latest registered version, and are otherwise the same except
  // for the previous schema possibly having a confluent:version.
  boolean onlyPrevHasVersion = getConfluentVersion(metadata()) == null
      && getConfluentVersion(prev.metadata()) != null;
  if (onlyPrevHasVersion
      || hasLatestVersion(references())
      || hasLatestVersion(prev.references())) {
    boolean refsEquivalent = replaceLatestVersion(references(), fetcher)
        .equals(replaceLatestVersion(prev.references(), fetcher));
    return refsEquivalent && canLookupIgnoringVersion(this, prev);
  }
  return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private int getIdFromRegistry(

private boolean schemasEqual(ParsedSchema schema1, ParsedSchema schema2) {
return schema1.canonicalString().equals(schema2.canonicalString())
|| schema1.deepEquals(schema2);
|| schema1.canLookup(schema2, this);
}

private void generateVersion(String subject, ParsedSchema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
package io.confluent.kafka.schemaregistry.client.rest.entities;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
Expand All @@ -43,6 +45,8 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class Metadata {

public static final String CONFLUENT_VERSION = "confluent:version";

@JsonPropertyOrder(alphabetic = true)
private final SortedMap<String, SortedSet<String>> tags;
@JsonPropertyOrder(alphabetic = true)
Expand Down Expand Up @@ -134,6 +138,34 @@ public void updateHash(MessageDigest md) {
}
}

@JsonIgnore
public String getConfluentVersion() {
  // Derived from the properties map, so it is excluded from JSON serialization.
  Map<String, String> props = getProperties();
  return props == null ? null : props.get(CONFLUENT_VERSION);
}

public static Metadata setConfluentVersion(Metadata metadata, int version) {
  // Copy the existing properties (if any) so the input Metadata is never mutated.
  Map<String, String> props = new HashMap<>();
  if (metadata != null && metadata.getProperties() != null) {
    props.putAll(metadata.getProperties());
  }
  props.put(CONFLUENT_VERSION, String.valueOf(version));
  if (metadata == null) {
    return new Metadata(null, props, null);
  }
  return new Metadata(metadata.getTags(), props, metadata.getSensitive());
}

public static Metadata removeConfluentVersion(Metadata metadata) {
  // Nothing to strip when there is no metadata or no properties; return as-is.
  if (metadata == null || metadata.getProperties() == null) {
    return metadata;
  }
  // Work on a copy so the caller's Metadata is left unmodified.
  Map<String, String> props = new HashMap<>(metadata.getProperties());
  props.remove(CONFLUENT_VERSION);
  return new Metadata(metadata.getTags(), props, metadata.getSensitive());
}

public static Metadata mergeMetadata(Metadata oldMetadata, Metadata newMetadata) {
if (oldMetadata == null) {
return newMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ public class KafkaSchemaRegistry implements SchemaRegistry, LeaderAwareSchemaReg
*/
public static final int MIN_VERSION = 1;
public static final int MAX_VERSION = Integer.MAX_VALUE;
public static final String CONFLUENT_VERSION = "confluent:version";
private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class);
private static final String RESERVED_FIELD_REMOVED = "The new schema has reserved field %s "
+ "removed from its metadata which is present in the old schema's metadata.";
Expand Down Expand Up @@ -711,7 +710,7 @@ public Schema register(String subject,
ParsedSchema undeletedSchema = schemaHolder.schema();
if (parsedSchema != null
&& (schemaId < 0 || schemaId == schemaValue.getId())
&& areSchemasEquivalent(parsedSchema, undeletedSchema)) {
&& parsedSchema.canLookup(undeletedSchema, this)) {
// This handles the case where a schema is sent with all references resolved
// or without confluent:version
return modifiedSchema
Expand Down Expand Up @@ -828,33 +827,6 @@ private void checkRegisterMode(
}
}

// Convenience overload: parses the stored Schema entity and delegates to the
// ParsedSchema overload for the actual comparison.
private boolean areSchemasEquivalent(ParsedSchema schema, Schema prev)
    throws SchemaRegistryException {
  return areSchemasEquivalent(schema, parseSchema(prev));
}

// Returns whether the incoming schema is equivalent to a previously registered one,
// tolerating two asymmetries: the incoming schema may have all references resolved
// inline, and the previous schema may carry a confluent:version property that the
// incoming schema lacks.
private boolean areSchemasEquivalent(ParsedSchema schema, ParsedSchema prev) {
  if (schema.references().isEmpty() && !prev.references().isEmpty()) {
    if (schema.deepEquals(prev)) {
      // This handles the case where a schema is sent with all references resolved
      return true;
    }
  }
  String schemaVer = getConfluentVersion(schema.metadata());
  String prevVer = getConfluentVersion(prev.metadata());
  // Only normalize when the incoming schema has no confluent:version but the
  // previous one does; otherwise any version difference is a real difference.
  if (schemaVer == null && prevVer != null) {
    ParsedSchema newSchema = schema.metadata() != null
        ? schema
        : schema.copy(new Metadata(null, null, null), schema.ruleSet());
    ParsedSchema newPrev = prev.copy(removeConfluentVersion(prev.metadata()), prev.ruleSet());
    if (newSchema.deepEquals(newPrev)) {
      // This handles the case where a schema is sent without confluent:version
      return true;
    }
  }
  return false;
}

private boolean isReadOnlyMode(String subject) throws SchemaRegistryStoreException {
Mode subjectMode = getModeInScope(subject);
return subjectMode == Mode.READONLY || subjectMode == Mode.READONLY_OVERRIDE;
Expand Down Expand Up @@ -937,7 +909,7 @@ private boolean maybeSetMetadataRuleSet(
// Set confluent:version if passed in version is not 0,
// or update confluent:version if it already exists in the metadata
if (schema.getVersion() != 0 || getConfluentVersion(mergedMetadata) != null) {
mergedMetadata = setConfluentVersion(mergedMetadata, newVersion);
mergedMetadata = Metadata.setConfluentVersion(mergedMetadata, newVersion);
}

if (mergedMetadata != null || mergedRuleSet != null) {
Expand All @@ -949,32 +921,7 @@ private boolean maybeSetMetadataRuleSet(
}

private String getConfluentVersion(Metadata metadata) {
return metadata != null && metadata.getProperties() != null
? metadata.getProperties().get(CONFLUENT_VERSION)
: null;
}

private Metadata setConfluentVersion(Metadata metadata, int version) {
Map<String, String> newProps = metadata != null && metadata.getProperties() != null
? new HashMap<>(metadata.getProperties())
: new HashMap<>();
newProps.put(CONFLUENT_VERSION, String.valueOf(version));
return new Metadata(
metadata != null ? metadata.getTags() : null,
newProps,
metadata != null ? metadata.getSensitive() : null);
}

private Metadata removeConfluentVersion(Metadata metadata) {
if (metadata == null || metadata.getProperties() == null) {
return metadata;
}
Map<String, String> newProps = new HashMap<>(metadata.getProperties());
newProps.remove(CONFLUENT_VERSION);
return new Metadata(
metadata.getTags(),
newProps,
metadata.getSensitive());
return metadata != null ? metadata.getConfluentVersion() : null;
}

public Schema registerOrForward(String subject,
Expand Down Expand Up @@ -1052,7 +999,7 @@ public Schema modifySchemaTags(String subject, Schema schema, TagSchemaRequest r
Metadata mergedMetadata = request.getMetadata() != null
? request.getMetadata()
: parsedSchema.metadata();
mergedMetadata = setConfluentVersion(mergedMetadata, newVersion);
mergedMetadata = Metadata.setConfluentVersion(mergedMetadata, newVersion);

RuleSet ruleSet = maybeModifyPreviousRuleSet(subject, request);

Expand Down Expand Up @@ -1335,7 +1282,7 @@ private Schema lookUpSchemaUnderSubject(
Schema prev = getLatestVersion(subject);
if (prev != null
&& parsedSchema != null
&& areSchemasEquivalent(parsedSchema, prev)) {
&& parsedSchema.canLookup(parseSchema(prev), this)) {
// This handles the case where a schema is sent with all references resolved
// or without confluent:version
return prev;
Expand All @@ -1347,7 +1294,7 @@ && areSchemasEquivalent(parsedSchema, prev)) {
Schema prev = get(schemaKey.getSubject(), schemaKey.getVersion(), lookupDeletedSchema);
if (prev != null
&& parsedSchema != null
&& areSchemasEquivalent(parsedSchema, prev)) {
&& parsedSchema.canLookup(parseSchema(prev), this)) {
// This handles the case where a schema is sent with all references resolved
// or without confluent:version
return prev;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Timestamp;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema.Format;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
Expand Down Expand Up @@ -815,4 +816,41 @@ public void testKafkaProtobufDeserializerWithPreRegisteredUseLatestRecordNameStr
protobufSerializer.configure(new HashMap(serializerConfig), false);
testMessageDeserializer.configure(new HashMap(deserializerConfig), false);
}

@Test
public void testDependencyPreregisterRefWithNegativeOne() throws Exception {
  // Register the referenced schema first so that a reference declared with
  // version -1 ("latest") can be resolved against it.
  String refSubject = "TestProto.proto";
  String refSchemaString = "syntax = \"proto3\";\n"
      + "package io.confluent.kafka.serializers.protobuf.test;\n"
      + "\n"
      + "option java_package = \"io.confluent.kafka.serializers.protobuf.test\";\n"
      + "\n"
      + "message TestMessage {\n"
      + " string test_string = 1;\n"
      + "}\n";
  schemaRegistry.register(refSubject, new ProtobufSchema(refSchemaString));
  String subject = topic + "-value";
  String schemaString = "syntax = \"proto3\";\n"
      + "package io.confluent.kafka.serializers.protobuf.test;\n"
      + "\n"
      + "import \"TestProto.proto\";\n"
      + "\n"
      + "option java_package = \"io.confluent.kafka.serializers.protobuf.test\";\n"
      + "\n"
      + "message DependencyMessage {\n"
      + " TestMessage test_message = 1;\n"
      + " bool is_active = 2;\n"
      + "}";
  SchemaReference ref = new SchemaReference("TestProto.proto", "TestProto.proto", -1);
  ParsedSchema parsedSchema = new ProtobufSchema(
      schemaString, ImmutableList.of(ref),
      ImmutableMap.of(ref.getName(), refSchemaString), null, null);
  schemaRegistry.register(subject, parsedSchema);

  // Look up by the same subject used for registration; the previous hardcoded
  // "test-value" silently depended on the topic fixture being named "test".
  ParsedSchema schema = schemaRegistry.getSchemaBySubjectAndId(subject, 2);
  // A fresh copy of the -1 reference must still resolve to the registered id.
  SchemaReference refCopy = new SchemaReference("TestProto.proto", "TestProto.proto", -1);
  schema = schemaRegistry.parseSchema(
      ProtobufSchema.TYPE, schema.canonicalString(), ImmutableList.of(refCopy)).get();
  int id = schemaRegistry.getId(subject, schema);
  assertEquals(2, id);
}

}

0 comments on commit d443720

Please sign in to comment.