Skip to content

Commit 52dbd6f

Browse files
rayokota authored and fangnx committed
DGS-22077 Handle evolution during field transformation (#2121)
* DGS-22077 Handle evolution during field transformation
* Minor fix
* Minor cleanup
* Add new header
1 parent caca3cd commit 52dbd6f

File tree

5 files changed

+134
-6
lines changed

5 files changed

+134
-6
lines changed

src/confluent_kafka/schema_registry/_async/schema_registry_client.py

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -438,9 +438,10 @@ async def send_request(
438438
" application/json"}
439439

440440
if body is not None:
441-
body = json.dumps(body)
442-
headers = {'Content-Length': str(len(body)),
443-
'Content-Type': "application/vnd.schemaregistry.v1+json"}
441+
body_str = json.dumps(body)
442+
headers = {'Content-Length': str(len(body_str)),
443+
'Content-Type': "application/vnd.schemaregistry.v1+json",
444+
'Confluent-Accept-Unknown-Properties': "true"}
444445

445446
if self.bearer_auth_credentials_source:
446447
await self.handle_bearer_auth(headers)

src/confluent_kafka/schema_registry/_sync/schema_registry_client.py

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -438,9 +438,10 @@ def send_request(
438438
" application/json"}
439439

440440
if body is not None:
441-
body = json.dumps(body)
442-
headers = {'Content-Length': str(len(body)),
443-
'Content-Type': "application/vnd.schemaregistry.v1+json"}
441+
body_str = json.dumps(body)
442+
headers = {'Content-Length': str(len(body_str)),
443+
'Content-Type': "application/vnd.schemaregistry.v1+json",
444+
'Confluent-Accept-Unknown-Properties': "true"}
444445

445446
if self.bearer_auth_credentials_source:
446447
self.handle_bearer_auth(headers)

src/confluent_kafka/schema_registry/common/avro.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -121,6 +121,8 @@ def transform(
121121
elif schema_type == 'record':
122122
fields = schema["fields"]
123123
for field in fields:
124+
if field["name"] not in message:
125+
continue
124126
_transform_field(ctx, schema, field, message, field_transform)
125127
return message
126128

tests/schema_registry/_async/test_avro_serdes.py

Lines changed: 62 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -721,6 +721,68 @@ async def test_avro_cel_field_transform():
721721
assert obj2 == newobj
722722

723723

724+
async def test_avro_cel_field_transform_missing_prop():
725+
conf = {'url': _BASE_URL}
726+
client = AsyncSchemaRegistryClient.new_client(conf)
727+
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
728+
schema = {
729+
'type': 'record',
730+
'name': 'test',
731+
'fields': [
732+
{'name': 'intField', 'type': 'int'},
733+
{'name': 'doubleField', 'type': 'double'},
734+
{'name': 'stringField', 'type': 'string'},
735+
{'name': 'booleanField', 'type': 'boolean'},
736+
{'name': 'bytesField', 'type': 'bytes'},
737+
{'name': 'missing', 'type': ['null', 'string'], 'default': None},
738+
]
739+
}
740+
741+
rule = Rule(
742+
"test-cel",
743+
"",
744+
RuleKind.TRANSFORM,
745+
RuleMode.WRITEREAD,
746+
"CEL_FIELD",
747+
None,
748+
None,
749+
"name == 'stringField' ; value + '-suffix'",
750+
None,
751+
None,
752+
False
753+
)
754+
await client.register_schema(_SUBJECT, Schema(
755+
json.dumps(schema),
756+
"AVRO",
757+
[],
758+
None,
759+
RuleSet(None, [rule])
760+
))
761+
762+
obj = {
763+
'intField': 123,
764+
'doubleField': 45.67,
765+
'stringField': 'hi',
766+
'booleanField': True,
767+
'bytesField': b'foobar',
768+
}
769+
ser = await AsyncAvroSerializer(client, schema_str=None, conf=ser_conf)
770+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
771+
obj_bytes = await ser(obj, ser_ctx)
772+
773+
obj2 = {
774+
'intField': 123,
775+
'doubleField': 45.67,
776+
'stringField': 'hi-suffix-suffix',
777+
'booleanField': True,
778+
'bytesField': b'foobar',
779+
'missing': None,
780+
}
781+
deser = await AsyncAvroDeserializer(client)
782+
newobj = await deser(obj_bytes, ser_ctx)
783+
assert obj2 == newobj
784+
785+
724786
async def test_avro_cel_field_transform_disable():
725787
conf = {'url': _BASE_URL}
726788
client = AsyncSchemaRegistryClient.new_client(conf)

tests/schema_registry/_sync/test_avro_serdes.py

Lines changed: 62 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -721,6 +721,68 @@ def test_avro_cel_field_transform():
721721
assert obj2 == newobj
722722

723723

724+
def test_avro_cel_field_transform_missing_prop():
725+
conf = {'url': _BASE_URL}
726+
client = SchemaRegistryClient.new_client(conf)
727+
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
728+
schema = {
729+
'type': 'record',
730+
'name': 'test',
731+
'fields': [
732+
{'name': 'intField', 'type': 'int'},
733+
{'name': 'doubleField', 'type': 'double'},
734+
{'name': 'stringField', 'type': 'string'},
735+
{'name': 'booleanField', 'type': 'boolean'},
736+
{'name': 'bytesField', 'type': 'bytes'},
737+
{'name': 'missing', 'type': ['null', 'string'], 'default': None},
738+
]
739+
}
740+
741+
rule = Rule(
742+
"test-cel",
743+
"",
744+
RuleKind.TRANSFORM,
745+
RuleMode.WRITEREAD,
746+
"CEL_FIELD",
747+
None,
748+
None,
749+
"name == 'stringField' ; value + '-suffix'",
750+
None,
751+
None,
752+
False
753+
)
754+
client.register_schema(_SUBJECT, Schema(
755+
json.dumps(schema),
756+
"AVRO",
757+
[],
758+
None,
759+
RuleSet(None, [rule])
760+
))
761+
762+
obj = {
763+
'intField': 123,
764+
'doubleField': 45.67,
765+
'stringField': 'hi',
766+
'booleanField': True,
767+
'bytesField': b'foobar',
768+
}
769+
ser = AvroSerializer(client, schema_str=None, conf=ser_conf)
770+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
771+
obj_bytes = ser(obj, ser_ctx)
772+
773+
obj2 = {
774+
'intField': 123,
775+
'doubleField': 45.67,
776+
'stringField': 'hi-suffix-suffix',
777+
'booleanField': True,
778+
'bytesField': b'foobar',
779+
'missing': None,
780+
}
781+
deser = AvroDeserializer(client)
782+
newobj = deser(obj_bytes, ser_ctx)
783+
assert obj2 == newobj
784+
785+
724786
def test_avro_cel_field_transform_disable():
725787
conf = {'url': _BASE_URL}
726788
client = SchemaRegistryClient.new_client(conf)

0 commit comments

Comments
 (0)