
Conversation

@fmunz (Contributor) commented on Jul 18, 2025

Summary by CodeRabbit

  • New Features

    • Introduced a new streaming data source for Apache Spark that enables real-time aircraft tracking from the OpenSky Network API, supporting multiple geographic regions and OAuth2 authentication.
    • Added robust error handling for API rate limits and connection issues, with automatic retries and logging.
    • Documented the new data source in the README, including usage details and requirements.
  • Documentation

    • Updated the README to include information about the new OpenSky data source in the example data sources table.

coderabbitai bot commented on Jul 18, 2025

Walkthrough

A new streaming data source for Apache Spark, named "opensky," has been implemented to ingest real-time aircraft tracking data from the OpenSky Network API. The README was updated to document this data source, including its description, source file, and configuration. The implementation includes schema definitions, error handling, authentication, and region selection.
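For readers who want to try it, a minimal usage sketch (the format name "opensky" comes from the walkthrough above; the "region" option and its value are assumptions based on the review comments below):

from pyspark.sql import SparkSession
from pyspark_datasources.opensky import OpenSkyDataSource

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(OpenSkyDataSource)

# Stream live aircraft states; region names should be checked against the merged code.
df = spark.readStream.format("opensky").option("region", "EUROPE").load()
query = df.writeStream.format("console").start()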

Changes

  • README.md: Added an entry for the "OpenSkyDataSource" to the Example Data Sources table, describing its usage and details.
  • pyspark_datasources/opensky.py: Introduced a new Spark streaming data source module for OpenSky Network, with schema, error handling, and region support.

Sequence Diagram(s)

sequenceDiagram
    participant Spark
    participant OpenSkyDataSource
    participant OpenSkyStreamReader
    participant API as OpenSky API

    Spark->>OpenSkyDataSource: Request schema and stream reader
    OpenSkyDataSource->>OpenSkyStreamReader: Initialize with options and schema
    loop Streaming Query
        OpenSkyStreamReader->>API: Fetch aircraft states (with authentication and rate limiting)
        API-->>OpenSkyStreamReader: Return aircraft state data (or error)
        OpenSkyStreamReader->>OpenSkyStreamReader: Parse and validate data
        OpenSkyStreamReader-->>Spark: Return parsed aircraft state tuples
    end
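The loop above maps onto PySpark's SimpleDataSourceStreamReader contract. A minimal skeleton for orientation (the offset layout and the _fetch_states helper are hypothetical, not taken from the PR):

from pyspark.sql.datasource import SimpleDataSourceStreamReader

class OpenSkyStreamReaderSketch(SimpleDataSourceStreamReader):
    """Illustrative only; the real reader adds auth, rate limiting, and parsing."""

    def initialOffset(self) -> dict:
        # Offsets are plain dicts in the simple stream reader API.
        return {"last_ts": 0}

    def read(self, start: dict):
        # One micro-batch: fetch states newer than the offset, then advance it.
        # Hypothetical helper; assumes each returned tuple starts with epoch seconds.
        rows = self._fetch_states(start["last_ts"])
        end = {"last_ts": max((r[0] for r in rows), default=start["last_ts"])}
        return iter(rows), end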

Poem

In skies of code, a new stream flows,
Aircraft data, where the north wind blows.
Bounding boxes, tokens in hand,
We fetch the flights from distant lands.
Spark now listens, ears up high—
OpenSky whispers as planes pass by.
🛩️✨



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (1)
pyspark_datasources/opensky.py (1)

185-185: Fix typo in docstring

Minor typo in the docstring.

-        """Ensure e MIN_REQUEST_INTERVAL seconds between requests"""
+        """Ensure MIN_REQUEST_INTERVAL seconds between requests"""
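For context, the throttle the docstring describes usually has this shape (a sketch; only the docstring text comes from the PR, the method and attribute names here are assumptions):

import time

class RateLimitedClient:
    MIN_REQUEST_INTERVAL = 10.0  # seconds; illustrative value

    def __init__(self):
        self._last_request = 0.0

    def _wait_for_rate_limit(self):
        """Ensure MIN_REQUEST_INTERVAL seconds between requests"""
        elapsed = time.monotonic() - self._last_request
        if elapsed < self.MIN_REQUEST_INTERVAL:
            time.sleep(self.MIN_REQUEST_INTERVAL - elapsed)
        self._last_request = time.monotonic()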
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 372d3c7 and 695527e.

📒 Files selected for processing (2)
  • README.md (1 hunks)
  • pyspark_datasources/opensky.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
pyspark_datasources/opensky.py

80-80: from pyspark.sql.types import * used; unable to detect undefined names (F403)

118-118: StructType may be undefined, or defined from star imports (F405)

166-166: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling (B904)

325-347: repeated F405 warnings across the schema definition: StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType, and ArrayType may be undefined, or defined from star imports

350-350: spark may be undefined, or defined from star imports (F405)

🔇 Additional comments (6)
pyspark_datasources/opensky.py (5)

84-107: Well-structured data models and exception hierarchy

The dataclass, enum, and exception definitions are clean and follow Python best practices. Good use of type hints and clear naming conventions.
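As an illustration, such definitions might take this shape (a sketch: Region and OpenSkyAPIError are named elsewhere in this review, but the members, fields, and coordinate values here are invented):

from dataclasses import dataclass
from enum import Enum

class OpenSkyAPIError(Exception):
    """Raised when the OpenSky API returns an error or is unreachable."""

class Region(Enum):
    # (min_lat, max_lat, min_lon, max_lon) bounding boxes; values are illustrative
    EUROPE = (35.0, 72.0, -25.0, 45.0)
    NORTH_AMERICA = (15.0, 72.0, -170.0, -50.0)

@dataclass
class AircraftState:
    icao24: str      # unique transponder address
    callsign: str
    longitude: float
    latitude: float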


168-180: Robust session configuration with retry logic

Excellent implementation of HTTP retry strategy with exponential backoff. This will handle transient network issues gracefully.
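A typical shape for such a session uses requests with urllib3's Retry (parameter values here are illustrative, not the PR's actual settings):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session() -> requests.Session:
    session = requests.Session()
    retries = Retry(
        total=3,                                     # retry budget
        backoff_factor=1.0,                          # sleeps roughly 1s, 2s, 4s
        status_forcelist=[429, 500, 502, 503, 504],  # throttling and server errors
    )
    session.mount("https://", HTTPAdapter(max_retries=retries))
    return session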


195-234: Well-implemented API interaction with comprehensive error handling

The method properly handles rate limiting, authentication, and various error scenarios with appropriate error messages.


235-281: Excellent data validation and parsing implementation

The safe type conversion helpers and proper validation ensure robust handling of potentially malformed API responses. Good practice converting all timestamps to UTC.
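Helpers of that kind commonly look like this (a sketch; the function names are assumptions):

from datetime import datetime, timezone
from typing import Any, Optional

def safe_float(value: Any) -> Optional[float]:
    # Malformed or missing fields become None instead of raising.
    try:
        return float(value) if value is not None else None
    except (TypeError, ValueError):
        return None

def epoch_to_utc(value: Any) -> Optional[datetime]:
    # OpenSky reports epoch seconds; normalize to timezone-aware UTC.
    ts = safe_float(value)
    return datetime.fromtimestamp(ts, tz=timezone.utc) if ts is not None else None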


310-349: Well-structured data source implementation

The class properly validates configuration options and defines a comprehensive schema for aircraft tracking data. Good error messages for invalid configurations.

README.md (1)

48-48: Documentation entry correctly added

The new OpenSkyDataSource entry is properly formatted and consistent with other entries in the table.

Comment on lines +129 to +130
print(f"Invalid region '{region_name}'. Defaulting to {self.DEFAULT_REGION}.")
self.bbox = Region[self.DEFAULT_REGION].value

🛠️ Refactor suggestion

Use logging instead of print statements

For production code, use Python's logging module instead of print statements for better control over log levels and outputs.

Add at the top of the file:

import logging
logger = logging.getLogger(__name__)

Then update:

-            print(f"Invalid region '{region_name}'. Defaulting to {self.DEFAULT_REGION}.")
+            logger.warning(f"Invalid region '{region_name}'. Defaulting to {self.DEFAULT_REGION}.")

def simpleStreamReader(self, schema: StructType) -> OpenSkyStreamReader:
return OpenSkyStreamReader(schema, self.options)

spark.dataSource.register(OpenSkyDataSource)
coderabbitai bot commented on Jul 18, 2025

⚠️ Potential issue

Remove automatic registration at module level

Registering the data source at module import time will fail if Spark session is not initialized. This should be done by the user explicitly.

Remove this line and document in the module docstring or README that users should register the data source manually:

-spark.dataSource.register(OpenSkyDataSource)

Users should register it in their code:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(OpenSkyDataSource)


allisonwang-db (Owner) commented:

@fmunz data source cannot be registered here since there is no spark session available. You can have a separate init file to register the data source before using it in production.
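Such an init module can be as small as this (a sketch; the file name is illustrative):

# datasource_init.py: run once per Spark session, before any readStream call
from pyspark.sql import SparkSession
from pyspark_datasources.opensky import OpenSkyDataSource

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(OpenSkyDataSource)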

coderabbitai bot replied:

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Comment on lines +165 to +166
except requests.exceptions.RequestException as e:
raise OpenSkyAPIError(f"Failed to get access token: {str(e)}")

⚠️ Potential issue

Use exception chaining for better error context

When re-raising exceptions, use from e to preserve the original exception context.

-        except requests.exceptions.RequestException as e:
-            raise OpenSkyAPIError(f"Failed to get access token: {str(e)}")
+        except requests.exceptions.RequestException as e:
+            raise OpenSkyAPIError(f"Failed to get access token: {str(e)}") from e
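In context, the chained raise might sit in a token helper like this (a sketch; the token endpoint URL and parameter names are assumptions, not taken from the PR):

import requests

# Assumed OAuth2 client-credentials endpoint; verify against the OpenSky docs.
TOKEN_URL = "https://auth.opensky-network.org/auth/realms/opensky-network/protocol/openid-connect/token"

def get_access_token(session: requests.Session, client_id: str, client_secret: str) -> str:
    # OpenSkyAPIError is the module's exception class.
    try:
        resp = session.post(
            TOKEN_URL,
            data={
                "grant_type": "client_credentials",
                "client_id": client_id,
                "client_secret": client_secret,
            },
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    except requests.exceptions.RequestException as e:
        # "from e" keeps the root cause visible in the traceback.
        raise OpenSkyAPIError(f"Failed to get access token: {e}") from e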

from enum import Enum

from pyspark.sql.datasource import SimpleDataSourceStreamReader, DataSource
from pyspark.sql.types import *

⚠️ Potential issue

Replace star import with explicit imports

Star imports reduce code clarity and can lead to namespace pollution. Replace with explicit imports for better maintainability.

-from pyspark.sql.types import *
+from pyspark.sql.types import (
+    StructType, StructField, StringType, IntegerType, 
+    DoubleType, BooleanType, TimestampType, ArrayType
+)

Comment on lines +304 to +308
print(f"OpenSky API Error: {str(e)}")
return ([], start)
except Exception as e:
print(f"Unexpected error: {str(e)}")
return ([], start)

🛠️ Refactor suggestion

Use logging for error messages

Replace print statements with proper logging for better production monitoring.

-        except OpenSkyAPIError as e:
-            print(f"OpenSky API Error: {str(e)}")
-            return ([], start)
-        except Exception as e:
-            print(f"Unexpected error: {str(e)}")
-            return ([], start)
+        except OpenSkyAPIError as e:
+            logger.error(f"OpenSky API Error: {str(e)}")
+            return ([], start)
+        except Exception as e:
+            logger.error(f"Unexpected error: {str(e)}", exc_info=True)
+            return ([], start)

@@ -0,0 +1,350 @@
"""
allisonwang-db (Owner) commented:

Let's add the docstring under the python data source class?

@allisonwang-db (Owner) commented:

Thanks for adding the data source!

allisonwang-db merged commit 55248ae into allisonwang-db:master on Jul 21, 2025
1 check passed
coderabbitai bot mentioned this pull request on Jul 21, 2025
