Skip to content

feat: issue task control #327

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions petercat_utils/data_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,4 @@ class GitDocTaskNodeType(AutoNameEnum):
class GitIssueTaskNodeType(AutoNameEnum):
REPO = auto()
ISSUE = auto()
ISSUE_PAGE = auto()
106 changes: 84 additions & 22 deletions petercat_utils/rag_helper/git_issue_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from ..data_class import GitIssueTaskNodeType, TaskStatus, TaskType, RAGGitIssueConfig
from ..rag_helper import issue_retrieval

g = Github()
GITHUB_PER_PAGE = 30
g = Github(per_page=GITHUB_PER_PAGE)


def add_rag_git_issue_task(config: RAGGitIssueConfig):
Expand All @@ -22,8 +23,21 @@
return res


def create_rag_git_issue_task(record):
return GitIssueTask(id=record["id"],

Check warning on line 27 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L27

Added line #L27 was not covered by tests
issue_id=record["issue_id"],
repo_name=record["repo_name"],
node_type=record["node_type"],
bot_id=record["bot_id"],
status=record["status"],
from_id=record["from_task_id"],
page_index=record["page_index"]
)


class GitIssueTask(GitTask):
issue_id: str
issue_id: int
page_index: int
node_type: GitIssueTaskNodeType

def __init__(self,
Expand All @@ -33,11 +47,13 @@
repo_name,
status=TaskStatus.NOT_STARTED,
from_id=None,
id=None
id=None,
page_index=None
):
super().__init__(bot_id=bot_id, type=TaskType.GIT_ISSUE, from_id=from_id, id=id, status=status,
repo_name=repo_name)
self.issue_id = issue_id
self.page_index = page_index

Check warning on line 56 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L56

Added line #L56 was not covered by tests
self.node_type = GitIssueTaskNodeType(node_type)

def extra_save_data(self):
Expand All @@ -50,20 +66,63 @@
self.update_status(TaskStatus.IN_PROGRESS)
if self.node_type == GitIssueTaskNodeType.REPO:
return self.handle_repo_node()
elif self.node_type == GitIssueTaskNodeType.ISSUE_PAGE:
return self.handle_issue_page_node()

Check warning on line 70 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L69-L70

Added lines #L69 - L70 were not covered by tests
elif self.node_type == GitIssueTaskNodeType.ISSUE:
return self.handle_issue_node()
else:
raise ValueError(f"Unsupported node type [{self.node_type}]")

def handle_repo_node(self):
repo = g.get_repo(self.repo_name)
repo.get_issues()
issues = [issue for issue in repo.get_issues()]
issues = repo.get_issues(state='all')
latest_page = (self.get_table()

Check warning on line 79 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L78-L79

Added lines #L78 - L79 were not covered by tests
.select('*')
.eq('repo_name', self.repo_name)
.eq('node_type', GitIssueTaskNodeType.ISSUE_PAGE.value)
.order('page_index', desc=True)
.limit(1)
.execute()).data

slice_page_index = latest_page[0]["page_index"] if len(latest_page) > 0 else 0

Check warning on line 87 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L87

Added line #L87 was not covered by tests

# The latest page might have a new issue.
if len(latest_page) > 0:
create_rag_git_issue_task(latest_page[0]).send()

Check warning on line 91 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L90-L91

Added lines #L90 - L91 were not covered by tests

if issues.totalCount > 0:
pages = issues.totalCount // GITHUB_PER_PAGE + (1 if issues.totalCount % GITHUB_PER_PAGE != 0 else 0)
pages_array = list(range(1, pages + 1))[slice_page_index:]
task_list = list(

Check warning on line 96 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L93-L96

Added lines #L93 - L96 were not covered by tests
map(
lambda item: {
"repo_name": self.repo_name,
"status": TaskStatus.NOT_STARTED.value,
"node_type": GitIssueTaskNodeType.ISSUE_PAGE.value,
"from_task_id": self.id,
"bot_id": self.bot_id,
"page_index": item
},
pages_array,
),
)
if len(task_list) > 0:
result = self.get_table().insert(task_list).execute()
for record in result.data:
issue_task = create_rag_git_issue_task(record)
issue_task.send()

