trying to run celery queue with airflow #29
Summary of Changes

Hello @LuisJG8, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request aims to enhance the data ingestion pipeline by refactoring existing logic into more organized, reusable functions and integrating them with Celery for asynchronous execution. The changes prepare the system for a more robust, scheduled data processing workflow, likely orchestrated by Airflow, by encapsulating data operations within dedicated tasks.
Code Review
This pull request aims to integrate Celery with Airflow by introducing a new Celery task and supporting functions to fetch and save GitHub data. However, it contains a significant architectural flaw: a Celery task blocks on the result of other tasks using `.get()`, which can lead to worker deadlocks and denial of service. The code also introduces several critical issues that will prevent it from functioning correctly:

- a circular dependency between `worker.py` and `client.py`, causing an `ImportError`;
- a `TypeError` in `worker.py` due to a mismatched function signature for `get_data_from_queue`;
- a task name mismatch in the Airflow DAG (`worker.get_data_from_queue` vs. `run_queue_and_save`);
- a potential `UnboundLocalError`.
```python
@task
def run_the_queue():
    app.send_task("worker.get_github_data", args=[0, 500])
    app.send_task("worker.get_data_from_queue", args=[100, 500])
```
You are trying to send a task named `worker.get_data_from_queue`, but no Celery task with that name exists: the `get_data_from_queue` function in `client.py` is not decorated as a task. I believe you intended to call the `run_queue_and_save` task defined in `worker.py`.
Suggested change:

```diff
-    app.send_task("worker.get_data_from_queue", args=[100, 500])
+    app.send_task("worker.run_queue_and_save", args=[100, 500])
```
```python
from datetime import datetime
from github import Auth, Github, GithubException
from dotenv import load_dotenv
from client import get_data_from_queue
```
This import creates a circular dependency: `worker.py` imports `get_data_from_queue` from `client.py`, while `client.py` imports `build_repo_chord` from `worker.py` (at line 5). This will cause an `ImportError` at runtime. To fix it, restructure the code to avoid circular imports; for example, move shared functions into a separate `utils.py` that both `client.py` and `worker.py` can import from.
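The suggested restructure can be sketched as follows; `batch_ranges` is a hypothetical stand-in for whatever logic the two modules actually share:

```python
# utils.py -- hypothetical shared module; neither client.py nor worker.py
# imports the other, which breaks the import cycle.

def batch_ranges(total: int, batch_size: int) -> list[tuple[int, int]]:
    """Split `total` items into consecutive (start, end) batches."""
    return [(start, min(start + batch_size, total))
            for start in range(0, total, batch_size)]

# client.py would then use:  from utils import batch_ranges
# worker.py would then use:  from utils import batch_ranges
```

With this layout, neither module depends on the other at import time, so both can be loaded in any order.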
```python
def run_queue_and_save(total: int = 5000, batch_size: int = 500):
    return get_data_from_queue(total=total, batch_size=batch_size)
```
This task calls `get_data_from_queue` with `total` and `batch_size` arguments, but the `get_data_from_queue` function in `client.py` is defined without any parameters, so a `TypeError` will be raised when the task executes. Modify `get_data_from_queue` in `client.py` to accept these arguments and use them when calling `build_repo_chord`.
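A minimal sketch of the fix in `client.py` (with `build_repo_chord` stubbed out here for illustration — the real one lives in `worker.py`):

```python
def build_repo_chord(total: int, batch_size: int):
    # Stub standing in for the real chord builder.
    return {"total": total, "batch_size": batch_size}

def get_data_from_queue(total: int = 5000, batch_size: int = 500):
    # Accept the arguments the Celery task passes and forward them,
    # instead of defining the function with no parameters.
    return build_repo_chord(total=total, batch_size=batch_size)
```

The defaults keep existing no-argument callers working while letting the task override both values.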
```python
try:
    print("Getting the result")
    response = build_repo_chord(total=5000, batch_size=500)
    the_data = response.get(timeout=3600)  # 1 hour timeout
```
The call to `response.get(timeout=3600)` in `get_data_from_queue` is a blocking operation that waits for the result of a Celery chord. This is a dangerous anti-pattern: it ties up the worker process, preventing it from doing other work, and can lead to worker deadlocks and denial of service in a distributed environment. Refactor the code to avoid blocking on task results within a worker; consider using Celery's callback mechanism, e.g. making `save_to_parquet` a task and linking it as a callback to the chord, so the results are processed asynchronously.

Additionally, there is a potential `UnboundLocalError`: if an exception occurs inside the `try` block, `the_data` may never be assigned before `save_to_parquet` is called. Ensure `save_to_parquet` runs only when `the_data` is successfully populated, for example by using a `try...except...else` block.