Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions schemaregistry/serde/protobuf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
file_google_protobuf_timestamp,
file_google_protobuf_type,
file_google_protobuf_wrappers,
DescriptorProto,
FieldDescriptorProto,
FileDescriptorProto,
FileDescriptorProtoSchema
} from "@bufbuild/protobuf/wkt";
Expand Down Expand Up @@ -458,6 +460,7 @@
}

function newFileRegistry(fileDesc: FileDescriptorProto, deps: Map<string, string>): FileRegistry {
fullyQualifyTypeNames(fileDesc)
const resolve = (depName: string) => {
if (isBuiltin(depName)) {
const dep = builtinDeps.get(depName)
Expand All @@ -472,12 +475,91 @@
}
const fileDesc = fromBinary(FileDescriptorProtoSchema, Buffer.from(dep, 'base64'))
fileDesc.name = depName
fullyQualifyTypeNames(fileDesc)
return fileDesc
}
}
return createFileRegistry(fileDesc, resolve)
}

/**
 * Rewrites relative `typeName` references inside a FileDescriptorProto to
 * their fully-qualified, dot-prefixed form. The descriptor is mutated in place.
 *
 * The schema registry may hand back descriptors whose references are relative
 * (e.g. "MyEnum" rather than ".test.MyEnum"), which createFileRegistry cannot
 * resolve.
 *
 * Works in two passes: first every enum and message declared in this file is
 * indexed by its qualified name, then every message field and extension is
 * resolved against that index.
 *
 * @param fileDesc descriptor whose fields and extensions are updated in place
 */
function fullyQualifyTypeNames(fileDesc: FileDescriptorProto): void {
  // An empty package means top-level types live directly under the root (".Name").
  const packageScope = fileDesc.package ? `.${fileDesc.package}` : ''

  // Pass 1: index the qualified names of all types declared in this file.
  const declared = new Set<string>()
  fileDesc.enumType.forEach((topLevelEnum) => {
    declared.add(`${packageScope}.${topLevelEnum.name}`)
  })
  fileDesc.messageType.forEach((topLevelMsg) => {
    collectTypes(topLevelMsg, packageScope, declared)
  })

  // Pass 2: qualify references, messages first and file-level extensions last.
  fileDesc.messageType.forEach((topLevelMsg) => {
    resolveFieldTypeNames(topLevelMsg, packageScope, declared)
  })
  fileDesc.extension.forEach((fileExtension) => {
    resolveTypeName(fileExtension, packageScope, declared)
  })
}

/**
 * Adds the qualified name of a message, of each enum declared directly inside
 * it, and (recursively) of every nested message to `allTypes`.
 *
 * @param msg message descriptor to index
 * @param parentScope qualified name of the enclosing scope ('' at the root)
 * @param allTypes accumulator that receives the qualified names
 */
function collectTypes(
  msg: DescriptorProto, parentScope: string, allTypes: Set<string>
): void {
  const qualifiedName = parentScope + '.' + msg.name
  allTypes.add(qualifiedName)

  msg.enumType.forEach((nestedEnum) => {
    allTypes.add(qualifiedName + '.' + nestedEnum.name)
  })
  // Nested messages are indexed with this message as their enclosing scope.
  msg.nestedType.forEach((childMsg) => {
    collectTypes(childMsg, qualifiedName, allTypes)
  })
}

/**
 * Resolves the type references of every field and extension declared in a
 * message, then recurses into its nested messages.
 *
 * @param msg message descriptor whose members are updated in place
 * @param parentScope qualified name of the enclosing scope ('' at the root)
 * @param allTypes qualified names of all types declared in the file
 */
function resolveFieldTypeNames(
  msg: DescriptorProto, parentScope: string, allTypes: Set<string>
): void {
  // References inside this message are looked up starting from the message's own scope.
  const ownScope = parentScope + '.' + msg.name

  // Regular fields are handled before extensions, matching declaration-kind order.
  const members = [...msg.field, ...msg.extension]
  members.forEach((member) => {
    resolveTypeName(member, ownScope, allTypes)
  })

  msg.nestedType.forEach((childMsg) => {
    resolveFieldTypeNames(childMsg, ownScope, allTypes)
  })
}