Check warning on line 113 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L109-L113

Added lines #L109 - L113 were not covered by tests

return self.update_status(TaskStatus.COMPLETED)

Check warning on line 115 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L115

Added line #L115 was not covered by tests

def handle_issue_page_node(self):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `handle_issue_page_node` method does not handle the case where `repo.get_issues(state='all').get_page(self.page_index)` returns an empty list, which could cause errors when that page's issues are processed.

repo = g.get_repo(self.repo_name)
issues = repo.get_issues(state='all').get_page(self.page_index)

Check warning on line 119 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L118-L119

Added lines #L118 - L119 were not covered by tests

task_list = list(
map(
lambda item: {
"repo_name": self.repo_name,
"issue_id": str(item.number),
"issue_id": item.number,
"status": TaskStatus.NOT_STARTED.value,
"node_type": GitIssueTaskNodeType.ISSUE.value,
"from_task_id": self.id,
Expand All @@ -73,22 +132,25 @@
),
)
if len(task_list) > 0:
result = self.get_table().insert(task_list).execute()
for record in result.data:
issue_task = GitIssueTask(id=record["id"],
issue_id=record["issue_id"],
repo_name=record["repo_name"],
node_type=record["node_type"],
bot_id=record["bot_id"],
status=record["status"],
from_id=record["from_task_id"]
)
issue_task.send()

return (self.get_table().update(
{"status": TaskStatus.COMPLETED.value})
.eq("id", self.id)
.execute())
existing_issues = (self.get_table()

Check warning on line 135 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L135

Added line #L135 was not covered by tests
.select('*')
.in_('issue_id', [item['issue_id'] for item in task_list])
.eq('repo_name', self.repo_name)
.eq('node_type', GitIssueTaskNodeType.ISSUE.value)
.execute()
)

existing_issue_ids = {int(issue['issue_id']) for issue in existing_issues.data}

Check warning on line 143 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L143

Added line #L143 was not covered by tests

new_task_list = [item for item in task_list if item['issue_id'] not in existing_issue_ids]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`new_task_list` is built without first checking whether `task_list` is empty, so an empty `task_list` could still trigger unnecessary database operations.

if len(new_task_list) > 0:
result = self.get_table().insert(new_task_list).execute()
for record in result.data:
issue_task = create_rag_git_issue_task(record)
issue_task.send()

Check warning on line 150 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L145-L150

Added lines #L145 - L150 were not covered by tests

return self.update_status(TaskStatus.COMPLETED)

Check warning on line 152 in petercat_utils/rag_helper/git_issue_task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/git_issue_task.py#L152

Added line #L152 was not covered by tests


def handle_issue_node(self):
issue_retrieval.add_knowledge_by_issue(
Expand Down
12 changes: 2 additions & 10 deletions petercat_utils/rag_helper/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import boto3

from .git_doc_task import GitDocTask
from .git_issue_task import GitIssueTask
from .git_issue_task import GitIssueTask, create_rag_git_issue_task
from .git_task import GitTask

from ..utils.env import get_env_variable
Expand Down Expand Up @@ -77,15 +77,7 @@
from_id=data["from_task_id"],
)
if task_type == TaskType.GIT_ISSUE:
return GitIssueTask(
id=data["id"],
issue_id=data["issue_id"],
repo_name=data["repo_name"],
node_type=data["node_type"],
bot_id=data["bot_id"],
status=data["status"],
from_id=data["from_task_id"],
)
return create_rag_git_issue_task(data)

Check warning on line 80 in petercat_utils/rag_helper/task.py

View check run for this annotation

Codecov / codecov/patch

petercat_utils/rag_helper/task.py#L80

Added line #L80 was not covered by tests


def trigger_task(task_type: TaskType, task_id: Optional[str]):
Expand Down
2 changes: 2 additions & 0 deletions subscriber/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ def lambda_handler(event, context):
batch_item_failures = []
sqs_batch_response = {}

if len(event):
print(f"event batch size is ${len(event)}")
for record in event["Records"]:
try:
body = record["body"]
Expand Down