Skip to content

[improve][client] PIP-420: Supports users implement external schemas #24488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ zookeeperSessionExpiredPolicy=reconnect
systemTopicEnabled=true

# Deploy the schema compatibility checker for a specific schema type to enforce schema compatibility check
schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck
schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ExternalSchemaCompatibilityCheck

# The schema compatibility strategy is used for system topics.
# Available values: ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
Expand Down
2 changes: 1 addition & 1 deletion conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ brokerClientTlsProtocols=
systemTopicEnabled=true

# Deploy the schema compatibility checker for a specific schema type to enforce schema compatibility check
schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck
schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ExternalSchemaCompatibilityCheck

# The schema compatibility strategy is used for system topics.
# Available values: ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;

import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;

public class ExternalSchemaCompatibilityCheck implements SchemaCompatibilityCheck {

@Override
public SchemaType getSchemaType() {
return SchemaType.EXTERNAL;
}

@Override
public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy)
throws IncompatibleSchemaException {
if (strategy == SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE) {
return;
}
if ((SchemaType.EXTERNAL.equals(from.getType()) || SchemaType.EXTERNAL.equals(to.getType()))
&& !from.getType().equals(to.getType())) {
throw new IncompatibleSchemaException("External schema is not compatible with the other schema types.");
}
}