/**
 * Qualifies the relative type references carried by a field descriptor.
 *
 * Both `typeName` and, for extensions, `extendee` are handled; descriptor.proto
 * specifies that `extendee` is resolved in the same manner as `type_name`.
 * A property is only assigned when it is actually rewritten, so fields that
 * carry no reference (scalars, non-extensions) are left untouched.
 *
 * @param field field or extension descriptor, updated in place
 * @param scope qualified name of the scope the field is declared in ('' at the root)
 * @param allTypes qualified names of all types declared in the file
 */
function resolveTypeName(
  field: FieldDescriptorProto, scope: string, allTypes: Set<string>
): void {
  const qualifiedType = qualifyRelativeName(field.typeName, scope, allTypes)
  if (qualifiedType !== undefined) {
    field.typeName = qualifiedType
  }
  const qualifiedExtendee = qualifyRelativeName(field.extendee, scope, allTypes)
  if (qualifiedExtendee !== undefined) {
    field.extendee = qualifiedExtendee
  }
}

/**
 * Computes the dot-prefixed form of a relative type reference.
 *
 * @param name reference as stored in the descriptor; may be empty or already qualified
 * @param scope qualified name of the scope the reference appears in
 * @param allTypes qualified names of all types declared in the file
 * @returns the qualified name, or `undefined` when the reference is empty,
 *          already starts with '.', or cannot be resolved
 */
function qualifyRelativeName(
  name: string | undefined, scope: string, allTypes: Set<string>
): string | undefined {
  if (!name || name.startsWith('.')) {
    return undefined
  }
  // Search from the innermost scope outwards, dropping one trailing name
  // component per iteration until the root ('') is reached.
  let currentScope = scope
  while (currentScope !== '') {
    const candidate = `${currentScope}.${name}`
    if (allTypes.has(candidate)) {
      return candidate
    }
    const lastDot = currentScope.lastIndexOf('.')
    currentScope = lastDot >= 0 ? currentScope.substring(0, lastDot) : ''
  }
  // Either the name is declared at the root of this file, or it contains a
  // '.' and is taken to be a package-qualified cross-file reference that only
  // lacks the leading dot; createFileRegistry can then resolve it via
  // dependencies. A dot-less name that is not declared here stays unresolved.
  const rootCandidate = `.${name}`
  if (allTypes.has(rootCandidate) || name.includes('.')) {
    return rootCandidate
  }
  return undefined
}

