@@ -232,12 +232,22 @@ def to_records(self, mode: Literal["json", "python"]) -> List[Dict[str, Any]]:
232232 StructField ,
233233 )
234234
def binary_to_string_repr(
    binary_data: Optional[Union[bytes, bytearray]]
) -> Optional[str]:
    """Convert binary data to its Python bytes-literal text (e.g., b'hello').

    Args:
        binary_data: Binary data as bytes or bytearray. PySpark passes BinaryType
            as bytearray by default, but Spark 4.1+ with
            spark.sql.execution.pyspark.binaryAsBytes=true passes bytes instead.

    Returns:
        The ``repr`` of the data as a ``bytes`` object (``"b'...'"``), or
        ``None`` when ``binary_data`` is ``None``.
    """
    if binary_data is None:
        return None
    # Normalise to bytes first so a bytearray renders as b'...' rather than
    # bytearray(b'...'). repr() is used instead of str(): both produce the same
    # text for bytes, but str(bytes) emits BytesWarning under `python -b` and
    # raises under `python -bb`.
    return repr(bytes(binary_data))
240248
249+ binary_udf = F .udf (binary_to_string_repr , StringType ())
250+
241251 def select_column (field : StructField ) -> Column :
242252 col = F .col (field .name )
243253 # Numbers are already JSON-serialisable, except Decimal
@@ -248,8 +258,9 @@ def select_column(field: StructField) -> Column:
248258
249259 # We slice binary field before converting to string representation
250260 if isinstance (field .dataType , BinaryType ):
251- sliced = F .substring (F .col (field .name ), 1 , keep_bytes )
252- binary_udf = F .udf (binary_to_string_repr , StringType ())
261+ # Each byte becomes up to 4 chars (\xNN) in string repr, plus b'' overhead
262+ max_binary_bytes = (MAX_STRING_CELL_LENGTH - 3 ) // 4
263+ sliced = F .substring (F .col (field .name ), 1 , max_binary_bytes )
253264 return binary_udf (sliced )
254265
255266 # String just needs to be trimmed
@@ -259,8 +270,6 @@ def select_column(field: StructField) -> Column:
259270 # Everything else gets stringified (Decimal, Date, Timestamp, Struct, …)
260271 return F .substring (col .cast ("string" ), 1 , MAX_STRING_CELL_LENGTH )
261272
262- keep_bytes = (MAX_STRING_CELL_LENGTH // 4 ) * 3
263-
264273 if mode == "python" :
265274 return [row .asDict () for row in self ._df .collect ()]
266275 elif mode == "json" :
0 commit comments