Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,24 @@ uv run -m src.3_evals.2_synthetic_data.synthesize_data \
--limit 18
```

Quantify embedding diversity of synthetic data

```bash
# Baseline: "Real" dataset
uv run \
--env-file .env \
-m src.3_evals.2_synthetic_data.annotate_diversity \
--langfuse_dataset_name search-dataset \
--run_name cosine_similarity_bge_m3

# Synthetic dataset
uv run \
--env-file .env \
-m src.3_evals.2_synthetic_data.annotate_diversity \
--langfuse_dataset_name search-dataset-synthetic-20250609 \
--run_name cosine_similarity_bge_m3
```

Run LLM-as-a-judge Evaluation on synthetic data

```bash
Expand Down
12 changes: 3 additions & 9 deletions src/3_evals/1_llm_judge/run_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dotenv import load_dotenv
from langfuse._client.datasets import DatasetItemClient
from openai import AsyncOpenAI
from rich.progress import Progress, SpinnerColumn, TextColumn, track
from rich.progress import track

from src.utils import (
AsyncWeaviateKnowledgeBase,
Expand All @@ -18,7 +18,7 @@
set_up_logging,
setup_langfuse_tracer,
)
from src.utils.langfuse.shared_client import langfuse_client
from src.utils.langfuse.shared_client import flush_langfuse, langfuse_client


load_dotenv(verbose=True)
Expand Down Expand Up @@ -212,12 +212,6 @@ async def run_and_evaluate(
trace_id=_traced_response.trace_id,
)

with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
transient=True,
) as progress:
task = progress.add_task("Finalizing Langfuse annotations...", total=None)
langfuse_client.flush()
flush_langfuse()

asyncio.run(async_weaviate_client.close())
52 changes: 52 additions & 0 deletions src/3_evals/2_synthetic_data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,58 @@ uv run -m src.3_evals.2_synthetic_data.synthesize_data \
--limit 18
```

## Evaluate diversity of synthetic data

```bash
# Baseline: "Real" dataset
uv run \
--env-file .env \
-m src.3_evals.2_synthetic_data.annotate_diversity \
--langfuse_dataset_name search-dataset \
--run_name cosine_similarity_bge_m3

# Synthetic dataset
uv run \
--env-file .env \
-m src.3_evals.2_synthetic_data.annotate_diversity \
--langfuse_dataset_name search-dataset-synthetic-20250609 \
--run_name cosine_similarity_bge_m3
```

Example Output:

```bash
# baseline
Items to process: 18
Embedding ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:02 0:00:00
Cosine similarity of search-dataset
count 18.000000
mean 0.376153
std 0.027593
min 0.337244
25% 0.355240
50% 0.368091
75% 0.397940
max 0.421926
dtype: float64
Uploading scores... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00

# synthetic, default temperature, etc.
Items to process: 80
Embedding ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:09 0:00:00
Cosine similarity of search-dataset-synthetic-20250609
count 80.000000
mean 0.350789
std 0.027978
min 0.275784
25% 0.330807
50% 0.351904
75% 0.371099
max 0.409278
dtype: float64
Uploading scores... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
```

## Run Evaluation on synthetic data

```bash
Expand Down
163 changes: 163 additions & 0 deletions src/3_evals/2_synthetic_data/annotate_diversity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
"""Add diversity metrics to the given LangFuse dataset.

Usage:

uv run \
--env-file .env \
-m src.3_evals.2_synthetic_data.annotate_diversity \
--langfuse_dataset_name ${DATASET_NAME} \
--run_name cosine_similarity_bge_m3 \
--limit 18
"""

import argparse
import asyncio
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd
import pydantic
from openai import AsyncOpenAI
from rich.progress import track

from src.utils import Configs, gather_with_progress
from src.utils.langfuse.shared_client import flush_langfuse, langfuse_client


if TYPE_CHECKING:
from langfuse._client.datasets import DatasetItemClient


parser = argparse.ArgumentParser()
parser.add_argument("--langfuse_dataset_name", required=True)
parser.add_argument("--run_name", default="cosine_similarity")
parser.add_argument("--limit", type=int)
parser.add_argument("--embed_batch_size", type=int, default=18)


class EmbeddingResult(pydantic.BaseModel):
"""Tracks trace_id and embedding vector for an instance."""

langfuse_trace_id: str
embedding: list[float]


async def batch_embed(
dataset_items: list["DatasetItemClient"],
oai_client: AsyncOpenAI,
model_name: str,
run_name: str | None = None,
) -> list[EmbeddingResult]:
"""Run embedding on the given LangFuse dataset items."""
output: list[EmbeddingResult] = []
run_name = run_name or f"embed_{model_name}"

