|
12 | 12 | from simple_logger.logger import get_logger |
13 | 13 |
|
14 | 14 | from class_generator.constants import DEFINITIONS_FILE, RESOURCES_MAPPING_FILE, SCHEMA_DIR |
15 | | -from class_generator.utils import execute_parallel_with_mapping, execute_parallel_tasks |
| 15 | +from class_generator.utils import execute_parallel_tasks, execute_parallel_with_mapping |
16 | 16 | from ocp_resources.utils.archive_utils import save_json_archive |
17 | 17 | from ocp_resources.utils.schema_validator import SchemaValidator |
18 | 18 |
|
@@ -136,9 +136,9 @@ def check_and_update_cluster_version(client: str) -> bool: |
136 | 136 | last_cluster_version_generated: str = "" |
137 | 137 |
|
138 | 138 | try: |
139 | | - with open(cluster_version_file, "r") as fd: |
| 139 | + with open(cluster_version_file) as fd: |
140 | 140 | last_cluster_version_generated = fd.read().strip() |
141 | | - except (FileNotFoundError, IOError): |
 | 141 | +    except OSError: |
142 | 142 | # Treat missing file as first run - use baseline version that allows updates |
143 | 143 | last_cluster_version_generated = "v0.0.0" |
144 | 144 | LOGGER.info("Cluster version file not found - treating as first run with baseline version v0.0.0") |
@@ -530,7 +530,7 @@ def process_schema_definitions( |
530 | 530 | definitions = {} |
531 | 531 | if not allow_updates: |
532 | 532 | try: |
533 | | - with open(DEFINITIONS_FILE, "r") as f: |
| 533 | + with open(DEFINITIONS_FILE) as f: |
534 | 534 | existing_definitions_data = json.load(f) |
535 | 535 | definitions = existing_definitions_data.get("definitions", {}) |
536 | 536 | LOGGER.info(f"Loaded {len(definitions)} existing definitions to preserve") |
@@ -1064,12 +1064,12 @@ def process_explain_result(spec_tuple: tuple[str, str], result: Any) -> None: |
1064 | 1064 | LOGGER.debug(f"Failed to obtain explain data for {ref_name} from {explain_path}") |
1065 | 1065 |
|
1066 | 1066 | def handle_explain_error(spec_tuple: tuple[str, str], exc: Exception) -> None: |
1067 | | - ref_name, explain_path = spec_tuple |
1068 | | - LOGGER.debug(f"Exception occurred while explaining {ref_name} from {explain_path}: {exc}") |
| 1067 | + ref_name, _explain_path = spec_tuple |
| 1068 | + LOGGER.debug(f"Exception occurred while explaining {ref_name}: {exc}") |
1069 | 1069 | explain_results[ref_name] = None |
1070 | 1070 |
|
1071 | 1071 | def create_explain_task(spec_tuple: tuple[str, str]) -> Any: |
1072 | | - ref_name, explain_path = spec_tuple |
| 1072 | + _ref_name, explain_path = spec_tuple |
1073 | 1073 | return _run_explain_and_parse(client, explain_path) |
1074 | 1074 |
|
1075 | 1075 | execute_parallel_tasks( |
@@ -1244,7 +1244,7 @@ def process_required_field_result(task_tuple: tuple[str, str], result: Any) -> N |
1244 | 1244 | definitions[schema_key] = updated_schema |
1245 | 1245 |
|
1246 | 1246 | def handle_required_field_error(task_tuple: tuple[str, str], exc: Exception) -> None: |
1247 | | - schema_key, explain_path = task_tuple |
| 1247 | + schema_key, _explain_path = task_tuple |
1248 | 1248 | LOGGER.debug(f"Failed to process required fields for {schema_key}: {exc}") |
1249 | 1249 | # Set empty list if explain fails |
1250 | 1250 | current_schema = definitions[schema_key] |
@@ -1307,14 +1307,14 @@ def _get_missing_core_definitions( |
1307 | 1307 | if refs_to_fetch: |
1308 | 1308 |
|
1309 | 1309 | def process_missing_definition_result(task_tuple: tuple[str, str], result: Any) -> None: |
1310 | | - ref_name, oc_path = task_tuple |
| 1310 | + _ref_name, _oc_path = task_tuple |
1311 | 1311 | if result: |
1312 | 1312 | fetched_ref_name, schema = result |
1313 | 1313 | missing_definitions[fetched_ref_name] = schema |
1314 | 1314 | LOGGER.debug(f"Successfully fetched definition for {fetched_ref_name}") |
1315 | 1315 |
|
1316 | 1316 | def handle_missing_definition_error(task_tuple: tuple[str, str], exc: Exception) -> None: |
1317 | | - ref_name, oc_path = task_tuple |
| 1317 | + ref_name, _oc_path = task_tuple |
1318 | 1318 | LOGGER.debug(f"Failed to fetch definition for {ref_name}: {exc}") |
1319 | 1319 |
|
1320 | 1320 | def create_missing_definition_task(task_tuple: tuple[str, str]) -> Any: |
@@ -1360,10 +1360,10 @@ def write_schema_files( |
1360 | 1360 | # Ensure schema directory exists |
1361 | 1361 | try: |
1362 | 1362 | Path(SCHEMA_DIR).mkdir(parents=True, exist_ok=True) |
1363 | | - except (OSError, IOError) as e: |
| 1363 | + except OSError as e: |
1364 | 1364 | error_msg = f"Failed to create schema directory {SCHEMA_DIR}: {e}" |
1365 | 1365 | LOGGER.error(error_msg) |
1366 | | - raise IOError(error_msg) from e |
| 1366 | + raise OSError(error_msg) from e |
1367 | 1367 |
|
1368 | 1368 | # Fetch missing core definitions if schemas are available |
1369 | 1369 | if schemas: |
@@ -1400,18 +1400,18 @@ def write_schema_files( |
1400 | 1400 | with open(definitions_file, "w") as fd: |
1401 | 1401 | json.dump(definitions_data, fd, indent=2, sort_keys=True) |
1402 | 1402 | LOGGER.info(f"Written {len(definitions)} definitions to {definitions_file}") |
1403 | | - except (OSError, IOError, TypeError) as e: |
| 1403 | + except (OSError, TypeError) as e: |
1404 | 1404 | error_msg = f"Failed to write definitions file {definitions_file}: {e}" |
1405 | 1405 | LOGGER.error(error_msg) |
1406 | | - raise IOError(error_msg) from e |
| 1406 | + raise OSError(error_msg) from e |
1407 | 1407 |
|
1408 | 1408 | # Write and archive resources mapping |
1409 | 1409 | try: |
1410 | 1410 | save_json_archive(resources_mapping, RESOURCES_MAPPING_FILE) |
1411 | | - except (OSError, IOError, TypeError) as e: |
| 1411 | + except (OSError, TypeError) as e: |
1412 | 1412 | error_msg = f"Failed to save and archive resources mapping file {RESOURCES_MAPPING_FILE}: {e}" |
1413 | 1413 | LOGGER.error(error_msg) |
1414 | | - raise IOError(error_msg) from e |
| 1414 | + raise OSError(error_msg) from e |
1415 | 1415 |
|
1416 | 1416 |
|
1417 | 1417 | @dataclasses.dataclass |
@@ -1523,11 +1523,11 @@ def _handle_no_schemas_case() -> None: |
1523 | 1523 | """ |
1524 | 1524 | LOGGER.info("No schemas fetched. Preserving existing data to avoid overwriting with empty definitions.") |
1525 | 1525 | try: |
1526 | | - with open(DEFINITIONS_FILE, "r") as fd: |
| 1526 | + with open(DEFINITIONS_FILE) as fd: |
1527 | 1527 | existing_definitions_data = json.load(fd) |
1528 | 1528 | definitions = existing_definitions_data.get("definitions", {}) |
1529 | 1529 | LOGGER.info(f"Found {len(definitions)} existing definitions that will be preserved") |
1530 | | - except (FileNotFoundError, IOError, json.JSONDecodeError): |
 | 1530 | +    except (OSError, json.JSONDecodeError): |
1531 | 1531 | LOGGER.debug("Could not load existing definitions file. No existing definitions to preserve.") |
1532 | 1532 |
|
1533 | 1533 |
|
|
0 commit comments