Added from_url method to fetch evaluation annotation from the API. #1795

Merged
137 changes: 125 additions & 12 deletions src/ragas/dataset_schema.py
@@ -9,15 +9,25 @@
from uuid import UUID

import numpy as np
import requests
from datasets import Dataset as HFDataset
from pydantic import BaseModel, field_validator

from ragas.callbacks import ChainRunEncoder, parse_run_traces
from ragas.cost import CostCallbackHandler
from ragas.exceptions import UploadException
from ragas.messages import AIMessage, HumanMessage, ToolCall, ToolMessage
from ragas.sdk import RAGAS_API_URL, RAGAS_APP_URL, upload_packet
from ragas.sdk import (
upload_packet,
RAGAS_API_SOURCE,
get_app_token,
check_api_response,
build_evaluation_app_url,
get_api_url,
get_app_url,
)
from ragas.utils import safe_nanmean
from ragas._version import __version__

if t.TYPE_CHECKING:
from pathlib import Path
@@ -508,7 +518,10 @@ def total_cost(
cost_per_input_token, cost_per_output_token, per_model_costs
)

def upload(self, base_url: str = RAGAS_API_URL, verbose: bool = True) -> str:
def upload(
self,
verbose: bool = True,
) -> str:
from datetime import datetime, timezone

timestamp = datetime.now(timezone.utc).isoformat()
@@ -526,18 +539,16 @@ def upload(self, base_url: str = RAGAS_API_URL, verbose: bool = True) -> str:
response = upload_packet(
path="/alignment/evaluation",
data_json_string=packet,
base_url=base_url,
)

# check status codes
evaluation_endpoint = (
f"{RAGAS_APP_URL}/dashboard/alignment/evaluation/{root_trace.run_id}"
)
app_url = get_app_url()
evaluation_app_url = build_evaluation_app_url(app_url, root_trace.run_id)
if response.status_code == 409:
# this evaluation already exists
if verbose:
print(f"Evaluation run already exists. View at {evaluation_endpoint}")
return evaluation_endpoint
print(f"Evaluation run already exists. View at {evaluation_app_url}")
return evaluation_app_url
elif response.status_code != 200:
# any other error
raise UploadException(
Expand All @@ -546,8 +557,8 @@ def upload(self, base_url: str = RAGAS_API_URL, verbose: bool = True) -> str:
)

if verbose:
print(f"Evaluation results uploaded! View at {evaluation_endpoint}")
return evaluation_endpoint
print(f"Evaluation results uploaded! View at {evaluation_app_url}")
return evaluation_app_url
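
Note: the new `upload()` drops the `base_url` parameter; the API and app URLs are now resolved inside `ragas.sdk` via `get_api_url()` and `get_app_url()`. A minimal usage sketch, assuming the public `evaluate()` entry point and placeholder URLs/token (none of these values come from this diff):

```python
import os

# Optional overrides for a self-hosted deployment; the packaged defaults
# (the RAGAS_API_URL / RAGAS_APP_URL constants) are used when these are unset.
os.environ["RAGAS_API_URL"] = "https://api.my-ragas.example.com"  # placeholder
os.environ["RAGAS_APP_URL"] = "https://app.my-ragas.example.com"  # placeholder
os.environ["RAGAS_APP_TOKEN"] = "my-app-token"                    # placeholder

# result = evaluate(dataset, metrics=metrics)  # hypothetical evaluation step
# url = result.upload()                        # no base_url argument anymore
# print(url)  # -> <app_url>/dashboard/alignment/evaluation/<run_id>
```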


class PromptAnnotation(BaseModel):
@@ -577,8 +588,24 @@ def __getitem__(self, key):
return SingleMetricAnnotation(name=key, samples=self.root[key])

@classmethod
def from_json(cls, path, metric_name: t.Optional[str]) -> "MetricAnnotation":
dataset = json.load(open(path))
def _process_dataset(
cls, dataset: dict, metric_name: t.Optional[str]
) -> "MetricAnnotation":
"""
Process raw dataset into MetricAnnotation format

Parameters
----------
dataset : dict
Raw dataset to process
metric_name : str, optional
Name of the specific metric to filter

Returns
-------
MetricAnnotation
Processed annotation data
"""
if metric_name is not None and metric_name not in dataset:
raise ValueError(f"Split {metric_name} not found in the dataset.")

@@ -590,6 +617,92 @@ def from_json(cls, path, metric_name: t.Optional[str]) -> "MetricAnnotation":
}
)

@classmethod
def from_json(cls, path: str, metric_name: t.Optional[str]) -> "MetricAnnotation":
"""Load annotations from a JSON file"""
dataset = json.load(open(path))
return cls._process_dataset(dataset, metric_name)

@classmethod
def from_app(
cls,
run_id: t.Optional[str] = None,
evaluation_result: t.Optional[EvaluationResult] = None,
metric_name: t.Optional[str] = None,
) -> "MetricAnnotation":
"""
Fetch annotations from the Ragas app using either an evaluation result or a run_id

Parameters
----------
run_id : str, optional
Direct run ID to fetch annotations
evaluation_result : EvaluationResult, optional
The evaluation result containing the run_id
metric_name : str, optional
Name of the specific metric to filter

Returns
-------
MetricAnnotation
Annotation data from the API

Raises
------
ValueError
If neither evaluation_result nor run_id is provided, or if both are provided
If no traces found or no root trace found when using evaluation_result
"""
if evaluation_result is None and run_id is None:
raise ValueError("Either evaluation_result or run_id must be provided")
if evaluation_result is not None and run_id is not None:
raise ValueError(
"Only one of evaluation_result or run_id should be provided"
)

if evaluation_result is not None:
if not evaluation_result.ragas_traces:
raise ValueError("No traces found in evaluation_result")

root_trace = [
trace
for trace in evaluation_result.ragas_traces.values()
if trace.parent_run_id is None
]

if not root_trace:
raise ValueError("No root trace found in evaluation_result")

run_id = root_trace[0].run_id

endpoint = f"/api/v1/alignment/evaluation/annotation/{run_id}"

app_token = get_app_token()
base_url = get_api_url()
app_url = get_app_url()

response = requests.get(
f"{base_url}{endpoint}",
headers={
"Content-Type": "application/json",
"x-app-token": app_token,
"x-source": RAGAS_API_SOURCE,
"x-app-version": __version__,
},
)

check_api_response(response)
dataset = response.json()["data"]

if not dataset:
evaluation_url = build_evaluation_app_url(app_url, run_id)
raise ValueError(
f"No annotations found. Please annotate the Evaluation first then run this method. "
f"\nNote: you can annotate the evaluations using the Ragas app by going to {evaluation_url}"
)

return cls._process_dataset(dataset, metric_name)

def __len__(self):
return sum(len(value) for value in self.root.values())

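The new `from_app` classmethod is the main addition in this file. A usage sketch under stated assumptions: the run ID and metric name below are placeholders, and the `evaluate()` call is assumed from the public API rather than shown in this diff.

```python
from ragas.dataset_schema import MetricAnnotation

# Option 1: fetch by run ID (placeholder value); requires RAGAS_APP_TOKEN to be set.
annotations = MetricAnnotation.from_app(
    run_id="00000000-0000-0000-0000-000000000000",
    metric_name="answer_relevancy",  # optional filter; assumed metric name
)

# Option 2: pass the EvaluationResult directly; the root trace's run_id is
# extracted internally. Exactly one of run_id / evaluation_result is allowed.
# result = evaluate(dataset, metrics=metrics)  # hypothetical
# annotations = MetricAnnotation.from_app(evaluation_result=result)

print(len(annotations))  # total annotated samples across all metrics
```
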
69 changes: 62 additions & 7 deletions src/ragas/metrics/base.py
@@ -14,7 +14,12 @@

from ragas._analytics import EvaluationEvent, _analytics_batcher
from ragas.callbacks import ChainType, new_group
from ragas.dataset_schema import MetricAnnotation, MultiTurnSample, SingleTurnSample
from ragas.dataset_schema import (
MetricAnnotation,
MultiTurnSample,
SingleTurnSample,
EvaluationResult,
)
from ragas.executor import is_event_loop_running
from ragas.losses import BinaryMetricLoss, MSELoss
from ragas.prompt import FewShotPydanticPrompt, PromptMixin
@@ -350,7 +355,9 @@ def _optimize_demonstration(

def train(
self,
path: str,
path: t.Optional[str] = None,
run_id: t.Optional[str] = None,
evaluation_result: t.Optional[EvaluationResult] = None,
demonstration_config: t.Optional[DemonstrationConfig] = None,
instruction_config: t.Optional[InstructionConfig] = None,
callbacks: t.Optional[Callbacks] = None,
@@ -359,13 +366,62 @@
with_debugging_logs=False,
raise_exceptions: bool = True,
) -> None:
"""
Train the metric using local JSON data or annotations from the Ragas platform

Parameters
----------
path : str, optional
Path to local JSON training data file
run_id : str, optional
Direct run ID to fetch annotations
evaluation_result : EvaluationResult, optional
Evaluation result to fetch training data from Ragas platform
demonstration_config : DemonstrationConfig, optional
Configuration for demonstration optimization
instruction_config : InstructionConfig, optional
Configuration for instruction optimization
callbacks : Callbacks, optional
List of callback functions
run_config : RunConfig, optional
Run configuration
batch_size : int, optional
Batch size for training
with_debugging_logs : bool, default=False
Enable debugging logs
raise_exceptions : bool, default=True
Whether to raise exceptions during training

Raises
------
ValueError
If invalid combination of path, run_id, and evaluation_result is provided
"""
# Validate input parameters
provided_inputs = sum(x is not None for x in [path, run_id, evaluation_result])
if provided_inputs == 0:
raise ValueError(
"One of path, run_id, or evaluation_result must be provided"
)
if provided_inputs > 1:
raise ValueError(
"Only one of path, run_id, or evaluation_result should be provided"
)

run_config = run_config or RunConfig()
callbacks = callbacks or []

# load the dataset from path
if not path.endswith(".json"):
raise ValueError("Train data must be in json format")
dataset = MetricAnnotation.from_json(path, metric_name=self.name)
# Load the dataset based on input type
if path is not None:
if not path.endswith(".json"):
raise ValueError("Train data must be in json format")
dataset = MetricAnnotation.from_json(path, metric_name=self.name)
else:
dataset = MetricAnnotation.from_app(
evaluation_result=evaluation_result,
run_id=run_id,
metric_name=self.name,
)

# only optimize the instruction if instruction_config is provided
if instruction_config is not None:
@@ -386,7 +442,6 @@
dataset=dataset,
)


@dataclass
class MetricWithEmbeddings(Metric):
embeddings: t.Optional[BaseRagasEmbeddings] = None
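Together with the validation above, `train()` now accepts exactly one of `path`, `run_id`, or `evaluation_result`. A sketch of the three call patterns; the metric class, file name, and run ID are illustrative placeholders, not prescribed by this diff.

```python
from ragas.metrics import AspectCritic  # assumed import path for a trainable metric

metric = AspectCritic(name="correctness", definition="Is the answer factually correct?")

# 1. Local JSON annotation export (must end in .json).
# metric.train(path="annotations.json")

# 2. Annotations stored in the Ragas app, addressed by run ID (placeholder).
# metric.train(run_id="00000000-0000-0000-0000-000000000000")

# 3. An EvaluationResult produced in the same session.
# result = evaluate(dataset, metrics=[metric])  # hypothetical
# metric.train(evaluation_result=result)

# Supplying none of the three, or more than one, raises ValueError.
```
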
54 changes: 52 additions & 2 deletions src/ragas/sdk.py
@@ -24,8 +24,20 @@ def get_app_token() -> str:
return app_token


def upload_packet(path: str, data_json_string: str, base_url: str = RAGAS_API_URL):
@lru_cache(maxsize=1)
def get_api_url() -> str:
return os.environ.get("RAGAS_API_URL", RAGAS_API_URL)


@lru_cache(maxsize=1)
def get_app_url() -> str:
return os.environ.get("RAGAS_APP_URL", RAGAS_APP_URL)


def upload_packet(path: str, data_json_string: str):
app_token = get_app_token()
base_url = get_api_url()

response = requests.post(
f"{base_url}/api/v1{path}",
data=data_json_string,
@@ -36,9 +48,47 @@ def upload_packet(path: str, data_json_string: str, base_url: str = RAGAS_API_URL):
"x-app-version": __version__,
},
)
check_api_response(response)
return response


def check_api_response(response: requests.Response) -> None:
"""
Check API response status and raise appropriate exceptions

Parameters
----------
response : requests.Response
Response object from API request

Raises
------
UploadException
If authentication fails or other API errors occur
"""
if response.status_code == 403:
raise UploadException(
status_code=response.status_code,
message="AUTHENTICATION_ERROR: The app token is invalid. Please check your RAGAS_APP_TOKEN environment variable.",
)
return response

try:
response.raise_for_status()
except requests.exceptions.HTTPError:
error_msg = ""
try:
error_data = response.json()
if "message" in error_data:
error_msg += f"\nAPI Message: {error_data['message']}"
if "debug_error_info" in error_data:
error_msg += f"\nDebug Info: {error_data['debug_error_info']}"
except:
error_msg = f"\nStatus Code: {response.status_code}"

raise UploadException(
status_code=response.status_code, message=f"Request failed: {error_msg}"
)


def build_evaluation_app_url(app_url: str, run_id: str) -> str:
return f"{app_url}/dashboard/alignment/evaluation/{run_id}"
9 changes: 5 additions & 4 deletions src/ragas/testset/synthesizers/testset_schema.py
@@ -16,7 +16,7 @@
SingleTurnSample,
)
from ragas.exceptions import UploadException
from ragas.sdk import RAGAS_API_URL, RAGAS_APP_URL, upload_packet
from ragas.sdk import upload_packet, get_app_url


class TestsetSample(BaseSample):
@@ -136,14 +136,15 @@ def total_cost(
cost_per_output_token=cost_per_output_token,
)

def upload(self, base_url: str = RAGAS_API_URL, verbose: bool = True) -> str:
def upload(self, verbose: bool = True) -> str:
packet = TestsetPacket(samples_original=self.samples, run_id=self.run_id)
response = upload_packet(
path="/alignment/testset",
data_json_string=packet.model_dump_json(),
base_url=base_url,
)
testset_endpoint = f"{RAGAS_APP_URL}/dashboard/alignment/testset/{self.run_id}"
app_url = get_app_url()

testset_endpoint = f"{app_url}/dashboard/alignment/testset/{self.run_id}"
if response.status_code == 409:
# this testset already exists
if verbose:
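
`Testset.upload()` follows the same pattern as the evaluation upload: the `base_url` parameter is gone and the app URL is resolved via `get_app_url()`. A usage sketch, assuming the public `TestsetGenerator` API; the generator, LLM, embeddings, and documents are placeholders and not part of this diff.

```python
from ragas.testset import TestsetGenerator  # assumed import path

# Hypothetical generation step; any Testset instance works here.
# generator = TestsetGenerator.from_langchain(llm, embeddings)             # placeholders
# testset = generator.generate_with_langchain_docs(docs, testset_size=10)

# url = testset.upload()  # no base_url parameter; respects RAGAS_API_URL / RAGAS_APP_URL
# print(url)              # -> <app_url>/dashboard/alignment/testset/<run_id>
```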