texts = [_item.input["text"] for _item in dataset_items]
embedding_response = await oai_client.embeddings.create(
input=texts, model=model_name
)
embeddings = [_data.embedding for _data in embedding_response.data]
assert len(dataset_items) == len(embeddings), (len(dataset_items), len(embeddings))

for _lf_dataset_item, _embedding in zip(dataset_items, embeddings):
with _lf_dataset_item.run(run_name=run_name) as span:
_result = EmbeddingResult(
langfuse_trace_id=span.trace_id,
embedding=_embedding,
)
output.append(_result)

return output


def _avg_cosine_similarity(matrix: np.ndarray) -> np.ndarray:
"""
Memory-efficient calculation of instance-wise cosine similarity with population.

Copied from:

github.com/ComplexData-MILA/AIF-Gen/\
blob/10f500c68517b34aa4c07381f2cf67ea6043d73c/\
aif_gen/validation/embedding_diversity.py#L155

(MIT)
"""
row_norms = np.linalg.norm(matrix, axis=1, keepdims=True)
row_norms[row_norms == 0] = 1e-10 # avoid division by zero

normalized_matrix = matrix / row_norms # shape: (n, m)
sum_normalized = normalized_matrix.sum(axis=0) # shape: (m,)
return normalized_matrix.dot(sum_normalized) / matrix.shape[0]


if __name__ == "__main__":
args = parser.parse_args()
assert args.embed_batch_size > 0, "args.embed_batch_size must be at least 1."

lf_dataset_items = langfuse_client.get_dataset(args.langfuse_dataset_name).items
limit = (
min(len(lf_dataset_items), args.limit) if args.limit else len(lf_dataset_items)
)
print(f"Items to process: {limit}")

configs = Configs.from_env_var()
async_embed_client = AsyncOpenAI(
api_key=configs.embedding_api_key,
base_url=configs.embedding_base_url,
max_retries=5,
)

# Construct embed batches.
batches: list[list["DatasetItemClient"]] = [[]]
for _index, _item in enumerate(lf_dataset_items):
if (args.limit is not None) and (_index >= args.limit):
break

batches[-1].append(_item)
if len(batches[-1]) == args.embed_batch_size:
batches.append([])

if len(batches[-1]) == 0:
batches.pop(-1)

# Async embed, traced.
embed_coros = [
batch_embed(
_batch,
oai_client=async_embed_client,
model_name="@cf/baai/bge-m3",
run_name=args.run_name,
)
for _batch in batches
]
batched_embed_results = asyncio.run(
gather_with_progress(embed_coros, description="Embedding")
)
embed_results = [
_result for _results in batched_embed_results for _result in _results
] # Unpacked

# Compute per-row cosine similarity offsets
embeddings = [_result.embedding for _result in embed_results]
embeddings_np = np.asarray(embeddings) # (N, L)
cosine_similarities = _avg_cosine_similarity(embeddings_np) # (N,)
mean_similarity = np.mean(cosine_similarities).item()
assert cosine_similarities.shape == (len(embeddings),), cosine_similarities.shape
print(
f"Cosine similarity of {args.langfuse_dataset_name}\n",
pd.Series(cosine_similarities).describe(),
)

# Annotate dataset rows in LangFuse
for _similarity, _trace_id in zip(
track(cosine_similarities.flatten(), description="Uploading scores..."),
[_result.langfuse_trace_id for _result in embed_results],
):
langfuse_client.create_score(
name="similarity_to_mean",
value=_similarity,
comment=f"Mean similarity: {mean_similarity:.3f}",
trace_id=_trace_id,
)

flush_langfuse()
4 changes: 4 additions & 0 deletions src/utils/env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
class Configs(pydantic.BaseModel):
"""Type-friendly collection of env var configs."""

# Embeddings
embedding_base_url: str
embedding_api_key: str

# Weaviate
weaviate_http_host: str
weaviate_grpc_host: str
Expand Down
15 changes: 15 additions & 0 deletions src/utils/langfuse/shared_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from os import getenv

from langfuse import Langfuse
from rich.progress import Progress, SpinnerColumn, TextColumn

from ..env_vars import Configs

Expand All @@ -15,3 +16,17 @@
langfuse_client = Langfuse(
public_key=config.langfuse_public_key, secret_key=config.langfuse_secret_key
)


def flush_langfuse(client: "Langfuse | None" = None):
"""Flush shared LangFuse Client. Rich Progress included."""
if client is None:
client = langfuse_client

with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
transient=True,
) as progress:
progress.add_task("Finalizing Langfuse annotations...", total=None)
langfuse_client.flush()
Loading