fix: tdg import db transaction handling + undefined fields #1543
Conversation
Pull request overview
This PR enhances the Transport Data Gouv (TDG) feed import process by adding field validation, custom exception handling, and nested database transactions for resource-level error isolation. The changes aim to make the import more robust by allowing the process to continue even when individual resources fail validation or encounter database integrity errors.
Key changes:
- Introduced an `InvalidTDGFeedError` exception and validation for required fields (publisher name, producer URL); a minimal sketch follows this list
- Wrapped each resource's processing in a nested database transaction using `begin_nested()` for isolation
- Added exception handlers to catch and log `InvalidTDGFeedError` and `IntegrityError` without stopping the entire import
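A minimal sketch of those pieces, assuming the checks live in a standalone helper (in the PR they are folded into `_update_common_tdg_fields`); the helper name and error messages are illustrative:

```python
class InvalidTDGFeedError(Exception):
    """Raised when a TDG dataset/resource is missing required fields."""


def _validate_required_tdg_fields(dataset: dict, producer_url: str | None) -> None:
    # The two required fields this PR validates: publisher name and producer URL.
    publisher_name = (dataset.get("publisher") or {}).get("name")
    if not publisher_name:
        raise InvalidTDGFeedError("missing publisher name")
    if not producer_url:
        raise InvalidTDGFeedError("missing producer URL")
```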
functions-python/tasks_executor/src/tasks/data_import/transportdatagouv/import_tdg_feeds.py
```python
_update_common_tdg_fields(
    gtfs_feed, dataset, resource, res_url, locations, db_session
)
_ensure_tdg_external_id(gtfs_feed, res_id)
```
Copilot (AI) commented on Jan 8, 2026:
The validation for required fields (publisher name and producer URL) occurs after the feed has already been created via get_or_create_feed. This means database objects are created before determining if the resource is valid. Consider moving the validation checks before the get_or_create_feed call to avoid creating database objects for invalid resources. The validation could be done by checking dataset.get("publisher", {}).get("name") and res_url earlier in the flow.
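A hedged sketch of that reordering, wrapped in a hypothetical helper; `get_or_create_feed`, `Gtfsfeed`, and `InvalidTDGFeedError` come from the PR, everything else is illustrative:

```python
def _create_gtfs_feed_if_valid(db_session, dataset: dict, resource: dict):
    """Hypothetical helper: validate first, then create, so invalid
    resources never add rows to the session."""
    res_id = str(resource.get("id") or "")
    res_url = resource.get("url")
    publisher_name = dataset.get("publisher", {}).get("name")
    if not publisher_name or not res_url:
        # No feed row exists yet, so there is nothing to roll back.
        raise InvalidTDGFeedError(
            f"resource {res_id}: missing publisher name or producer URL"
        )
    return get_or_create_feed(
        db_session,
        Gtfsfeed,
        f"tdg-{res_id}",
        "gtfs",
        official_notes="Imported from Transport.data.gouv.fr as official feed.",
    )
```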
The per-resource loop in `_process_tdg_dataset` after the change:

```python
# SAVEPOINT per resource
with db_session.begin_nested():
    try:
        res_format = resource.get("format")
        if res_format not in (GTFS_FORMAT, GTFS_RT_FORMAT):
            continue

        res_id = str(resource.get("id") or "")
        res_title = resource.get("title")
        res_url = resource.get("url")

        if not res_url or not res_id:
            logger.info(
                "Skipping resource without url or id (format=%s title=%s)",
                res_format,
                res_title,
            )
            continue

        # ---- STATIC GTFS ----
        if res_format == GTFS_FORMAT:
            stable_id = f"tdg-{res_id}"
            processed_stable_ids.add(stable_id)
            gtfs_feed, is_new = get_or_create_feed(
                db_session,
                Gtfsfeed,
                stable_id,
                "gtfs",
                official_notes="Imported from Transport.data.gouv.fr as official feed.",
            )

            if not is_new:
                api_fp = _build_api_schedule_fingerprint_tdg(
                    dataset=dataset, resource=resource, producer_url=res_url
                )
                db_fp = _build_db_schedule_fingerprint_tdg(gtfs_feed)
                if db_fp == api_fp:
                    logger.info(
                        "No change detected; skipping TDG GTFS feed stable_id=%s",
                        stable_id,
                    )
                    processed += 1
                    static_feeds_by_dataset_id.setdefault(
                        dataset_id, []
                    ).append(gtfs_feed)
                    continue

            # Requirement: if GTFS url returns CSV, skip it (listing, not feed).
            status_code, content_type, detected_format = _probe_head_format(
                session_http, res_url
            )
            logger.debug(
                "TDG probe: url=%s status=%s ctype=%s detected=%s",
                res_url,
                status_code,
                content_type,
                detected_format,
            )

            if detected_format == "csv":
                logger.info(
                    "Skipping GTFS resource id=%s because it returns CSV (url=%s)",
                    res_id,
                    res_url,
                )
                continue

            # Apply changes
            _update_common_tdg_fields(
                gtfs_feed, dataset, resource, res_url, locations, db_session
            )
            _ensure_tdg_external_id(gtfs_feed, res_id)

            static_feeds_by_dataset_id.setdefault(dataset_id, []).append(
                gtfs_feed
            )

            if is_new:
                created_gtfs += 1
                feeds_to_publish.append(gtfs_feed)
                logger.info("Created TDG GTFS feed stable_id=%s", stable_id)
            else:
                updated_gtfs += 1
                logger.info("Updated TDG GTFS feed stable_id=%s", stable_id)

            processed += 1

        # ---- GTFS-RT ----
        elif res_format == GTFS_RT_FORMAT:
            static_gtfs_feeds = static_feeds_by_dataset_id.get(dataset_id, [])

            rt_stable_id = f"tdg-{res_id}"
            processed_stable_ids.add(rt_stable_id)
            rt_feed, is_new_rt = get_or_create_feed(
                db_session, Gtfsrealtimefeed, rt_stable_id, "gtfs_rt"
            )

            if not is_new_rt:
                api_rt_fp = _build_api_rt_fingerprint_tdg(
                    dataset=dataset,
                    resource=resource,
                    producer_url=res_url,
                    static_gtfs_stable_ids=[
                        static_gtfs_feed.stable_id
                        for static_gtfs_feed in static_gtfs_feeds
                    ],
                    rt_stable_id=rt_stable_id,
                )
                db_rt_fp = _build_db_rt_fingerprint_tdg(rt_feed)
                if db_rt_fp == api_rt_fp:
                    logger.info(
                        "No change detected; skipping TDG RT feed stable_id=%s",
                        rt_stable_id,
                    )
                    processed += 1
                    continue

            # Apply changes
            _update_common_tdg_fields(
                rt_feed, dataset, resource, res_url, locations, db_session
            )
            _ensure_tdg_external_id(rt_feed, res_id)

            # Link RT → schedule
            rt_feed.gtfs_feeds = static_gtfs_feeds

            # Add entity types
            entity_types = _get_entity_types_from_resource(resource)
            rt_feed.entitytypes = [
                get_or_create_entity_type(db_session, et) for et in entity_types
            ]

            if is_new_rt:
                created_rt += 1
                logger.info(
                    "Created TDG RT feed stable_id=%s linked_to=%s",
                    rt_stable_id,
                    ", ".join(
                        [
                            static_gtfs_feed.stable_id
                            for static_gtfs_feed in static_gtfs_feeds
                        ]
                    ),
                )
            else:
                logger.info(
                    "Updated TDG RT feed stable_id=%s linked_to=%s",
                    rt_stable_id,
                    ", ".join(
                        [
                            static_gtfs_feed.stable_id
                            for static_gtfs_feed in static_gtfs_feeds
                        ]
                    ),
                )

            processed += 1

    except InvalidTDGFeedError as e:
        logger.warning(
            "Invalid TDG resource skipped (dataset_id=%s resource_id=%s): %s",
            dataset.get("id"),
            resource.get("id"),
            e,
        )
        continue
    except IntegrityError:
        logger.exception(
            "IntegrityError while processing TDG resource (dataset_id=%s resource_id=%s). Skipping.",
            dataset.get("id"),
            resource.get("id"),
        )
        continue
```
Copilot (AI) commented on Jan 8, 2026:
The exception handling inside the nested transaction context manager will cause the savepoint to commit rather than rollback when exceptions are caught. When InvalidTDGFeedError is raised after a feed has been created via get_or_create_feed, the feed will remain in the database despite failing validation. When IntegrityError occurs, the session enters a failed state and the implicit commit on context manager exit will fail. The proper fix is to explicitly rollback the nested transaction before continuing, or restructure the code to validate required fields (publisher, producer_url) before creating the feed.
False, but I'll still explicitly roll back.
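A minimal sketch of a rollback-safe shape, with the `try` outside the context manager so an escaping exception rolls the SAVEPOINT back before being caught; `_process_resource` is a hypothetical stand-in for the per-resource body:

```python
from sqlalchemy.exc import IntegrityError

def _import_resources_isolated(db_session, resources, logger):
    """Each resource gets its own SAVEPOINT; a failure rolls it back and is skipped."""
    for resource in resources:
        try:
            # An exception escaping the with-block rolls the SAVEPOINT back
            # before the except clause runs (SQLAlchemy begin_nested() semantics).
            with db_session.begin_nested():
                _process_resource(db_session, resource)  # hypothetical body
        except (InvalidTDGFeedError, IntegrityError):
            logger.exception("Skipping TDG resource %s", resource.get("id"))
            continue
```

Equivalently, the handlers can stay inside the `with` if the code calls `rollback()` explicitly on the `SessionTransaction` object returned by `begin_nested()`, which is what the author opts for here.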
Summary:
This PR improves the robustness and error handling of the TDG feed import process by adding explicit validation for required fields, introducing a custom exception, and ensuring that database operations for each resource are safely isolated. The changes also refactor some code for clarity and maintainability.
Error handling and validation:
- Introduced `InvalidTDGFeedError` to represent missing required fields in TDG datasets/resources.
- Updated `_update_common_tdg_fields` to validate the presence of the publisher name and producer URL, logging a warning and raising `InvalidTDGFeedError` if they are missing.
- In `_process_tdg_dataset`, added try/except blocks to catch `InvalidTDGFeedError` and `IntegrityError` per resource, logging and skipping invalid or problematic resources without interrupting the entire import process. [1] [2]

Database transaction safety:
- Wrapped each resource's processing in a nested database transaction (`db_session.begin_nested()`), ensuring that errors in one resource do not affect others.

Code refactoring and cleanup:
- `static_feeds_by_dataset_id` is updated using `setdefault` (before/after shown below). [1] [2]
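The `setdefault` cleanup, as both forms appear in the diff above:

```python
# Before
if dataset_id not in static_feeds_by_dataset_id:
    static_feeds_by_dataset_id[dataset_id] = []
static_feeds_by_dataset_id[dataset_id].append(gtfs_feed)

# After
static_feeds_by_dataset_id.setdefault(dataset_id, []).append(gtfs_feed)
```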
Please make sure these boxes are checked before submitting your pull request - thanks!
- Run ./scripts/api-tests.sh to make sure you didn't break anything