Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 44 additions & 14 deletions agentic_index_cli/agentic_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
Expand Down Expand Up @@ -44,14 +45,18 @@
"agpl-2.0",
}

# cache for GitHub license texts to avoid repeated downloads
LICENSE_CACHE: dict[str, str] = {}

def github_search(query: str, page: int = 1) -> List[Dict]:

def github_search(query: str, page: int = 1, per_page: int = 100) -> List[Dict]:
"""Search GitHub repositories with batching support."""
time.sleep(1) # rate limiting
params = {
"q": query,
"sort": "stars",
"order": "desc",
"per_page": 5,
"per_page": per_page,
"page": page,
}
resp = requests.get(f"{GITHUB_API}/search/repositories", params=params, headers=HEADERS)
Expand Down Expand Up @@ -83,6 +88,25 @@ def fetch_readme(full_name: str) -> str:
return ""


def fetch_license_text(url: str) -> str:
    """Fetch and cache the body text of a GitHub license API resource.

    Parameters
    ----------
    url:
        GitHub API URL of the repository's license (may be empty/None).

    Returns
    -------
    str
        The license text, or ``""`` when the URL is missing, the request
        fails, or the payload cannot be decoded.  All outcomes (including
        failures) are memoized in ``LICENSE_CACHE`` so each distinct URL
        is downloaded at most once.
    """
    if not url:
        return ""
    if url in LICENSE_CACHE:
        return LICENSE_CACHE[url]
    resp = requests.get(url, headers=HEADERS)
    if resp.status_code != 200:
        # Negative-cache failures so a bad URL is not retried repeatedly.
        LICENSE_CACHE[url] = ""
        return ""
    try:
        data = resp.json()
    except ValueError:
        # 200 response with a non-JSON body; treat as "no text available"
        # instead of letting the decode error propagate to the caller.
        LICENSE_CACHE[url] = ""
        return ""
    text = data.get("body", "")
    if not text and "content" in data:
        # The license endpoint returns the raw file base64-encoded under
        # "content" when no rendered "body" is present.
        import base64
        import binascii

        try:
            text = base64.b64decode(data["content"]).decode("utf-8", errors="ignore")
        except (binascii.Error, TypeError):
            text = ""
    LICENSE_CACHE[url] = text
    return text


def compute_recency_factor(pushed_at: str) -> float:
pushed_date = datetime.strptime(pushed_at, "%Y-%m-%dT%H:%M:%SZ")
days = (datetime.utcnow() - pushed_date).days
Expand Down Expand Up @@ -167,6 +191,9 @@ def harvest_repo(full_name: str) -> Optional[Dict]:
if not repo:
return None
readme = fetch_readme(full_name)
license_url = (repo.get("license") or {}).get("url")
if license_url:
fetch_license_text(license_url)
score = compute_score(repo, readme)
category = categorize(repo.get("description", ""), repo.get("topics", []))
first_paragraph = readme.split("\n\n")[0][:200]
Expand All @@ -188,34 +215,37 @@ def harvest_repo(full_name: str) -> Optional[Dict]:
}


def search_and_harvest(min_stars: int = 0, max_pages: int = 1) -> List[Dict]:
seen = set()
results = []
def search_and_harvest(
    min_stars: int = 0, max_pages: int = 1, workers: int = 8
) -> List[Dict]:
    """Search GitHub and harvest repo metadata concurrently.

    Repository names are first collected (deduplicated) from the keyword
    and topic searches, then harvested with a thread pool: harvesting is
    network-bound, so threads overlap the HTTP waits.

    Parameters
    ----------
    min_stars:
        Minimum star count baked into each search query.
    max_pages:
        Number of result pages fetched per search term / topic.
    workers:
        Thread-pool size for the harvesting phase.

    Returns
    -------
    List[Dict]
        Harvested metadata for every unique repository found; repos whose
        harvest failed (``harvest_repo`` returned ``None``) are dropped.
    """
    seen: set[str] = set()
    names: list[str] = []

    def _collect(repos: List[Dict]) -> None:
        # Record each repository once, preserving discovery order.
        for repo in repos:
            full_name = repo["full_name"]
            if full_name not in seen:
                seen.add(full_name)
                names.append(full_name)

    for term in SEARCH_TERMS:
        for page in range(1, max_pages + 1):
            query = f"{term} stars:>={min_stars}"
            _collect(github_search(query, page, per_page=100))
    for topic in TOPIC_FILTERS:
        for page in range(1, max_pages + 1):
            query = f"topic:{topic} stars:>={min_stars}"
            _collect(github_search(query, page, per_page=100))

    results: list[dict] = []
    with ThreadPoolExecutor(max_workers=workers) as ex:
        # ex.map preserves input order, so results stay in discovery order.
        for meta in ex.map(harvest_repo, names):
            if meta:
                results.append(meta)
    return results


