Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ uv run \
--run_name cosine_similarity_bge_m3
```

Visualize embedding diversity of synthetic data

```bash
uv run \
--env-file .env \
-m src.3_evals.2_synthetic_data.gradio_visualize_diversity
```

Run LLM-as-a-judge Evaluation on synthetic data

```bash
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ dependencies = [
"numpy<2.3.0",
"openai>=1.93.1",
"openai-agents>=0.1.0",
"plotly>=6.2.0",
"pydantic>=2.11.7",
"pydantic-ai-slim[logfire]>=0.3.7",
"pytest-asyncio>=0.25.2",
"scikit-learn>=1.7.0",
"weaviate-client>=4.15.4",
]

Expand Down
21 changes: 8 additions & 13 deletions src/3_evals/2_synthetic_data/annotate_diversity.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
--env-file .env \
-m src.3_evals.2_synthetic_data.annotate_diversity \
--langfuse_dataset_name ${DATASET_NAME} \
--run_name cosine_similarity_bge_m3 \
--run_name cosine_similarity_bge_m3_20250716 \
--limit 18
"""

Expand All @@ -20,7 +20,7 @@
from openai import AsyncOpenAI
from rich.progress import track

from src.utils import Configs, gather_with_progress
from src.utils import Configs, create_batches, gather_with_progress
from src.utils.langfuse.shared_client import flush_langfuse, langfuse_client


Expand Down Expand Up @@ -108,17 +108,12 @@ def _avg_cosine_similarity(matrix: np.ndarray) -> np.ndarray:
)

# Construct embed batches.
batches: list[list["DatasetItemClient"]] = [[]]
for _index, _item in enumerate(lf_dataset_items):
if (args.limit is not None) and (_index >= args.limit):
break

batches[-1].append(_item)
if len(batches[-1]) == args.embed_batch_size:
batches.append([])

if len(batches[-1]) == 0:
batches.pop(-1)
batches: list[list["DatasetItemClient"]] = create_batches(
lf_dataset_items,
batch_size=args.embed_batch_size,
limit=args.limit,
keep_trailing=True,
)

# Async embed, traced.
embed_coros = [
Expand Down
132 changes: 132 additions & 0 deletions src/3_evals/2_synthetic_data/gradio_visualize_diversity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""Visualize embedding diversity of the given LangFuse dataset.

Usage:

uv run \
--env-file .env \
-m src.3_evals.2_synthetic_data.gradio_visualize_diversity
"""

from typing import List

import gradio as gr
import numpy as np
import plotly.express as px
from openai import AsyncOpenAI
from plotly.graph_objs import Figure
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

from ...utils import Configs, create_batches, gather_with_progress
from ...utils.langfuse.shared_client import langfuse_client


def reduce_dimensions(
    embeddings: np.ndarray, method: str = "tsne", n_components: int = 2
) -> np.ndarray:
    """Project embeddings down to `n_components` dimensions.

    Args:
        embeddings (np.ndarray): Input embeddings of shape (n_samples, n_features).
        method (str): Dimensionality reduction method, either 'tsne' or 'pca'.
        n_components (int): Number of output dimensions (default is 2).

    Returns:
        np.ndarray: Reduced embeddings of shape (n_samples, n_components).

    Raises:
        ValueError: If `method` is neither 'tsne' nor 'pca'.
    """
    if method == "tsne":
        # scikit-learn requires perplexity < n_samples. Keep its default of 30
        # for larger inputs and shrink it for small datasets, which would
        # otherwise be rejected outright.
        n_samples = len(embeddings)
        perplexity = min(30.0, float(max(n_samples - 1, 1)))
        reducer = TSNE(
            n_components=n_components, perplexity=perplexity, random_state=42
        )
    elif method == "pca":
        reducer = PCA(n_components=n_components)
    else:
        raise ValueError("Method must be 'tsne' or 'pca'")

    return reducer.fit_transform(embeddings)


def plot_embeddings_2d(
    reduced_embeddings: np.ndarray, texts: List[str], dataset_title: str | None = None
) -> Figure:
    """Plot 2D embeddings using Plotly, displaying text on hover.

    Args:
        reduced_embeddings (np.ndarray): 2D embeddings of shape (n_samples, 2).
        texts (List[str]): Text snippets shown on hover, one per embedding row.
        dataset_title (str | None): Dataset name to mention in the plot title;
            omitted from the title when not provided.

    Returns:
        Figure: Scatter plot of the two embedding components.
    """
    heading = "Text Embeddings"
    if dataset_title:
        heading = f"{heading} for {dataset_title}"
    # Plotly titles accept a small HTML subset; the explicit <br> keeps the
    # note on its own line instead of gluing it onto the dataset name.
    title = f"{heading}<br><sup>Note: Axes are not interpretable</sup>"

    fig = px.scatter(
        x=reduced_embeddings[:, 0],
        y=reduced_embeddings[:, 1],
        hover_name=texts,
        title=title,
        labels={"x": "Latent Component 1", "y": "Latent Component 2"},
    )
    fig.update_traces(marker={"size": 8, "opacity": 0.7})
    return fig