async function transform(ctx: RuleContext, descriptor: DescMessage, msg: any, fieldTransform: FieldTransform): Promise<any> {
if (msg == null || descriptor == null) {
return msg
Expand Down
104 changes: 102 additions & 2 deletions schemaregistry/test/serde/protobuf.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
ProtobufDeserializer, ProtobufDeserializerConfig,
ProtobufSerializer, ProtobufSerializerConfig,
} from "../../serde/protobuf";
import {HeaderSchemaIdSerializer, SerdeType} from "../../serde/serde";
import {HeaderSchemaIdSerializer, SchemaId, SerdeType} from "../../serde/serde";
import {
Comment thread
rayokota marked this conversation as resolved.
Rule,
RuleMode,
Expand All @@ -16,7 +16,11 @@ import {LocalKmsDriver} from "../../rules/encryption/localkms/local-driver";
import {EncryptionExecutor, FieldEncryptionExecutor} from "../../rules/encryption/encrypt-executor";
import {AuthorSchema, file_test_schemaregistry_serde_example, PizzaSchema} from "./test/example_pb";
import {create, toBinary} from "@bufbuild/protobuf";
import {FileDescriptorProtoSchema} from "@bufbuild/protobuf/wkt";
import {
FieldDescriptorProto_Label,
FieldDescriptorProto_Type,
FileDescriptorProtoSchema
} from "@bufbuild/protobuf/wkt";
import {
NestedMessage_InnerMessageSchema
} from "./test/nested_pb";
Expand Down Expand Up @@ -106,6 +110,102 @@ describe('ProtobufSerializer', () => {
let obj2 = await deser.deserialize(topic, bytes)
expect(obj2).toEqual(obj)
})
// Registers a hand-built FileDescriptorProto whose fields use relative type
// names, then decodes a hand-encoded payload against it to check that each
// reference form is resolved.
it('deserialize with relative type names in FileDescriptorProto', async () => {
  const conf: ClientConfig = { baseURLs: [baseURL], cacheCapacity: 1000 }
  const client = SchemaRegistryClient.newClient(conf)

  // Build a FileDescriptorProto with various relative type name forms,
  // simulating what the schema registry returns for format=serialized
  const fileDescProto = create(FileDescriptorProtoSchema, {
    name: 'top.proto',
    package: 'test',
    syntax: 'proto3',
    dependency: ['google/protobuf/timestamp.proto'],
    messageType: [{
      name: 'MyMessage',
      field: [{
        name: 'my_field',
        number: 1,
        type: FieldDescriptorProto_Type.ENUM,
        label: FieldDescriptorProto_Label.OPTIONAL,
        typeName: 'MyEnum',  // case 1: unqualified relative name
        proto3Optional: true,
      }, {
        name: 'my_field2',
        number: 2,
        type: FieldDescriptorProto_Type.ENUM,
        label: FieldDescriptorProto_Label.OPTIONAL,
        typeName: 'test.MyEnum',  // case 2: package-qualified, missing leading dot
        proto3Optional: true,
      }, {
        name: 'my_nested',
        number: 3,
        type: FieldDescriptorProto_Type.MESSAGE,
        label: FieldDescriptorProto_Label.OPTIONAL,
        typeName: 'Outer.Inner',  // case 3: nested type reference
        proto3Optional: true,
      }, {
        name: 'my_ts',
        number: 4,
        type: FieldDescriptorProto_Type.MESSAGE,
        label: FieldDescriptorProto_Label.OPTIONAL,
        typeName: 'google.protobuf.Timestamp',  // case 4: cross-file, missing leading dot
        proto3Optional: true,
      }],
      oneofDecl: [
        { name: '_my_field' },
        { name: '_my_field2' },
        { name: '_my_nested' },
        { name: '_my_ts' },
      ],
    }, {
      name: 'Outer',
      nestedType: [{
        name: 'Inner',
        field: [{
          name: 'value',
          number: 1,
          type: FieldDescriptorProto_Type.INT32,
          label: FieldDescriptorProto_Label.OPTIONAL,
          proto3Optional: true,
        }],
        oneofDecl: [{ name: '_value' }],
      }],
    }],
    enumType: [{
      name: 'MyEnum',
      value: [
        { name: 'MY_ENUM_UNSPECIFIED', number: 0 },
        { name: 'MY_ENUM_FOO', number: 1 },
      ]
    }]
  })

  // The schema is registered as the base64 of the binary-encoded descriptor.
  const schema = Buffer.from(toBinary(FileDescriptorProtoSchema, fileDescProto)).toString('base64')
  const info: SchemaInfo = { schemaType: 'PROTOBUF', schema }
  const schemaId = await client.register(subject, info, false)

  // MyMessage { my_field = 1, my_field2 = 1, my_nested = { value = 42 }, my_ts = { seconds = 1000 } }
  // field 1 varint 1: 0x08, 0x01
  // field 2 varint 1: 0x10, 0x01
  // field 3 LEN {08 2A}: 0x1a, 0x02, 0x08, 0x2a (Inner { value = 42 })
  // field 4 LEN {08 e807}: 0x22, 0x03, 0x08, 0xe8, 0x07 (Timestamp { seconds = 1000 })
  const msgBytes = Buffer.from([
    0x08, 0x01,
    0x10, 0x01,
    0x1a, 0x02, 0x08, 0x2a,
    0x22, 0x03, 0x08, 0xe8, 0x07,
  ])
  // Prefix the payload with the schema-id bytes for the registered schema.
  const sid = new SchemaId('PROTOBUF', schemaId, undefined, [0])
  const buf = Buffer.concat([sid.idToBytes(), msgBytes])

  const deser = new ProtobufDeserializer(client, SerdeType.VALUE, {})
  const result = await deser.deserialize(topic, buf)
  expect(result.myField).toEqual(1)  // MY_ENUM_FOO = 1
  expect(result.myField2).toEqual(1)  // MY_ENUM_FOO = 1 (package-qualified)
  expect(result.myNested.value).toEqual(42)  // nested type
  expect(result.myTs.seconds).toEqual(BigInt(1000))  // cross-file Timestamp
})
it('serialize nested messsage', async () => {
let conf: ClientConfig = {
baseURLs: [baseURL],
Expand Down