Closed
Description
It's hard to tell whether the two are related, but we have two ingestion problems with the production data:
- for many files, reading the chunks produces this error:
```
Exception ignored in: <generator object utf8_encoder at 0x7eaa41717de0>
Traceback (most recent call last):
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/backends/python.py", line 46, in utf8_encoder
    target.close()
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/backends/python.py", line 88, in Lexer
    raise common.IncompleteJSONError('Incomplete string lexeme')
ijson.common.IncompleteJSONError: Incomplete string lexeme
```
- for fewer files (but this includes decp-2025.json, which is also affected by the previous error), we get this:
```
12:16:23.819 | ERROR | Task run 'json_stream_to_parquet-40f' - Task run failed with exception: UnexpectedSymbol("Unexpected symbol 'N' at 269176659") - Retries are exhausted
Traceback (most recent call last):
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/backends/python.py", line 225, in parse_value
    number = to_number(symbol)
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/common.py", line 209, in integer_or_float
    return int(str_value)
ValueError: invalid literal for int() with base 10: 'N'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/prefect/task_engine.py", line 813, in run_context
    yield self
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/prefect/task_engine.py", line 1389, in run_task_sync
    engine.call_task_fn(txn)
    ~~~~~~~~~~~~~~~~~~~^^^^^
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/prefect/task_engine.py", line 830, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
    return fn(*args, **kwargs)
  File "/home/colin/git/decp-processing/src/tasks/get.py", line 137, in json_stream_to_parquet
    decp_schema.coroutine_ijson.send(text.encode('utf-8'))
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/backends/python.py", line 44, in utf8_encoder
    send(sdata)
    ~~~~^^^^^^^
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/backends/python.py", line 103, in Lexer
    send((discarded + match.start(), lexeme))
    ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/backends/python.py", line 231, in parse_value
    raise UnexpectedSymbol(symbol, pos)
ijson.backends.python.UnexpectedSymbol: Unexpected symbol 'N' at 269176659
```
Unfortunately, the byte offset 269176659 corresponds to the size of the file, not to the location of the error.
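Since the reported offset only points at the end of the file, one option is to scan the downloaded file directly for bare `NaN` values and look at the byte that follows each one. `text.replace("NaN,", "null,")` only covers a `NaN` immediately followed by a comma; a `NaN` followed by `}`, `]` or whitespace would go through unchanged. A minimal sketch, where `nan_report` is a hypothetical helper (not part of decp-processing) and it is assumed that "NaN" never appears as a whole word inside a string value:

```python
import re
from typing import List, Tuple

# A bare NaN token. Assumption: "NaN" never appears as a whole word inside a string value.
NAN_TOKEN = re.compile(rb"\bNaN\b")


def nan_report(path: str) -> List[Tuple[int, bytes]]:
    """Return (byte offset, following byte) for every bare NaN in the file."""
    with open(path, "rb") as f:
        data = f.read()  # a few hundred MB fits in memory for a one-off diagnostic
    return [(m.start(), data[m.end():m.end() + 1]) for m in NAN_TOKEN.finditer(data)]
```

For example, `collections.Counter(nxt for _, nxt in nan_report("decp-2025.json"))` would show how many `NaN` values are followed by something other than `b","`, i.e. how many escape the current replacement.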
It looks like a NaN is not being replaced. Perhaps because it is split across two chunks?
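If that is the cause, the replacement needs to see each `NaN` whole. A minimal sketch, assuming the text is already decoded and that "NaN" never occurs as a whole word inside a string value: the hypothetical generator `replace_nan_streaming` only emits text up to the last JSON delimiter of its buffer and carries the remainder over to the next chunk, so a token is never cut in two. It also replaces `NaN` whatever follows it, not only a comma.

```python
import re
from typing import Iterable, Iterator

# A bare NaN token. Assumption: "NaN" never appears as a whole word inside a string value.
NAN_TOKEN = re.compile(r"\bNaN\b")
# Characters that end a JSON token outside of strings.
DELIMITERS = frozenset(",:[]{} \t\r\n")


def replace_nan_streaming(chunks: Iterable[str]) -> Iterator[str]:
    """Yield the text with every bare NaN replaced by null, even across chunk boundaries."""
    tail = ""
    for chunk in chunks:
        buf = tail + chunk
        # Position just after the last delimiter in the buffer (0 if there is none).
        cut = next((i + 1 for i in range(len(buf) - 1, -1, -1) if buf[i] in DELIMITERS), 0)
        # Text before `cut` ends on a delimiter, so no token is split there;
        # the incomplete token after it is kept for the next chunk.
        tail = buf[cut:]
        yield NAN_TOKEN.sub("null", buf[:cut])
    # End of stream: flush whatever is left.
    yield NAN_TOKEN.sub("null", tail)
```

In `json_stream_to_parquet()`, the decoded chunks would go through such a generator before `.encode("utf-8")` and `coroutine_ijson.send(...)`; a long token without any delimiter simply stays buffered until one arrives.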
Or perhaps the NaN replacement fails because of encoding issues.
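On the encoding side, a multi-byte UTF-8 character cut between two chunks makes a plain `chunk.decode("utf-8")` fail, whereas the incremental decoder from `codecs` keeps the incomplete bytes until the next chunk. The sample string and split point below are made up for illustration. The decoder should also be flushed with `final=True` when the stream ends; otherwise the bytes of an incomplete trailing character are silently held back.

```python
import codecs

original = "Montant : 1000 €, acheteur : Mairie d'Évry"
data = original.encode("utf-8")
# Cut the byte stream in the middle of the 3-byte encoding of "€".
split = data.index("€".encode("utf-8")) + 1
chunks = [data[:split], data[split:]]

# Decoding each chunk on its own fails on the split character.
try:
    "".join(chunk.decode("utf-8") for chunk in chunks)
    naive_ok = True
except UnicodeDecodeError:
    naive_ok = False

# The incremental decoder buffers the incomplete bytes until the next chunk arrives.
decoder = codecs.getincrementaldecoder("utf-8")()
text = "".join(decoder.decode(chunk) for chunk in chunks)
# Flush at end of stream: raises UnicodeDecodeError if the data ends mid-character.
text += decoder.decode(b"", final=True)

print(naive_ok)
print(text == original)
```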
I tried the following in json_stream_to_parquet(), without success:
```python
import tempfile
from codecs import getincrementaldecoder

with tempfile.NamedTemporaryFile(mode="wb", suffix=".ndjson") as tmp_file:
    chunk_iter = stream_get(url)
    decoder = getincrementaldecoder('utf-8')()

    # The first iteration determines the right format
    chunk = next(chunk_iter)
    # Decode the first chunk
    try:
        text = decoder.decode(chunk)
    except UnicodeDecodeError as e:
        # Optionally fall back to another encoding, or log
        print(e)
        raise
    text = text.replace("NaN,", "null,")
    decp_schema = find_json_schema(text.encode("utf-8"), decp_schemas)
    for marche in decp_schema.liste_marches_ijson:
        new_fields = write_marche_rows(marche, tmp_file)
        fields = fields.union(new_fields)
    del decp_schema.liste_marches_ijson[:]

    for chunk in chunk_iter:
        # Decode
        try:
            text = decoder.decode(chunk)
        except UnicodeDecodeError as e:
            # Optionally fall back to another encoding, or log
            print(e)
            raise
        text = text.replace("NaN,", "null,")
        decp_schema.coroutine_ijson.send(text.encode('utf-8'))
        for marche in decp_schema.liste_marches_ijson:
            new_fields = write_marche_rows(marche, tmp_file)
            fields = fields.union(new_fields)
        del decp_schema.liste_marches_ijson[:]
    decp_schema.coroutine_ijson.close()
```
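About the first error: "Exception ignored in: <generator object ...>" is what Python prints when an exception is raised while a generator is being finalized by the garbage collector, where it cannot propagate. One possible reading (unverified) is that `coroutine_ijson` is sometimes not closed explicitly, for instance when an exception interrupts the loop, so ijson's incomplete-data check only runs at garbage collection and its error is printed instead of failing the task. The toy `strict_coroutine` below is a hypothetical stand-in for ijson's `utf8_encoder`, not ijson code; it contrasts implicit finalization with an explicit `close()` in a `finally` block.

```python
import gc
import sys


def strict_coroutine():
    """Toy stand-in for ijson's utf8_encoder: complains on close() if the data
    received so far ends inside a string (odd number of double quotes)."""
    received = b""
    try:
        while True:
            received += yield
    except GeneratorExit:
        if received.count(b'"') % 2:
            raise ValueError("Incomplete string lexeme")
        raise


def feed_truncated():
    coro = strict_coroutine()
    next(coro)  # prime the coroutine
    coro.send(b'{"acheteur": "Mairie de')
    return coro


# 1) Never closed explicitly: the error is raised during finalization and handed to
#    sys.unraisablehook (whose default prints "Exception ignored in: ..."), then dropped.
swallowed = []
previous_hook = sys.unraisablehook
sys.unraisablehook = lambda args: swallowed.append(args.exc_value)
try:
    coro = feed_truncated()
    del coro
    gc.collect()
finally:
    sys.unraisablehook = previous_hook

# 2) Closed explicitly in a finally block: the same error propagates to the caller.
try:
    coro = feed_truncated()
    try:
        pass  # the chunk loop would run here
    finally:
        coro.close()
    propagated = None
except ValueError as exc:
    propagated = exc

print(type(swallowed[0]).__name__)
print(propagated)
```

Applied to `json_stream_to_parquet()`, this would mean calling `decp_schema.coroutine_ijson.close()` from a `finally` clause around the chunk loop, so that a truncated document makes the task fail visibly.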
Status: Done