13 changes: 13 additions & 0 deletions surfsense_backend/app/connectors/github/__init__.py
@@ -0,0 +1,13 @@
"""GitHub Connector Module."""

from .client import GitHubConnector
from .constants import MAX_FILE_SIZE, SKIPPED_DIRS
from .service import GitIngestService

__all__ = [
"GitHubConnector",
"GitIngestService",
"MAX_FILE_SIZE",
"SKIPPED_DIRS",
]

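Review note: a minimal usage sketch of the public surface this __init__.py exposes. The GITHUB_TOKEN environment variable and the owner/repo name are hypothetical placeholders, not part of this PR:

import asyncio
import os

from app.connectors.github import MAX_FILE_SIZE, GitHubConnector

async def main() -> None:
    # Token sourcing is illustrative; any secret store works.
    connector = GitHubConnector(token=os.environ["GITHUB_TOKEN"])
    result = await connector.process_repository("owner/repo")  # placeholder repo
    print(f"{result['metadata']['content_length']} chars ingested, file cap {MAX_FILE_SIZE} bytes")

asyncio.run(main())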
132 changes: 132 additions & 0 deletions surfsense_backend/app/connectors/github/client.py
@@ -0,0 +1,132 @@
"""GitHub connector for repository processing."""

import logging
from typing import Any

from github3 import login as github_login
from github3.exceptions import AuthenticationFailed, ForbiddenError

from .service import GitIngestService

logger = logging.getLogger(__name__)


class GitHubConnector:
"""GitHub connector for bulk repository processing."""

def __init__(self, token: str):
if not token:
raise ValueError("GitHub token cannot be empty.")

try:
self.gh = github_login(token=token)
self.gh.me()
logger.info("Successfully authenticated with GitHub API.")
        except (AuthenticationFailed, ForbiddenError) as e:
logger.error(f"GitHub authentication failed: {e}")
raise ValueError("Invalid GitHub token or insufficient permissions.") from e
        except Exception as e:
            logger.error(f"Failed to initialize GitHub client: {e}")
            raise

self.gitingest = GitIngestService(token=token)
self.token = token

def get_user_repositories(self) -> list[dict[str, Any]]:
"""Fetch repositories accessible by the authenticated user."""
repos_data = []
try:
for repo in self.gh.repositories(type="all", sort="updated"):
repos_data.append(
{
"id": repo.id,
"name": repo.name,
"full_name": repo.full_name,
"private": repo.private,
"url": repo.html_url,
"description": repo.description or "",
"last_updated": repo.updated_at if repo.updated_at else None,
"default_branch": repo.default_branch or "main",
}
)
logger.info(f"Fetched {len(repos_data)} repositories.")
return repos_data
except Exception as e:
logger.error(f"Failed to fetch GitHub repositories: {e}")
return []

async def process_repository(self, repo_full_name: str) -> dict[str, Any]:
"""
Process a repository using gitingest and return content with metadata.

This method is async to support non-blocking operation within Celery tasks.
"""
try:
owner, repo_name = repo_full_name.split("/")
repo = self.gh.repository(owner, repo_name)

if not repo:
raise ValueError(f"Repository '{repo_full_name}' not found.")

branch = repo.default_branch or "main"
repo_url = f"https://github.com/{repo_full_name}"

logger.info(f"Processing repository {repo_full_name} (branch: {branch})")

# Await the async gitingest call
result = await self.gitingest.process_repository(repo_url, branch)

result["metadata"].update(
{
"repo_id": repo.id,
"private": repo.private,
"description": repo.description or "",
"stars": repo.stargazers_count or 0,
"language": repo.language or "Unknown",
"html_url": repo.html_url,
}
)

logger.info(
f"Successfully processed {repo_full_name}: {len(result['content'])} characters"
)

return result

except Exception as e:
logger.error(f"Failed to process repository {repo_full_name}: {e}")
raise ValueError(f"Failed to process {repo_full_name}: {e!s}") from e

async def process_multiple_repositories(
self, repo_full_names: list[str]
) -> dict[str, Any]:
"""
Process multiple repositories asynchronously and return results with errors.

Processes repositories sequentially to avoid overwhelming the system.
"""
results = {}
errors = {}

for repo_full_name in repo_full_names:
try:
result = await self.process_repository(repo_full_name)
results[repo_full_name] = result
logger.info(f"✓ Successfully processed {repo_full_name}")
except Exception as e:
errors[repo_full_name] = str(e)
logger.error(f"✗ Failed to process {repo_full_name}: {e}")

logger.info(
f"Processed {len(results)}/{len(repo_full_names)} repositories successfully"
)

return {
"results": results,
"errors": errors,
"success_count": len(results),
"error_count": len(errors),
"total": len(repo_full_names),
}

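Review note: a hedged sketch of consuming the dictionary shape process_multiple_repositories documents above (results, errors, success_count, error_count, total). Token sourcing and repository names are placeholders:

import asyncio
import os

from app.connectors.github import GitHubConnector

async def main() -> None:
    connector = GitHubConnector(token=os.environ["GITHUB_TOKEN"])  # hypothetical env var
    outcome = await connector.process_multiple_repositories(
        ["owner/repo-a", "owner/repo-b"]  # placeholder names
    )
    print(f"{outcome['success_count']}/{outcome['total']} repositories succeeded")
    for name, reason in outcome["errors"].items():
        print(f"failed {name}: {reason}")

asyncio.run(main())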
25 changes: 25 additions & 0 deletions surfsense_backend/app/connectors/github/constants.py
@@ -0,0 +1,25 @@
"""Constants for GitHub connector with gitingest."""

SKIPPED_DIRS = {
".git",
"node_modules",
"vendor",
"build",
"dist",
"target",
"__pycache__",
"venv",
".venv",
"env",
".vscode",
".idea",
".project",
".settings",
"tmp",
"logs",
}

# Placeholder sets for future file-type filtering; currently unused.
CODE_EXTENSIONS = set()
DOC_EXTENSIONS = set()

# Skip files larger than 10 MiB (passed to gitingest as max_file_size).
MAX_FILE_SIZE = 10 * 1024 * 1024

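Review note: to make these constants concrete, this is how service.py below derives gitingest exclude patterns from SKIPPED_DIRS (shown here with a set literal). Whether "dir/**" also matches nested occurrences depends on gitingest's pattern semantics; if not, a "**/" prefix may be needed (an assumption, not verified here):

from app.connectors.github.constants import MAX_FILE_SIZE, SKIPPED_DIRS

exclude_patterns = {f"{d}/**" for d in SKIPPED_DIRS} | {"*.pyc"}
print(sorted(exclude_patterns)[:3])    # e.g. ['*.pyc', '.git/**', '.idea/**']
print(MAX_FILE_SIZE // (1024 * 1024))  # 10 (MiB)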
147 changes: 147 additions & 0 deletions surfsense_backend/app/connectors/github/service.py
@@ -0,0 +1,147 @@
"""
Gitingest service for processing GitHub repositories.

This module provides a wrapper around the gitingest library to convert
GitHub repositories into text format optimized for indexing and LLM processing.
"""

import asyncio
import logging
from typing import Any

from .constants import MAX_FILE_SIZE, SKIPPED_DIRS

logger = logging.getLogger(__name__)


class GitIngestService:
"""Service for processing GitHub repositories using gitingest."""

def __init__(self, token: str | None = None):
self.token = token
self.max_file_size = MAX_FILE_SIZE
logger.info("GitIngest service initialized")

async def process_repository(
self, repo_url: str, branch: str = "main"
) -> dict[str, Any]:
"""
Process a GitHub repository and extract its content asynchronously.

Uses gitingest's native ingest_async function for proper async support
within Celery tasks and other async contexts.

Args:
repo_url: GitHub repository URL (e.g., "https://github.com/owner/repo")
branch: Branch to process (default: "main")

Returns:
Dictionary containing content, tree, summary, and metadata
"""
try:
            try:
                from gitingest import ingest_async
            except ImportError as e:
                logger.error("gitingest package not installed")
                raise ImportError(
                    "gitingest is required. Install with: pip install gitingest"
                ) from e

logger.info(f"Processing repository: {repo_url} (branch: {branch})")
repo_full_name = self._parse_repo_url(repo_url)

exclude_patterns = [f"{dir_name}/**" for dir_name in SKIPPED_DIRS]
exclude_patterns.append("*.pyc")

# Use gitingest's native async function for non-blocking operation
# ingest_async returns a tuple: (summary, tree, content)
summary, tree, content = await ingest_async(
query=repo_url,
max_file_size=self.max_file_size,
include_patterns=None,
exclude_patterns=exclude_patterns,
)

logger.info(
f"Successfully processed repository {repo_full_name}: {len(content)} characters"
)

return {
"content": content,
"tree": tree,
"summary": summary,
"repo_full_name": repo_full_name,
"branch": branch,
"metadata": {
"repository": repo_full_name,
"branch": branch,
"source": "gitingest",
"content_length": len(content),
},
}

except ImportError:
raise
except Exception as e:
logger.error(f"Failed to process repository {repo_url}: {e}", exc_info=True)
raise ValueError(f"Failed to process repository: {e!s}") from e

def _parse_repo_url(self, repo_url: str) -> str:
"""Parse GitHub repository URL to extract owner/repo format."""
if not repo_url:
raise ValueError("Repository URL cannot be empty")

repo_url = repo_url.rstrip("/")

if repo_url.endswith(".git"):
repo_url = repo_url[:-4]

if "github.com/" in repo_url:
parts = repo_url.split("github.com/")[-1].split("/")
if len(parts) >= 2:
return f"{parts[0]}/{parts[1]}"
else:
raise ValueError(f"Invalid GitHub URL format: {repo_url}")
elif "/" in repo_url and repo_url.count("/") == 1:
return repo_url
else:
raise ValueError(
f"Invalid repository URL format: {repo_url}. "
f"Expected format: 'owner/repo' or 'https://github.com/owner/repo'"
)

async def process_multiple_repositories(
self, repo_urls: list[str], branch: str = "main"
) -> dict[str, dict[str, Any]]:
"""
Process multiple GitHub repositories asynchronously and return results with errors.

Processes repositories sequentially to avoid overwhelming the system.
"""
results = {}
errors = {}

for repo_url in repo_urls:
try:
result = await self.process_repository(repo_url, branch)
repo_name = result["repo_full_name"]
results[repo_name] = result
logger.info(f"✓ Successfully processed {repo_name}")
            except Exception as e:
                errors[repo_url] = str(e)
                logger.error(f"✗ Failed to process {repo_url}: {e}")

if errors:
logger.warning(
f"Processed {len(results)}/{len(repo_urls)} repositories successfully. "
f"{len(errors)} failed."
)

return {
"results": results,
"errors": errors,
"success_count": len(results),
"error_count": len(errors),
}

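Review note: a minimal sketch of driving GitIngestService directly, without GitHubConnector, for a public repository where no token is required. The repository URL is a placeholder:

import asyncio

from app.connectors.github.service import GitIngestService

async def main() -> None:
    service = GitIngestService()  # token is optional for public repositories
    # _parse_repo_url accepts 'owner/repo' or a full https://github.com/owner/repo URL
    result = await service.process_repository("https://github.com/owner/repo")
    print(result["summary"])
    print(result["metadata"]["content_length"])

asyncio.run(main())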