async def get_projection_plot(
    dataset_name: str,
    projection_method: str,
    limit: int | None = None,
    embedding_batch_size: int = 16,
) -> Figure:
    """Obtain projection plot for the given dataset up to `limit` items.

    Args:
        dataset_name (str): Name of the Langfuse dataset to load.
        projection_method (str): Reduction method passed to `reduce_dimensions`.
        limit (int | None): Maximum number of rows to embed and plot;
            a falsy value (None or 0) means "all rows".
        embedding_batch_size (int): Number of texts per embedding request.

    Returns:
        Figure: 2D scatter plot of the projected embeddings.

    Raises:
        ValueError: If there are no rows to plot.
        RuntimeError: If the embedding service returns a different number of
            vectors than the number of texts submitted.
    """
    lf_dataset_items = langfuse_client.get_dataset(dataset_name).items

    # The UI may hand over a float for the row count; normalise it once.
    row_limit = int(limit) if limit else None

    texts = [_item.input["text"] for _item in lf_dataset_items]
    if row_limit is not None:
        # Truncate the texts themselves so the hover labels, the progress
        # description and the embeddings all refer to the same rows.
        texts = texts[: max(row_limit, 0)]
    if not texts:
        raise ValueError(f"No rows to plot for dataset {dataset_name!r}")

    # Generate embeddings
    configs = Configs.from_env_var()
    embedding_client = AsyncOpenAI(
        api_key=configs.embedding_api_key,
        base_url=configs.embedding_base_url,
        max_retries=5,
    )

    text_batches = create_batches(
        texts,
        batch_size=embedding_batch_size,
        keep_trailing=True,
    )
    embed_coros = [
        embedding_client.embeddings.create(input=_batch, model="@cf/baai/bge-m3")
        for _batch in text_batches
    ]
    batched_embed_results = await gather_with_progress(
        embed_coros, description=f"Generating {len(texts)} embeddings"
    )
    embeddings = [
        _data.embedding for _result in batched_embed_results for _data in _result.data
    ]  # unpacked
    embeddings_np = np.asarray(embeddings)

    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if embeddings_np.shape[0] != len(texts):
        raise RuntimeError(
            f"Expected {len(texts)} embeddings, got array of shape {embeddings_np.shape}"
        )

    # Reduce dimensions
    embeddings_reduced = reduce_dimensions(embeddings_np, method=projection_method)

    # Create plot
    return plot_embeddings_2d(
        reduced_embeddings=embeddings_reduced,
        texts=texts,
        dataset_title=dataset_name,
    )


# Gradio front-end: dataset name, projection method and row count feed
# `get_projection_plot`, whose Plotly figure is rendered as the output.
viewer = gr.Interface(
    fn=get_projection_plot,
    inputs=[
        gr.Textbox(label="Dataset name"),
        gr.Radio(["tsne", "pca"], label="Dimensionality Reduction Method"),
        # precision=0 makes the component hand over an integer row count.
        gr.Number(value=36, label="Number of rows to plot", minimum=1, precision=0),
    ],
    outputs=gr.Plot(label="2D Embedding Plot"),
    title="3.2 Text Embedding Visualizer",
    # The embedding width depends on the embedding model, so the description
    # does not hard-code a dimensionality.
    description=(
        "Select a method to project high-dimensional embeddings "
        "of text snippets into 2D."
    ),
)

if __name__ == "__main__":
    # Bind to all interfaces so the app is reachable from outside the host/container.
    viewer.launch(server_name="0.0.0.0")
1 change: 1 addition & 0 deletions src/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Shared toolings for reference implementations."""

from .async_utils import gather_with_progress, rate_limited
from .data.batching import create_batches
from .env_vars import Configs
from .gradio.messages import (
gradio_messages_to_oai_chat,
Expand Down
40 changes: 40 additions & 0 deletions src/utils/data/batching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Utils for creating batches of data for performance."""

from typing import TypeVar


V = TypeVar("V")


def create_batches(
items: list[V],
batch_size: int,
limit: int | None = None,
keep_trailing: bool = True,
) -> list[list[V]]:
"""Transform the list of items into batches.

Params:
limit: number of items to include in total
keep_trailing: if False, the last few items that
does not fit in a full batch will not be returned.

Return:
List of batches.
"""
batches: list[list[V]] = [[]]
for _index, _item in enumerate(items):
if (limit is not None) and (_index >= limit):
break

batches[-1].append(_item)
if len(batches[-1]) == batch_size:
batches.append([])

# Discard trailing batch if empty or required
if (len(batches[-1]) == 0) or (
(not keep_trailing) and (len(batches[-1]) < batch_size)
):
batches.pop(-1)

return batches
Loading