opensky pull request #11
Conversation
Walkthrough

A new streaming data source for Apache Spark, named "opensky," has been implemented to ingest real-time aircraft tracking data from the OpenSky Network API. The README was updated to document this data source, including its description, source file, and configuration. The implementation includes schema definitions, error handling, authentication, and region selection.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Spark
    participant OpenSkyDataSource
    participant OpenSkyStreamReader
    participant OpenSky API
    Spark->>OpenSkyDataSource: Request schema and stream reader
    OpenSkyDataSource->>OpenSkyStreamReader: Initialize with options and schema
    loop Streaming Query
        OpenSkyStreamReader->>OpenSky API: Fetch aircraft states (with authentication and rate limiting)
        OpenSky API-->>OpenSkyStreamReader: Return aircraft state data (or error)
        OpenSkyStreamReader->>OpenSkyStreamReader: Parse and validate data
        OpenSkyStreamReader-->>Spark: Return parsed aircraft state tuples
    end
```
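For orientation, here is a minimal sketch of how such a source would be consumed from Spark Structured Streaming. Only the `opensky` short name, the module path, and the registration call appear in this review; the `region` option key and the `EUROPE` value are assumptions based on the region-selection feature described above.

```python
from pyspark.sql import SparkSession
from pyspark_datasources.opensky import OpenSkyDataSource  # module path from this PR

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(OpenSkyDataSource)  # registration pattern discussed later in this review

df = (
    spark.readStream.format("opensky")
    .option("region", "EUROPE")  # hypothetical option key/value
    .load()
)

query = df.writeStream.format("console").start()
query.awaitTermination()
```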
Actionable comments posted: 5
🧹 Nitpick comments (1)
pyspark_datasources/opensky.py (1)
185-185: Fix typo in docstring

Minor typo in the docstring.

```diff
- """Ensure e MIN_REQUEST_INTERVAL seconds between requests"""
+ """Ensure MIN_REQUEST_INTERVAL seconds between requests"""
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- README.md (1 hunks)
- pyspark_datasources/opensky.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
pyspark_datasources/opensky.py

80-80: from pyspark.sql.types import * used; unable to detect undefined names (F403)
118-118: StructType may be undefined, or defined from star imports (F405)
166-166: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling (B904)
325-325: StructType may be undefined, or defined from star imports (F405)
326-326: StructType may be undefined, or defined from star imports (F405)
327-327: StructField, TimestampType may be undefined, or defined from star imports (F405)
328-328: StructField, StringType may be undefined, or defined from star imports (F405)
329-329: StructField, StringType may be undefined, or defined from star imports (F405)
330-330: StructField, StringType may be undefined, or defined from star imports (F405)
331-331: StructField, TimestampType may be undefined, or defined from star imports (F405)
332-332: StructField, TimestampType may be undefined, or defined from star imports (F405)
333-333: StructField, DoubleType may be undefined, or defined from star imports (F405)
334-334: StructField, DoubleType may be undefined, or defined from star imports (F405)
335-335: StructField, DoubleType may be undefined, or defined from star imports (F405)
336-336: StructField, BooleanType may be undefined, or defined from star imports (F405)
337-337: StructField, DoubleType may be undefined, or defined from star imports (F405)
338-338: StructField, DoubleType may be undefined, or defined from star imports (F405)
339-339: StructField, DoubleType may be undefined, or defined from star imports (F405)
340-340: StructField, ArrayType, IntegerType may be undefined, or defined from star imports (F405)
341-341: StructField, DoubleType may be undefined, or defined from star imports (F405)
342-342: StructField, StringType may be undefined, or defined from star imports (F405)
343-343: StructField, BooleanType may be undefined, or defined from star imports (F405)
344-344: StructField, IntegerType may be undefined, or defined from star imports (F405)
347-347: StructType may be undefined, or defined from star imports (F405)
350-350: spark may be undefined, or defined from star imports (F405)
🔇 Additional comments (6)
pyspark_datasources/opensky.py (5)
84-107: Well-structured data models and exception hierarchy

The dataclass, enum, and exception definitions are clean and follow Python best practices. Good use of type hints and clear naming conventions.
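As a rough illustration of the pattern being praised: the `Region` enum and `OpenSkyAPIError` names appear elsewhere in this review, while the dataclass fields, the base exception class, and the bounding-box values are assumptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Region(Enum):
    # Name from this review; bbox values here are illustrative,
    # as (min_lat, max_lat, min_lon, max_lon)
    EUROPE = (35.0, 72.0, -25.0, 45.0)
    GLOBAL = (-90.0, 90.0, -180.0, 180.0)


@dataclass
class AircraftState:
    # Hypothetical subset of OpenSky's state vector fields
    icao24: str
    callsign: Optional[str]
    longitude: Optional[float]
    latitude: Optional[float]


class OpenSkyError(Exception):
    """Hypothetical base class for data source errors."""


class OpenSkyAPIError(OpenSkyError):
    """Name taken from this review; raised on API failures."""
```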
168-180: Robust session configuration with retry logic

Excellent implementation of HTTP retry strategy with exponential backoff. This will handle transient network issues gracefully.
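A typical shape for such a session, sketched with `urllib3.util.retry.Retry` mounted on a `requests.Session`; the PR's actual parameters are not shown here, so these values are assumptions.

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def build_session() -> requests.Session:
    # Exponential backoff: roughly 1s, 2s, 4s between attempts (illustrative values)
    retry = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
    )
    session = requests.Session()
    session.mount("https://", HTTPAdapter(max_retries=retry))
    return session
```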
195-234: Well-implemented API interaction with comprehensive error handling

The method properly handles rate limiting, authentication, and various error scenarios with appropriate error messages.
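A rough sketch of how a fetch method can combine these concerns. The endpoint URL and `lamin`/`lamax`/`lomin`/`lomax` parameters follow the public OpenSky REST API; the function body, bbox ordering, and the stand-in error class are assumptions.

```python
from typing import Any, List, Optional, Tuple

import requests

API_URL = "https://opensky-network.org/api/states/all"  # public OpenSky endpoint


class OpenSkyAPIError(Exception):
    """Minimal stand-in for the error class referenced in this review."""


def fetch_states(
    session: requests.Session,
    bbox: Tuple[float, float, float, float],  # assumed (min_lat, max_lat, min_lon, max_lon)
    token: Optional[str] = None,
) -> List[Any]:
    min_lat, max_lat, min_lon, max_lon = bbox
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    params = {"lamin": min_lat, "lamax": max_lat, "lomin": min_lon, "lomax": max_lon}
    resp = session.get(API_URL, params=params, headers=headers, timeout=30)
    if resp.status_code == 429:
        raise OpenSkyAPIError("Rate limited by OpenSky API")
    if resp.status_code == 401:
        raise OpenSkyAPIError("Authentication failed")
    resp.raise_for_status()
    return resp.json().get("states") or []
```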
235-281: Excellent data validation and parsing implementation

The safe type conversion helpers and proper validation ensure robust handling of potentially malformed API responses. Good practice converting all timestamps to UTC.
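The pattern under review likely resembles the following sketch; the helper names are assumptions, and only the "safe conversion plus UTC timestamps" idea comes from the review itself.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def safe_float(value: Any) -> Optional[float]:
    # Return None instead of raising on malformed API values
    try:
        return float(value) if value is not None else None
    except (TypeError, ValueError):
        return None


def to_utc(epoch_seconds: Any) -> Optional[datetime]:
    # OpenSky reports Unix timestamps; normalize to timezone-aware UTC
    ts = safe_float(epoch_seconds)
    return datetime.fromtimestamp(ts, tz=timezone.utc) if ts is not None else None
```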
310-349: Well-structured data source implementation

The class properly validates configuration options and defines a comprehensive schema for aircraft tracking data. Good error messages for invalid configurations.
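For reference, the general shape of a PySpark Python data source class: the `name`, `schema`, and `simpleStreamReader` methods follow the `pyspark.sql.datasource` API, and the schema types mirror those flagged by Ruff above, but the field names and the stub reader body are assumptions.

```python
from typing import Dict, Tuple

from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader
from pyspark.sql.types import (
    DoubleType, StringType, StructField, StructType, TimestampType,
)


class OpenSkyStreamReader(SimpleDataSourceStreamReader):
    """Stub standing in for the PR's reader; the real fetch/parse logic lives in the PR."""

    def __init__(self, schema: StructType, options: Dict[str, str]) -> None:
        self.schema = schema
        self.options = options

    def initialOffset(self) -> dict:
        return {"ts": 0}

    def read(self, start: dict) -> Tuple[list, dict]:
        return [], start  # stub: the PR fetches and parses OpenSky states here


class OpenSkyDataSource(DataSource):
    @classmethod
    def name(cls) -> str:
        return "opensky"

    def schema(self) -> StructType:
        # Abbreviated; field names here are hypothetical
        return StructType([
            StructField("time", TimestampType()),
            StructField("icao24", StringType()),
            StructField("callsign", StringType()),
            StructField("longitude", DoubleType()),
            StructField("latitude", DoubleType()),
        ])

    def simpleStreamReader(self, schema: StructType) -> SimpleDataSourceStreamReader:
        return OpenSkyStreamReader(schema, self.options)
```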
README.md (1)
48-48: Documentation entry correctly added

The new OpenSkyDataSource entry is properly formatted and consistent with other entries in the table.
| print(f"Invalid region '{region_name}'. Defaulting to {self.DEFAULT_REGION}.") | ||
| self.bbox = Region[self.DEFAULT_REGION].value | 
🛠️ Refactor suggestion
Use logging instead of print statements
For production code, use Python's logging module instead of print statements for better control over log levels and outputs.
Add at the top of the file:

```python
import logging

logger = logging.getLogger(__name__)
```

Then update:

```diff
-            print(f"Invalid region '{region_name}'. Defaulting to {self.DEFAULT_REGION}.")
+            logger.warning(f"Invalid region '{region_name}'. Defaulting to {self.DEFAULT_REGION}.")
```

🤖 Prompt for AI Agents
In pyspark_datasources/opensky.py around lines 129 to 130, replace the print
statement used for logging the invalid region message with a logging call using
Python's logging module. First, add "import logging" and create a logger
instance with "logger = logging.getLogger(__name__)" at the top of the file.
Then change the print statement to use "logger.warning" to log the message about
the invalid region and defaulting to the default region.
```python
    def simpleStreamReader(self, schema: StructType) -> OpenSkyStreamReader:
        return OpenSkyStreamReader(schema, self.options)


spark.dataSource.register(OpenSkyDataSource)
```
Remove automatic registration at module level
Registering the data source at module import time will fail if Spark session is not initialized. This should be done by the user explicitly.
Remove this line and document in the module docstring or README that users should register the data source manually:

```diff
-spark.dataSource.register(OpenSkyDataSource)
```

Users should register it in their code:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(OpenSkyDataSource)
```

🧰 Tools
🪛 Ruff (0.12.2)
350-350: spark may be undefined, or defined from star imports (F405)
🤖 Prompt for AI Agents
In pyspark_datasources/opensky.py at line 350, remove the automatic registration
line 'spark.dataSource.register(OpenSkyDataSource)' to avoid errors when Spark
session is not initialized. Instead, update the module docstring or README to
instruct users to manually register the data source after creating a Spark
session, providing an example code snippet for clarity.
@fmunz The data source cannot be registered here since no Spark session is available at import time. You can have a separate init file that registers the data source before using it in production, as sketched below.
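A minimal sketch of such an init module; the file name and function are assumptions, only the registration call itself comes from this PR.

```python
# init_datasources.py — hypothetical helper module, run once per Spark session
from pyspark.sql import SparkSession

from pyspark_datasources.opensky import OpenSkyDataSource


def register_datasources(spark: SparkSession) -> None:
    # Register custom Python data sources after the session exists,
    # avoiding the import-time failure called out in this review.
    spark.dataSource.register(OpenSkyDataSource)
```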
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
```python
        except requests.exceptions.RequestException as e:
            raise OpenSkyAPIError(f"Failed to get access token: {str(e)}")
```
Use exception chaining for better error context
When re-raising exceptions, use `from e` to preserve the original exception context.
```diff
-        except requests.exceptions.RequestException as e:
-            raise OpenSkyAPIError(f"Failed to get access token: {str(e)}")
+        except requests.exceptions.RequestException as e:
+            raise OpenSkyAPIError(f"Failed to get access token: {str(e)}") from e
```
🧰 Tools
🪛 Ruff (0.12.2)
166-166: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling (B904)
🤖 Prompt for AI Agents
In pyspark_datasources/opensky.py around lines 165 to 166, the exception is
re-raised without chaining, which loses the original error context. Modify the
raise statement to use "raise OpenSkyAPIError(...) from e" to preserve the
original exception context for better debugging.
```python
from enum import Enum

from pyspark.sql.datasource import SimpleDataSourceStreamReader, DataSource
from pyspark.sql.types import *
```
Replace star import with explicit imports
Star imports reduce code clarity and can lead to namespace pollution. Replace with explicit imports for better maintainability.
```diff
-from pyspark.sql.types import *
+from pyspark.sql.types import (
+    StructType, StructField, StringType, IntegerType,
+    DoubleType, BooleanType, TimestampType, ArrayType
+)
```
🧰 Tools
🪛 Ruff (0.12.2)
80-80: from pyspark.sql.types import * used; unable to detect undefined names (F403)
🤖 Prompt for AI Agents
In pyspark_datasources/opensky.py at line 80, replace the star import from
pyspark.sql.types with explicit imports of only the needed classes or functions.
Identify which specific types are used in the file and import them individually
to improve code clarity and avoid namespace pollution.
| print(f"OpenSky API Error: {str(e)}") | ||
| return ([], start) | ||
| except Exception as e: | ||
| print(f"Unexpected error: {str(e)}") | ||
| return ([], start) | 
🛠️ Refactor suggestion
Use logging for error messages
Replace print statements with proper logging for better production monitoring.
```diff
-        except OpenSkyAPIError as e:
-            print(f"OpenSky API Error: {str(e)}")
-            return ([], start)
-        except Exception as e:
-            print(f"Unexpected error: {str(e)}")
-            return ([], start)
+        except OpenSkyAPIError as e:
+            logger.error(f"OpenSky API Error: {str(e)}")
+            return ([], start)
+        except Exception as e:
+            logger.error(f"Unexpected error: {str(e)}", exc_info=True)
+            return ([], start)
```
🤖 Prompt for AI Agents
In pyspark_datasources/opensky.py around lines 304 to 308, replace the print
statements used for error messages with appropriate logging calls. Import the
logging module if not already done, and use logger.error() or a similar logging
method to record the error messages instead of print, ensuring better production
monitoring and log management.
```diff
@@ -0,0 +1,350 @@
+"""
```
Let's add the docstring under the Python data source class?
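For example, something along these lines; the wording is illustrative, only the class name and features come from this review.

```python
from pyspark.sql.datasource import DataSource


class OpenSkyDataSource(DataSource):
    """Streaming data source for real-time aircraft states from the OpenSky Network API.

    Supports region selection, optional authentication, and client-side
    rate limiting. Register with ``spark.dataSource.register(OpenSkyDataSource)``.
    """
```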
Thanks for adding the data source!
Summary by CodeRabbit

New Features
- Added a new "opensky" streaming data source that ingests real-time aircraft tracking data from the OpenSky Network API, with authentication, rate limiting, and region selection.

Documentation
- Updated the README to document the new data source, including its description, source file, and configuration.