
Conversation


@cka-y cka-y commented Jan 8, 2026

Summary:
This PR improves the robustness and error handling of the Transport Data Gouv (TDG) feed import process by adding explicit validation for required fields, introducing a custom exception, and ensuring that database operations for each resource are safely isolated. The changes also refactor some code for clarity and maintainability.

Error handling and validation:

  • Added a custom exception class InvalidTDGFeedError to represent missing required fields in TDG datasets/resources.
  • Updated _update_common_tdg_fields to validate the presence of the publisher name and producer URL, logging a warning and raising InvalidTDGFeedError when either is missing (see the sketch after this list).
  • In _process_tdg_dataset, added try/except blocks to catch InvalidTDGFeedError and IntegrityError per resource, logging and skipping invalid or problematic resources without interrupting the entire import.
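
A minimal sketch of what the new exception and validation might look like (the field access and message wording here are assumptions; only the names InvalidTDGFeedError and _update_common_tdg_fields come from this PR):

    import logging

    logger = logging.getLogger(__name__)


    class InvalidTDGFeedError(Exception):
        """Raised when a TDG dataset/resource is missing required fields."""


    def _update_common_tdg_fields(feed, dataset, resource, producer_url, locations, db_session):
        # Hypothetical validation; the real helper also updates many other fields.
        publisher_name = dataset.get("publisher", {}).get("name")
        if not publisher_name:
            logger.warning("TDG dataset %s is missing a publisher name", dataset.get("id"))
            raise InvalidTDGFeedError("missing publisher name")
        if not producer_url:
            logger.warning("TDG resource %s is missing a producer URL", resource.get("id"))
            raise InvalidTDGFeedError("missing producer URL")
        feed.provider = publisher_name
        feed.producer_url = producer_url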

Database transaction safety:

  • Wrapped processing of each resource in a nested transaction (db_session.begin_nested()), so that an error in one resource does not affect the others (see the sketch below).
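
In SQLAlchemy, begin_nested() issues a SAVEPOINT; an exception that escapes the context manager rolls back to that savepoint while leaving the enclosing transaction usable. A minimal sketch of the pattern, assuming a hypothetical process_resource helper (note the handler here sits outside the with block, unlike the diff quoted further down):

    from sqlalchemy.exc import IntegrityError

    for resource in dataset.get("resources", []):
        try:
            with db_session.begin_nested():  # SAVEPOINT for this resource only
                process_resource(db_session, dataset, resource)
            # normal exit releases the SAVEPOINT
        except (InvalidTDGFeedError, IntegrityError):
            # the escaping exception already rolled back to the SAVEPOINT,
            # so previously processed resources are unaffected
            logger.exception("Skipping TDG resource %s", resource.get("id"))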

Code refactoring and cleanup:

  • Simplified and unified the way static_feeds_by_dataset_id is updated using setdefault (before/after shown below).
  • Improved logging for RT feed creation and updates by refactoring the string formatting.
  • Minor code formatting and logic improvements for clarity.
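
For reference, both forms appear verbatim in the diff further down:

    # Before
    if dataset_id not in static_feeds_by_dataset_id:
        static_feeds_by_dataset_id[dataset_id] = []
    static_feeds_by_dataset_id[dataset_id].append(gtfs_feed)

    # After: same behavior, one expression
    static_feeds_by_dataset_id.setdefault(dataset_id, []).append(gtfs_feed)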

Please make sure these boxes are checked before submitting your pull request - thanks!

  • Run the unit tests with ./scripts/api-tests.sh to make sure you didn't break anything
  • Add or update any needed documentation to the repo
  • Format the title like "feat: [new feature short description]". The title must follow the Conventional Commits specification (https://www.conventionalcommits.org/en/v1.0.0/).
  • Linked all relevant issues
  • Include screenshot(s) showing how this pull request works and fixes the issue(s)

@cka-y cka-y changed the title from "fix: db transaction + undefined fields" to "fix: tdg import db transaction handling + undefined fields" on Jan 8, 2026
@cka-y cka-y requested a review from Copilot January 8, 2026 16:53
Copilot AI left a comment

Pull request overview

This PR enhances the Transport Data Gouv (TDG) feed import process by adding field validation, custom exception handling, and nested database transactions for resource-level error isolation. The changes aim to make the import more robust by allowing the process to continue even when individual resources fail validation or encounter database integrity errors.

Key changes:

  • Introduced InvalidTDGFeedError exception and validation for required fields (publisher name, producer URL)
  • Wrapped each resource's processing in a nested database transaction using begin_nested() for isolation
  • Added exception handlers to catch and log InvalidTDGFeedError and IntegrityError without stopping the entire import

Comment on lines +516 to +519
_update_common_tdg_fields(
    gtfs_feed, dataset, resource, res_url, locations, db_session
)
_ensure_tdg_external_id(gtfs_feed, res_id)
Copilot AI commented Jan 8, 2026

The validation for required fields (publisher name and producer URL) occurs after the feed has already been created via get_or_create_feed. This means database objects are created before determining if the resource is valid. Consider moving the validation checks before the get_or_create_feed call to avoid creating database objects for invalid resources. The validation could be done by checking dataset.get("publisher", {}).get("name") and res_url earlier in the flow.
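
A sketch of the suggested reordering (the log wording is illustrative):

    # Validate before creating any database objects.
    publisher_name = dataset.get("publisher", {}).get("name")
    if not publisher_name or not res_url:
        logger.warning(
            "Skipping invalid TDG resource id=%s (publisher=%r url=%r)",
            res_id, publisher_name, res_url,
        )
        continue  # nothing was created for this resource

    gtfs_feed, is_new = get_or_create_feed(
        db_session,
        Gtfsfeed,
        stable_id,
        "gtfs",
        official_notes="Imported from Transport.data.gouv.fr as official feed.",
    )
    _update_common_tdg_fields(
        gtfs_feed, dataset, resource, res_url, locations, db_session
    )
    _ensure_tdg_external_id(gtfs_feed, res_id)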

Comment on lines +449 to +609
# SAVEPOINT per resource
with db_session.begin_nested():
    try:
        res_format = resource.get("format")
        if res_format not in (GTFS_FORMAT, GTFS_RT_FORMAT):
            continue

        res_id = str(resource.get("id") or "")
        res_title = resource.get("title")
        res_url = resource.get("url")

        if not res_url or not res_id:
            logger.info(
                "Skipping resource without url or id (format=%s title=%s)",
                res_format,
                res_title,
            )
            continue

        # Requirement: if GTFS url returns CSV, skip it (listing, not feed).
        status_code, content_type, detected_format = _probe_head_format(
            session_http, res_url
        )
        logger.debug(
            "TDG probe: url=%s status=%s ctype=%s detected=%s",
            res_url,
            status_code,
            content_type,
            detected_format,
        )

        # ---- STATIC GTFS ----
        if res_format == GTFS_FORMAT:
            stable_id = f"tdg-{res_id}"
            processed_stable_ids.add(stable_id)
            gtfs_feed, is_new = get_or_create_feed(
                db_session,
                Gtfsfeed,
                stable_id,
                "gtfs",
                official_notes="Imported from Transport.data.gouv.fr as official feed.",
            )

            if detected_format == "csv":
                logger.info(
                    "Skipping GTFS resource id=%s because it returns CSV (url=%s)",
                    res_id,
                    res_url,
                )
                continue

            if not is_new:
                api_fp = _build_api_schedule_fingerprint_tdg(
                    dataset=dataset, resource=resource, producer_url=res_url
                )
                db_fp = _build_db_schedule_fingerprint_tdg(gtfs_feed)
                if db_fp == api_fp:
                    logger.info(
                        "No change detected; skipping TDG GTFS feed stable_id=%s",
                        stable_id,
                    )
                    processed += 1
                    static_feeds_by_dataset_id.setdefault(
                        dataset_id, []
                    ).append(gtfs_feed)
                    continue

            # Apply changes
            _update_common_tdg_fields(
                gtfs_feed, dataset, resource, res_url, locations, db_session
            )
            _ensure_tdg_external_id(gtfs_feed, res_id)

            static_feeds_by_dataset_id.setdefault(dataset_id, []).append(
                gtfs_feed
            )

            if is_new:
                created_gtfs += 1
                feeds_to_publish.append(gtfs_feed)
                logger.info("Created TDG GTFS feed stable_id=%s", stable_id)
            else:
                updated_gtfs += 1
                logger.info("Updated TDG GTFS feed stable_id=%s", stable_id)

            processed += 1

        # ---- GTFS-RT ----
        elif res_format == GTFS_RT_FORMAT:
            # Pick the static GTFS feeds created for this dataset as reference
            static_gtfs_feeds = static_feeds_by_dataset_id.get(dataset_id, [])

            rt_stable_id = f"tdg-{res_id}"
            processed_stable_ids.add(rt_stable_id)
            rt_feed, is_new_rt = get_or_create_feed(
                db_session, Gtfsrealtimefeed, rt_stable_id, "gtfs_rt"
            )

            if not is_new_rt:
                api_rt_fp = _build_api_rt_fingerprint_tdg(
                    dataset=dataset,
                    resource=resource,
                    producer_url=res_url,
                    static_gtfs_stable_ids=[
                        static_gtfs_feed.stable_id
                        for static_gtfs_feed in static_gtfs_feeds
                    ],
                    rt_stable_id=rt_stable_id,
                )
                db_rt_fp = _build_db_rt_fingerprint_tdg(rt_feed)
                if db_rt_fp == api_rt_fp:
                    logger.info(
                        "No change detected; skipping TDG RT feed stable_id=%s",
                        rt_stable_id,
                    )
                    processed += 1
                    continue

            # Apply changes
            _update_common_tdg_fields(
                rt_feed, dataset, resource, res_url, locations, db_session
            )
            _ensure_tdg_external_id(rt_feed, res_id)

            # Link RT → schedule
            rt_feed.gtfs_feeds = static_gtfs_feeds

            # Add entity types
            entity_types = _get_entity_types_from_resource(resource)
            rt_feed.entitytypes = [
                get_or_create_entity_type(db_session, et) for et in entity_types
            ]

            if is_new_rt:
                created_rt += 1
                logger.info(
                    "Created TDG RT feed stable_id=%s linked_to=%s",
                    rt_stable_id,
                    ", ".join([f.stable_id for f in static_gtfs_feeds]),
                )
            else:
                logger.info(
                    "Updated TDG RT feed stable_id=%s linked_to=%s",
                    rt_stable_id,
                    ", ".join([f.stable_id for f in static_gtfs_feeds]),
                )

            processed += 1
    except InvalidTDGFeedError as e:
        logger.warning(
            "Invalid TDG resource skipped (dataset_id=%s resource_id=%s): %s",
            dataset.get("id"),
            resource.get("id"),
            e,
        )
        continue
    except IntegrityError:
        logger.exception(
            "IntegrityError while processing TDG resource (dataset_id=%s resource_id=%s). Skipping.",
            dataset.get("id"),
            resource.get("id"),
        )
        continue
Copilot AI commented Jan 8, 2026

The exception handling inside the nested transaction context manager will cause the savepoint to commit rather than rollback when exceptions are caught. When InvalidTDGFeedError is raised after a feed has been created via get_or_create_feed, the feed will remain in the database despite failing validation. When IntegrityError occurs, the session enters a failed state and the implicit commit on context manager exit will fail. The proper fix is to explicitly rollback the nested transaction before continuing, or restructure the code to validate required fields (publisher, producer_url) before creating the feed.
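
One way to make the rollback explicit, in line with the author's reply below (a sketch, assuming a hypothetical process_resource helper):

    nested = db_session.begin_nested()  # SAVEPOINT, held as a handle
    try:
        process_resource(db_session, dataset, resource)
        nested.commit()  # RELEASE SAVEPOINT
    except (InvalidTDGFeedError, IntegrityError):
        nested.rollback()  # ROLLBACK TO SAVEPOINT; outer transaction stays usable
        logger.exception(
            "Skipping TDG resource (dataset_id=%s resource_id=%s)",
            dataset.get("id"),
            resource.get("id"),
        )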

@cka-y (Contributor, Author) replied:
False, but I'll still explicitly roll back.

@cka-y cka-y linked an issue Jan 8, 2026 that may be closed by this pull request
@cka-y cka-y marked this pull request as draft January 9, 2026 15:21

Development

Successfully merging this pull request may close these issues:

  • TDG feeds with appears with no information