
Conversation


@LuisJG8 (Owner) commented on Feb 5, 2026


No description provided.

@LuisJG8 merged commit 260b1e6 into MyProjects on Feb 5, 2026
1 check failed
@gemini-code-assist (Contributor) commented:

Summary of Changes

Hello @LuisJG8, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request aims to enhance the data ingestion pipeline by refactoring existing logic into more organized, reusable functions and integrating them with Celery for asynchronous execution. The changes prepare the system for a more robust, scheduled data processing workflow, likely orchestrated by Airflow, by encapsulating data operations within dedicated tasks.

Highlights

  • Refactored Data Fetching and Saving Logic: The core logic for fetching data using build_repo_chord and saving it to Parquet files has been extracted from the main execution block of client.py into two new, modular functions: save_to_parquet and get_data_from_queue.
  • New Celery Task for Data Processing: A new Celery task, @app.task def run_queue_and_save(...), has been introduced in worker.py. This task is designed to orchestrate the data fetching and saving process by calling the newly created get_data_from_queue function from client.py.
  • Airflow DAG Task Name Mismatch: The airflow/dags/run_queue.py DAG has been updated to call a Celery task named worker.get_data_from_queue with new arguments [100, 500]. However, the worker.py file defines the new task as run_queue_and_save, indicating a potential mismatch in the task name being invoked by the DAG.

Changelog
  • airflow/dags/run_queue.py
    • Updated the Celery task name called by the DAG from worker.get_github_data to worker.get_data_from_queue.
    • Changed the arguments passed to the task from [0, 500] to [100, 500].
  • client.py
    • Extracted the data saving logic into a new function save_to_parquet(the_data).
    • Extracted the data fetching, result retrieval, and error handling logic into a new function get_data_from_queue().
    • Modified the script to execute get_data_from_queue() when run directly, replacing the previous inline execution block.
  • worker.py
    • Removed the unused import time statement.
    • Added from client import get_data_from_queue to import the new data fetching function.
    • Introduced a new Celery task @app.task def run_queue_and_save(total: int = 5000, batch_size: int = 500) which calls client.get_data_from_queue.
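Read together, the changelog implies a client.py shaped roughly like the sketch below. This is a reconstruction from the snippets quoted later in this review, not the actual diff; the pandas/Parquet details and the output file name are assumptions.

# client.py (reconstruction, not the actual diff): the two extracted
# functions described in the changelog above.
import pandas as pd

from worker import build_repo_chord


def save_to_parquet(the_data):
    # Persist the collected records to a Parquet file (file name assumed).
    pd.DataFrame(the_data).to_parquet("github_data.parquet", index=False)


def get_data_from_queue():
    try:
        print("Getting the result")
        response = build_repo_chord(total=5000, batch_size=500)
        the_data = response.get(timeout=3600)  # 1 hour timeout
    except Exception as exc:
        print(f"Something went wrong: {exc}")
    # Flagged in the review below: the_data may be unbound if the try
    # block failed, and the .get() above blocks the worker.
    save_to_parquet(the_data)


if __name__ == "__main__":
    get_data_from_queue()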
Activity
  • No specific activity (comments, reviews, progress) has been recorded for this pull request yet.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double-check it and use code with caution.

@gemini-code-assist bot left a comment:

Code Review

This pull request aims to integrate Celery with Airflow by introducing a new Celery task and supporting functions to fetch and save GitHub data. However, it contains a significant architectural flaw: a Celery task blocks on the result of other tasks using .get(), which can lead to worker deadlocks and denial of service. The code also introduces several critical issues that will prevent it from functioning correctly:

  • a circular dependency between worker.py and client.py, causing an ImportError;
  • a TypeError in worker.py due to a mismatched function signature for get_data_from_queue;
  • a task name mismatch in the Airflow DAG (worker.get_data_from_queue vs. run_queue_and_save);
  • a potential UnboundLocalError.

@task
def run_the_queue():
    app.send_task("worker.get_github_data", args=[0, 500])
    app.send_task("worker.get_data_from_queue", args=[100, 500])
Severity: critical

You are trying to send a task named worker.get_data_from_queue, but there is no Celery task with this name. The function get_data_from_queue in client.py is not decorated as a task. I believe you intended to call the run_queue_and_save task defined in worker.py.

Suggested change:
- app.send_task("worker.get_data_from_queue", args=[100, 500])
+ app.send_task("worker.run_queue_and_save", args=[100, 500])

from datetime import datetime
from github import Auth, Github, GithubException
from dotenv import load_dotenv
from client import get_data_from_queue
Severity: critical

This import creates a circular dependency. worker.py imports get_data_from_queue from client.py, while client.py imports build_repo_chord from worker.py (at line 5). This will cause an ImportError at runtime. To fix this, you should restructure your code to avoid circular imports. For example, you could move shared functions into a separate utils.py file that both client.py and worker.py can import from.
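The utils.py split suggested above is the cleaner fix. A lighter-weight alternative, sketched here with an assumed broker URL, is to defer the client import to call time so that importing worker.py no longer completes the cycle:

# worker.py (sketch): no module-level import of client.py.
# build_repo_chord and the other tasks stay defined here as before.
from celery import Celery

app = Celery("worker", broker="redis://localhost:6379/0")


@app.task
def run_queue_and_save(total: int = 5000, batch_size: int = 500):
    # Deferred import: by the time the task executes, worker.py is fully
    # loaded, so client.py's `from worker import build_repo_chord`
    # resolves without an ImportError.
    from client import get_data_from_queue
    return get_data_from_queue(total=total, batch_size=batch_size)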

Comment on lines +187 to +188
def run_queue_and_save(total: int = 5000, batch_size: int = 500):
    return get_data_from_queue(total=total, batch_size=batch_size)
Severity: critical

This task calls get_data_from_queue with total and batch_size arguments. However, the get_data_from_queue function in client.py is defined without any parameters, which will cause a TypeError when this task is executed. You need to modify get_data_from_queue in client.py to accept these arguments and use them when calling build_repo_chord.
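A minimal sketch of that fix, keeping client.py's existing control flow:

# client.py (sketch): forward the task's arguments instead of
# hard-coding total=5000, batch_size=500.
from worker import build_repo_chord


def get_data_from_queue(total: int = 5000, batch_size: int = 500):
    print("Getting the result")
    response = build_repo_chord(total=total, batch_size=batch_size)
    the_data = response.get(timeout=3600)  # still blocking; see the next comment
    save_to_parquet(the_data)  # defined earlier in client.py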

try:
    print("Getting the result")
    response = build_repo_chord(total=5000, batch_size=500)
    the_data = response.get(timeout=3600)  # 1 hour timeout
Severity: medium (security)

The call to response.get(timeout=3600) in the get_data_from_queue function is a blocking operation that waits for the result of a Celery chord. This is a dangerous anti-pattern: it blocks the worker process, preventing it from performing other work, and can lead to worker deadlocks and denial of service in a distributed environment. Refactor the code to avoid blocking on task results within a worker; consider Celery's callback mechanism (e.g., making save_to_parquet a task and linking it as a callback to the chord) for asynchronous processing.

Additionally, there is a potential UnboundLocalError: if an exception occurs within the try block, the_data may never be assigned before save_to_parquet is called. Ensure save_to_parquet is only called when the_data is successfully populated, for example by using a try...except...else block.
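A minimal sketch of the callback approach described above; fetch_batch is a hypothetical stand-in for the tasks build_repo_chord fans out over, and the broker/backend URLs are assumptions, not code from this PR:

from celery import Celery, chord

app = Celery("worker",
             broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/0")


@app.task
def fetch_batch(start: int, batch_size: int) -> list:
    # Placeholder for the real per-batch GitHub fetch.
    return list(range(start, start + batch_size))


@app.task
def save_to_parquet(batches: list) -> int:
    # Chord callback: runs once every header task has finished and
    # receives all batch results as a list of lists, so no worker
    # ever blocks in response.get().
    import pandas as pd

    rows = [row for batch in batches for row in batch]
    pd.DataFrame({"value": rows}).to_parquet("github_data.parquet", index=False)
    return len(rows)


def run_queue_and_save(total: int = 5000, batch_size: int = 500):
    header = [fetch_batch.s(start, batch_size)
              for start in range(0, total, batch_size)]
    # Dispatch and return immediately; the callback fires asynchronously.
    return chord(header)(save_to_parquet.s())

If the blocking call has to stay temporarily, a try...except...else block at least confines save_to_parquet to the success path and avoids the UnboundLocalError.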