Expand Down
65 changes: 65 additions & 0 deletions bench/benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import json
import os
import tempfile
import timeit
from pathlib import Path

import responses

from agentic_index_cli.internal import scrape, rank


def _make_items(start: int, count: int) -> list[dict]:
items = []
for i in range(start, start + count):
items.append(
{
"name": f"repo{i}",
"full_name": f"owner/repo{i}",
"html_url": f"https://example.com/repo{i}",
"description": "benchmark repo",
"stargazers_count": i,
"forks_count": 0,
"open_issues_count": 0,
"archived": False,
"license": {"spdx_id": "MIT"},
"language": "Python",
"pushed_at": "2025-01-01T00:00:00Z",
"owner": {"login": "owner"},
}
)
return items


def run() -> bool:
    """Execute one scrape -> rank pipeline pass against a mocked GitHub API.

    Registers one mocked search-results page per scrape query (500 fake
    repos split evenly across ``scrape.QUERIES``), scrapes them, writes
    the repo list to a temp file, and ranks it.

    Returns
    -------
    bool
        True when the repos file exists after ranking.
    """
    with tempfile.TemporaryDirectory() as td:
        repo_path = Path(td) / "repos.json"
        with responses.RequestsMock() as rsps:
            per_query = 500 // len(scrape.QUERIES)
            idx = 0
            for _ in scrape.QUERIES:
                items = _make_items(idx, per_query)
                idx += per_query
                rsps.add(
                    responses.GET,
                    "https://api.github.com/search/repositories",
                    json={"items": items},
                    headers={"X-RateLimit-Remaining": "99"},
                    match_querystring=False,
                    status=200,
                )
            repos = scrape.scrape(min_stars=0, token=None)
            repo_path.write_text(json.dumps(repos))
            # BUG FIX: the original copied the environment and set
            # PYTEST_CURRENT_TEST on the copy, which was never passed to
            # anything -- a no-op.  Set it in the real environment
            # (presumably rank checks os.environ for it -- confirm) and
            # restore the previous state afterwards.
            prev = os.environ.get("PYTEST_CURRENT_TEST")
            os.environ["PYTEST_CURRENT_TEST"] = "benchmark"
            try:
                rank.main(str(repo_path))
            finally:
                if prev is None:
                    os.environ.pop("PYTEST_CURRENT_TEST", None)
                else:
                    os.environ["PYTEST_CURRENT_TEST"] = prev
        return repo_path.exists()


def main() -> None:
    """Time a single benchmark pipeline run and report its wall-clock cost."""
    elapsed = timeit.timeit(run, number=1)
    print(f"Pipeline completed in {elapsed:.2f}s")


if __name__ == "__main__":
main()
24 changes: 24 additions & 0 deletions bench/profile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import cProfile
from pathlib import Path

from bench.benchmark import run


def main() -> None:
    """Profile one benchmark pipeline run and dump stats to bench/profile.prof.

    After writing the profile it tries to launch snakeviz on it; any
    failure there is reported but never fatal (the .prof file is already
    on disk for manual inspection).
    """
    prof = cProfile.Profile()
    prof.enable()
    try:
        run()
    finally:
        # BUG FIX: always disable the profiler, even when run() raises,
        # so a failed run does not leave profiling enabled process-wide.
        prof.disable()
    out = Path("bench/profile.prof")
    out.parent.mkdir(exist_ok=True)
    prof.dump_stats(str(out))
    print(f"Profile written to {out}")
    try:
        import snakeviz

        # NOTE(review): snakeviz's documented entry point is its CLI
        # (`snakeviz bench/profile.prof`); `snakeviz.main` may not exist --
        # confirm.  The broad except keeps this best-effort either way.
        snakeviz.main([str(out)])
    except Exception as exc:
        print(f"snakeviz failed: {exc}")


if __name__ == "__main__":
main()
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ hypothesis
requests
PyYAML
pytest-socket
responses
responses
pytest-benchmark
11 changes: 11 additions & 0 deletions tests/test_perf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import os
import pytest

from bench import benchmark as bench_module

pytestmark = pytest.mark.skipif(os.getenv("PERF") != "true", reason="perf tests disabled")


@pytest.mark.benchmark
def test_scrape_rank_benchmark(benchmark):
    """Benchmark one full scrape+rank pipeline run.

    Uses the pytest-benchmark ``benchmark`` fixture to time
    ``bench.benchmark.run``; the module-level ``pytestmark`` skips this
    unless the ``PERF`` environment variable is set to ``"true"``.
    """
    benchmark(bench_module.run)
Loading