Fix: Handle non-UTF-8 headers in Kafka/Confluent message parsing #2459
Bazarovinc wants to merge 5 commits into ag2ai:main
Nikita Veselenko seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it.

@Bazarovinc Please also look at issue #2214 and add comprehensive tests.

@draincoder Here’s a preliminary implementation for handling Kafka message headers. Please review and let me know if this approach looks good or if any adjustments are needed.

headers.get("reply_to"),
headers.get("content-type"),
headers.get("correlation_id"),
What does headers.get return? Can you show an example or add a type annotation?
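
For reference, a minimal sketch of what `headers.get` would return, assuming `headers` is a plain dict built from the `(str, bytes)` pairs that a Kafka client exposes on a record. The sample values and the dict construction are assumptions for illustration, not taken from the PR:

```python
# Hypothetical raw headers, shaped like the (key, value) pairs on a Kafka record.
raw_headers: list[tuple[str, bytes]] = [
    ("reply_to", b"replies"),
    ("content-type", b"application/json"),
]

# Assumption: the parser collects the pairs into a dict without decoding them.
headers: dict[str, bytes] = dict(raw_headers)

reply_to = headers.get("reply_to")              # present key: raw bytes
correlation_id = headers.get("correlation_id")  # missing key: None

print(type(reply_to).__name__, correlation_id)
```

Under that assumption the effective return type is `bytes | None`, so every caller has to handle both the missing-key case and the decoding step.
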
A small note: the comprehension iterates over the values (bytes) rather than over (key, value) pairs, so the expression for header, value in (...) will raise a ValueError.
It is better to iterate over headers.items() or to decode the values with a separate helper.
decoded_headers = {
    k: v.decode(errors="replace")
    for k, v in headers.items()
    if k in ("reply_to", "content-type", "correlation_id") and v
}

Or, even better and clearer, skip the iteration entirely: it seems excessive to me and makes the code difficult to understand. See https://github.com/ag2ai/faststream/pull/2459/files/d0ee2c6911627934e78d958536e428ebb2cb1ec6#r2427545938

reply_to = self._decode_header(headers, "reply_to")
content_type = self._decode_header(headers, "content-type")
correlation_id = self._decode_header(headers, "correlation_id")

reply_to=headers.get("reply_to", ""),
content_type=headers.get("content-type"),
reply_to=headers.get("reply_to").decode()
    if "content-type" in headers
content-type -> reply_to
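
To illustrate why the key in the condition matters, here is a small sketch, assuming `headers` maps header names to raw bytes. The sample dict and both function names are hypothetical. The guard checks `content-type` while the lookup reads `reply_to`, so a message that carries only a content type fails:

```python
# Hypothetical headers: "content-type" is present, "reply_to" is not.
headers = {"content-type": b"application/json"}


def reply_to_mismatched(headers):
    # Guard and lookup use different keys, as in the diff under review.
    return headers.get("reply_to").decode() if "content-type" in headers else None


def reply_to_matched(headers):
    # Guard and lookup use the same key.
    return headers.get("reply_to").decode() if "reply_to" in headers else None


try:
    reply_to_mismatched(headers)
except AttributeError as exc:
    # headers.get("reply_to") is None here, and None has no .decode()
    print(f"mismatched guard failed: {exc}")

print(reply_to_matched(headers))  # None
```
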
reply_to=headers.get("reply_to").decode()
    if "content-type" in headers
    else None,
content_type=headers.get("content-type").decode()
    if "content-type" in headers
    else None,
message_id=f"{first.offset()}-{last.offset()}-{first_timestamp}",
correlation_id=headers.get("correlation_id"),
correlation_id=headers.get("correlation_id").decode()
    if "correlation_id" in headers
    else None,
What do you think about this approach for better readability:
reply_to=headers.get("reply_to", b"").decode() or None
content_type=headers.get("content-type", b"").decode() or None
correlation_id=headers.get("correlation_id", b"").decode() or None
I suggest doing it this way
@staticmethod
def _decode_header(headers: dict[str, bytes | None], key: str) -> str | None:
    val = headers.get(key)
    return val.decode(errors="replace") if val else None

Then, wherever we need to get a value, we do this:

reply_to=self._decode_header(headers, "reply_to")
content_type=self._decode_header(headers, "content-type")
correlation_id=self._decode_header(headers, "correlation_id")

From what I've noticed, this already applies to both parse_message and parse_message_batch.
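
A quick way to compare the two suggestions is a standalone sketch. `decode_header` below mirrors the proposed `_decode_header` as a free function, and the sample bytes are made up for illustration. The `b""` default covers missing keys but still decodes strictly, while `errors="replace"` substitutes U+FFFD for bytes that are not valid UTF-8:

```python
from typing import Optional


def decode_header(headers: dict[str, Optional[bytes]], key: str) -> Optional[str]:
    # Standalone copy of the suggested helper: lenient decoding,
    # None when the key is absent or the value is empty.
    val = headers.get(key)
    return val.decode(errors="replace") if val else None


# Hypothetical headers: the "reply_to" value is not valid UTF-8.
headers = {"reply_to": b"\xff\xfequeue", "content-type": b"text/plain"}

try:
    headers.get("reply_to", b"").decode() or None
except UnicodeDecodeError as exc:
    print(f"strict decode failed: {exc.reason}")

print(decode_header(headers, "reply_to"))        # invalid bytes become U+FFFD
print(decode_header(headers, "content-type"))    # text/plain
print(decode_header(headers, "correlation_id"))  # None
```

The trade-off is that lenient decoding hides malformed input instead of surfacing it, which seems acceptable for routing metadata such as these three headers but is a design choice rather than a given.
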
This PR addresses the UnicodeDecodeError that occurred when processing Kafka messages containing headers with non-UTF-8 byte sequences.
Fixes #2458, Fixes #2214
Checklist
just lint (shows no errors)
just test-coverage
just static-analysis