Handling NaN at chunk boundaries with ijson #107

@ColinMaudry

Hard to say whether the two are related, but we have two ingestion problems with the production data:

  • reading the chunks raises this error for many files:
Exception ignored in: <generator object utf8_encoder at 0x7eaa41717de0>
Traceback (most recent call last):
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/backends/python.py", line 46, in utf8_encoder
    target.close()
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/backends/python.py", line 88, in Lexer
    raise common.IncompleteJSONError('Incomplete string lexeme')
ijson.common.IncompleteJSONError: Incomplete string lexeme
  • for fewer files (including decp-2025.json, which is also affected by the previous error), there is this:
12:16:23.819 | ERROR   | Task run 'json_stream_to_parquet-40f' - Task run failed with exception: UnexpectedSymbol("Unexpected symbol 'N' at 269176659") - Retries are exhausted
Traceback (most recent call last):
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/backends/python.py", line 225, in parse_value
    number = to_number(symbol)
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/common.py", line 209, in integer_or_float
    return int(str_value)
ValueError: invalid literal for int() with base 10: 'N'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/prefect/task_engine.py", line 813, in run_context
    yield self
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/prefect/task_engine.py", line 1389, in run_task_sync
    engine.call_task_fn(txn)
    ~~~~~~~~~~~~~~~~~~~^^^^^
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/prefect/task_engine.py", line 830, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
    return fn(*args, **kwargs)
  File "/home/colin/git/decp-processing/src/tasks/get.py", line 137, in json_stream_to_parquet
    decp_schema.coroutine_ijson.send(text.encode('utf-8'))
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/backends/python.py", line 44, in utf8_encoder
    send(sdata)
    ~~~~^^^^^^^
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/backends/python.py", line 103, in Lexer
    send((discarded + match.start(), lexeme))
    ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/colin/git/decp-processing/.venv/lib/python3.13/site-packages/ijson/backends/python.py", line 231, in parse_value
    raise UnexpectedSymbol(symbol, pos)
ijson.backends.python.UnexpectedSymbol: Unexpected symbol 'N' at 269176659

Unfortunately, the byte offset 269176659 corresponds to the size of the file, not to the location of the error.

It looks like a NaN is not being replaced. Maybe because it is split across two chunks?
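To illustrate that hypothesis, here is a minimal sketch of how a per-chunk `replace` can miss a `NaN,` that straddles two chunks, and how a small carry-over buffer could avoid it. The function name `replace_nan_streaming` and the sample chunks are hypothetical and not part of decp-processing; the sketch works on already-decoded text and only targets the exact token `NaN,`.

```python
from typing import Iterable, Iterator

TOKEN = "NaN,"
REPLACEMENT = "null,"


def replace_nan_streaming(chunks: Iterable[str]) -> Iterator[str]:
    """Replace TOKEN in a stream of text chunks, even across chunk boundaries.

    Hypothetical helper: the end of each chunk is held back whenever it could
    be the beginning of TOKEN, and is prepended to the next chunk.
    """
    carry = ""
    for chunk in chunks:
        text = (carry + chunk).replace(TOKEN, REPLACEMENT)
        # Find the longest suffix of `text` that is a proper prefix of TOKEN.
        keep = 0
        for size in range(len(TOKEN) - 1, 0, -1):
            if text.endswith(TOKEN[:size]):
                keep = size
                break
        carry = text[len(text) - keep:] if keep else ""
        out = text[: len(text) - keep]
        if out:  # avoid emitting empty chunks
            yield out
    if carry:
        # Whatever is left at the end of the stream was not a full TOKEN.
        yield carry


# Sample data where "NaN," is cut in two different places.
sample_chunks = ['{"a": Na', 'N, "b": 1, "c": N', 'aN, "d": 2}']

# Naive per-chunk replacement leaves the split tokens untouched.
naive = "".join(c.replace(TOKEN, REPLACEMENT) for c in sample_chunks)

# The carry-over version replaces both occurrences.
fixed = "".join(replace_nan_streaming(sample_chunks))
```

In the loop from this issue, each yielded piece would be what gets encoded and passed to `decp_schema.coroutine_ijson.send(...)`. Whether this is the actual cause of the `UnexpectedSymbol` error has not been verified.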

Or maybe the NaN replacement fails because of encoding issues.
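As a quick check on the encoding hypothesis, the sketch below shows what an incremental UTF-8 decoder does when a multi-byte character is cut between two chunks. The helper name `decode_stream` and the sample string are made up for the illustration. If the source really is UTF-8, this suggests (without proving it) that the decoding step alone should not corrupt the text.

```python
import codecs
from typing import Iterable


def decode_stream(chunks: Iterable[bytes], encoding: str = "utf-8") -> str:
    """Decode byte chunks with an incremental decoder (hypothetical helper)."""
    decoder = codecs.getincrementaldecoder(encoding)()
    parts = [decoder.decode(chunk) for chunk in chunks]
    # final=True raises if the stream ends in the middle of a character.
    parts.append(decoder.decode(b"", final=True))
    return "".join(parts)


original = "marché à 1 000 €"
data = original.encode("utf-8")

# Cut the stream in the middle of the two-byte sequence for "é".
cut = data.index("é".encode("utf-8")) + 1
byte_chunks = [data[:cut], data[cut:]]

# Decoding the first chunk on its own fails...
try:
    byte_chunks[0].decode("utf-8")
    naive_failed = False
except UnicodeDecodeError:
    naive_failed = True

# ...while the incremental decoder buffers the dangling byte.
decoded = decode_stream(byte_chunks)
```

The decoder keeps incomplete byte sequences until the next call, so characters are never split. A token such as `NaN,` can still be split, because that is a text-level boundary and not a byte-level one.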

I tried this in json_stream_to_parquet(), without success:

    with tempfile.NamedTemporaryFile(mode="wb", suffix=".ndjson") as tmp_file:
        chunk_iter = stream_get(url)
        decoder = getincrementaldecoder('utf-8')()

        # In first iteration, will find the right format
        chunk = next(chunk_iter)

        # Decode the first chunk
        try:
            text = decoder.decode(chunk)
        except UnicodeDecodeError as e:
            # Optionally fallback to another encoding or log
            print(e)
            raise

        text = text.replace("NaN,", "null,")

        decp_schema = find_json_schema(text.encode("utf-8"), decp_schemas)

        for marche in decp_schema.liste_marches_ijson:
            new_fields = write_marche_rows(marche, tmp_file)
            fields = fields.union(new_fields)

        del decp_schema.liste_marches_ijson[:]

        for chunk in chunk_iter:
            # Decode the chunk
            try:
                text = decoder.decode(chunk)
            except UnicodeDecodeError as e:
                # Optionally fallback to another encoding or log
                print(e)
                raise

            text = text.replace("NaN,", "null,")

            decp_schema.coroutine_ijson.send(text.encode('utf-8'))

            for marche in decp_schema.liste_marches_ijson:
                new_fields = write_marche_rows(marche, tmp_file)
                fields = fields.union(new_fields)

            del decp_schema.liste_marches_ijson[:]

        decp_schema.coroutine_ijson.close()
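One more thing that might be worth checking, as an assumption not verified against the production files: `replace("NaN,", "null,")` only matches a NaN immediately followed by a comma, so a NaN that is the last value of an object or array (`NaN}`, `NaN]`) would be left alone. Below is a small sketch using a regular expression. The name `replace_bare_nan` is hypothetical.

```python
import re

# NaN preceded by ':', '[', ',' or whitespace, and followed (after optional
# whitespace) by ',', '}' or ']'. This is a heuristic, not a JSON parser: a
# string value containing e.g. ", NaN," would be rewritten too.
BARE_NAN = re.compile(r"(?<=[:\[,\s])NaN(?=\s*[,\}\]])")


def replace_bare_nan(text: str) -> str:
    """Replace NaN in value position with null (hypothetical helper)."""
    return BARE_NAN.sub("null", text)


sample = '{"a": NaN, "b": [NaN, NaN], "c": NaN}'

# The replacement used in the snippet above misses the NaN before ']' and '}'.
comma_only = sample.replace("NaN,", "null,")

# The regular expression covers all four occurrences.
cleaned = replace_bare_nan(sample)
```

This works on a complete piece of text. Applied chunk by chunk, it would still need some form of carry-over at chunk boundaries.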