@Override
public void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy)
throws IncompatibleSchemaException {
if (strategy == SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE) {
return;
}
while (from.iterator().hasNext()) {
SchemaData fromSchema = from.iterator().next();
checkCompatible(fromSchema, to, strategy);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ static void validateSchemaData(SchemaData schemaData) throws InvalidSchemaDataEx
break;
case NONE:
case BYTES:
// `NONE` and `BYTES` schema is not stored
case EXTERNAL:
// `NONE`, `BYTES` and `EXTERNAL` schema is not stored
break;
case AUTO:
case AUTO_CONSUME:
Expand Down
1 change: 1 addition & 0 deletions pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ message SchemaInfo {
LOCALTIME = 19;
LOCALDATETIME = 20;
PROTOBUFNATIVE = 21;
EXTERNAL = 22;
}
message KeyValuePair {
required string key = 1;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;

import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.testng.Assert.fail;

@Test(groups = "broker")
public class ExternalSchemaCompatibilityCheckTest {

private final ExternalSchemaCompatibilityCheck externalSchemaCompatibilityCheck = new ExternalSchemaCompatibilityCheck();

private final SchemaData externalSchemaData = SchemaData.builder()
.type(SchemaType.EXTERNAL)
.data(new byte[0])
.build();

@DataProvider(name = "otherSchemasProvider")
public Object[] otherSchemasProvider() {
return new Object[] {
SchemaData.builder()
.type(SchemaType.JSON)
.build(),
SchemaData.builder()
.type(SchemaType.AVRO)
.build(),
SchemaData.builder()
.type(SchemaType.PROTOBUF)
.build(),
SchemaData.builder()
.type(SchemaType.PROTOBUF_NATIVE)
.build()
};
}

@Test(dataProvider = "otherSchemasProvider")
public void testExternalSchemaCompatibilityCheck(SchemaData schemaData) {
try {
externalSchemaCompatibilityCheck.checkCompatible(
schemaData, externalSchemaData, SchemaCompatibilityStrategy.FULL);
fail("Expected IncompatibleSchemaException not thrown");
} catch (IncompatibleSchemaException e) {
// Expected exception, as external schema is not compatible with the other schemas
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.schema;

import lombok.Getter;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

public class MockExternalJsonSchema<T> implements Schema<T> {

@Getter
private SchemaInfoProvider schemaInfoProvider;
private final Class<T> clazz;
@Getter
private boolean isClosed;

public MockExternalJsonSchema(Class<T> clazz) {
this.clazz = clazz;
}

@Override
public byte[] encode(T message) {
// the external schema should register schema when encoding the message, this is just a mock implementation
return new byte[0];
}

@Override
public T decode(byte[] bytes) {
// the external schema should retrieve the schema and decoding the payload, this is just a mock implementation
return null;
}

@Override
public SchemaInfo getSchemaInfo() {
return SchemaInfoImpl.builder()
.name("")
.type(SchemaType.EXTERNAL)
.schema(new byte[0])
.build();
}

@Override
public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
this.schemaInfoProvider = schemaInfoProvider;
}

@Override
public Schema clone() {
return new MockExternalJsonSchema(clazz);
}

@Override
public void close() {
this.schemaInfoProvider = null;
this.isClosed = true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -37,6 +41,8 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
Expand All @@ -51,6 +57,7 @@
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.schema.Schemas;
import org.apache.pulsar.schema.MockExternalJsonSchema;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -622,6 +629,95 @@ public void testConsumerWithNotCompatibilitySchema(SchemaCompatibilityStrategy s
}

}

@Test
public void testExternalSchemaTypeCompatibility() throws Exception {
String namespace = "test-namespace-" + randomName(16);
admin.namespaces().createNamespace(
PUBLIC_TENANT + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME)
);

NamespaceName namespaceName = NamespaceName.get(PUBLIC_TENANT, namespace);
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), SchemaCompatibilityStrategy.FULL);

final String topic = "persistent://" + PUBLIC_TENANT + "/" + namespace + "/testExternalSchemaTypeCompatibility";

MockExternalJsonSchema<Schemas.PersonThree> externalJsonSchema =
new MockExternalJsonSchema<>(Schemas.PersonThree.class);

Map<String, String> schemaConfigs = new HashMap<>();
schemaConfigs.put("schema.registry.url", "http://localhost:8080");

PulsarClient client = PulsarClient.builder()
.serviceUrl(lookupUrl.toString())
.schemaProperties(schemaConfigs)
.build();

// Existing topic schema is JSON, new schema can't be EXTERNAL
Producer<Schemas.PersonThree> producer = client
.newProducer(Schema.JSON(Schemas.PersonThree.class))
.topic(topic)
.create();
producer.close();

try (Producer<Schemas.PersonThree> ignored = client
.newProducer(externalJsonSchema)
.topic(topic)
.create()) {
fail("Should not be able to create producer with incompatible schema.");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException.IncompatibleSchemaException);
assertTrue(e.getMessage().contains(
"Incompatible schema: exists schema type JSON, new schema type EXTERNAL"));
}
try (Consumer<Schemas.PersonThree> ignored = client
.newConsumer(externalJsonSchema)
.topic(topic)
.subscriptionName("sub")
.subscribe()) {
fail("Should not be able to create consumer with incompatible schema.");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException.IncompatibleSchemaException);
assertTrue(e.getMessage().contains(
"Incompatible schema: exists schema type JSON, new schema type EXTERNAL"));
}
admin.topics().delete(topic);

producer = client
.newProducer(externalJsonSchema)
.topic(topic)
.create();
assertNotNull(externalJsonSchema.getSchemaInfoProvider());
assertEquals(externalJsonSchema.getSchemaInfoProvider().getConfigs(), schemaConfigs);
producer.close();
assertNull(externalJsonSchema.getSchemaInfoProvider());
assertTrue(externalJsonSchema.isClosed());

try (Producer<Schemas.PersonThree> ignored = client
.newProducer(Schema.JSON(Schemas.PersonThree.class))
.topic(topic)
.create()) {
fail("Should not be able to create producer with incompatible schema.");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException.IncompatibleSchemaException);
assertTrue(e.getMessage().contains(
"Incompatible schema: exists schema type EXTERNAL, new schema type JSON"));
}
try (Consumer<Schemas.PersonThree> ignored = client
.newConsumer(Schema.JSON(Schemas.PersonThree.class))
.topic(topic)
.subscriptionName("sub")
.subscribe()) {
fail("Should not be able to create consumer with incompatible schema.");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException.IncompatibleSchemaException);
assertTrue(e.getMessage().contains(
"Incompatible schema: exists schema type EXTERNAL, new schema type JSON"));
}
admin.topics().delete(topic);
}

public static String randomName(int numChars) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < numChars; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,4 +679,13 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
* - The `loadManagerClassName` config in broker is a class that implements the `ExtensibleLoadManager` interface
*/
ClientBuilder lookupProperties(Map<String, String> properties);

/**
* Set the properties used for schema.
* <p>
* These properties will be used to configure the schema registry client.
* </p>
* @param properties schema registry properties
*/
ClientBuilder schemaProperties(Map<String, String> properties);
}
Loading
Loading