Add reliability testing for DSPy Adapters #8333

Closed

Empty file added benchmark/__init__.py
Empty file added benchmark/adapter/__init__.py
74 changes: 74 additions & 0 deletions benchmark/adapter/math_qa/main.py
@@ -0,0 +1,74 @@
import argparse

from tabulate import tabulate

import dspy
from dspy.datasets import MATH

from .program import (
    dspy_cot_chat_adapter,
    dspy_cot_json_adapter,
    vanilla_sdk,
    vanilla_sdk_with_structured_output,
)


def print_results(results):
    program_name_mapping = {
        "vanilla_sdk": "Vanilla",
        "vanilla_sdk_with_structured_output": "Vanilla with Structured Output",
        "dspy_cot_chat_adapter": "DSPy (ChatAdapter)",
        "dspy_cot_json_adapter": "DSPy (JSONAdapter)",
    }

    headers = list(program_name_mapping.values())

    table = []
    for model_name, result in results.items():
        score_row = [model_name]
        for program_name in program_name_mapping.keys():
            score = result[program_name].get("score", 0.0)
            score_row.append(f"{score:.2f}")

        table.append(score_row)

    print("\nBenchmark Results Summary:\n")

    print(tabulate(table, headers=[""] + headers, tablefmt="github"))


def main():
    parser = argparse.ArgumentParser(description="Run math QA benchmarking with different LMs.")
    parser.add_argument(
        "--models",
        type=str,
        nargs="+",
        default=["openai/gpt-4o-mini", "openai/gpt-4.1", "anthropic/claude-3.5-sonnet"],
        help="The model name(s) of the LM(s) to use. (default: %(default)s)",
    )
    parser.add_argument("--num_threads", type=int, default=1, help="The number of threads to use for the benchmark.")
    parser.add_argument("--enable_cache", action="store_true", help="Enable cache for the benchmark.")
    args = parser.parse_args()

    # When caching is enabled, every program goes through the litellm cache so the comparison stays fair.
    dspy.configure_cache(enable_disk_cache=False, enable_litellm_cache=args.enable_cache)

    dataset = MATH(subset="algebra").dev[:10]

    final_results = {}
    for model in args.models:
        model_results = {}
        model_results["vanilla_sdk"] = vanilla_sdk(dataset, model, args.num_threads)
        model_results["vanilla_sdk_with_structured_output"] = vanilla_sdk_with_structured_output(
            dataset, model, args.num_threads
        )
        model_results["dspy_cot_chat_adapter"] = dspy_cot_chat_adapter(dataset, model, args.num_threads)
        model_results["dspy_cot_json_adapter"] = dspy_cot_json_adapter(dataset, model, args.num_threads)

        final_results[model] = model_results

    print_results(final_results)


if __name__ == "__main__":
    main()
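Since main.py uses package-relative imports, it is presumably run as a module from the repository root (python -m benchmark.adapter.math_qa.main), with models, thread count, and caching controlled by the flags above. As a reading aid, a minimal sketch, not part of the diff, of the results structure that print_results expects; the model key and the 0.0 scores are placeholders rather than measured numbers:

results = {
    "openai/gpt-4o-mini": {  # one entry per model passed via --models
        "vanilla_sdk": {"score": 0.0},                          # placeholder score
        "vanilla_sdk_with_structured_output": {"score": 0.0},   # placeholder score
        "dspy_cot_chat_adapter": {"score": 0.0},                # placeholder score
        "dspy_cot_json_adapter": {"score": 0.0},                # placeholder score
    }
}
print_results(results)  # prints one row per model in a GitHub-flavored table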
12 changes: 12 additions & 0 deletions benchmark/adapter/math_qa/metric.py
@@ -0,0 +1,12 @@
try:
    import math_equivalence
except ImportError:
    raise ImportError("MATH's metric requires `pip install git+https://github.com/hendrycks/math.git`")


def is_equiv(golden, pred):
    return math_equivalence.is_equiv(golden, pred)


def is_equiv_dspy(example, pred, trace=None):
    # `trace` matches DSPy's metric signature; it is unused here.
    return is_equiv(example.answer, pred.answer)
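For reference, a minimal sketch, not part of the diff, of how the DSPy-facing wrapper gets called during evaluation; the question and answer values are placeholders, the import path assumes the repository root, and it assumes math_equivalence treats identical strings as equivalent:

import dspy
from benchmark.adapter.math_qa.metric import is_equiv_dspy

example = dspy.Example(question="What is 1 + 1?", answer="2").with_inputs("question")  # placeholder example
pred = dspy.Prediction(answer="2")                                                     # placeholder prediction
assert is_equiv_dspy(example, pred)  # compares example.answer against pred.answer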
129 changes: 129 additions & 0 deletions benchmark/adapter/math_qa/program.py
@@ -0,0 +1,129 @@
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import litellm
import pydantic
from tqdm import tqdm

import dspy

from .metric import is_equiv, is_equiv_dspy


class MathQA(dspy.Signature):
    """You are a math expert. You will be given a math problem, and you need to solve it."""

    question: str = dspy.InputField(desc="Math question to answer")
    answer: str = dspy.OutputField()


def _dspy_cot(dataset, model, adapter, num_threads=1):
    adapter_name = adapter.__class__.__name__
    print(f"""
========================================
Math Question Answering
========================================
- Runs with: DSPy Program
- Adapter: {adapter_name}
- Model: {model}
----------------------------------------
Running benchmarking, this may take a while...
""")
    cot = dspy.ChainOfThought(MathQA)

    evaluator = dspy.Evaluate(devset=dataset, num_threads=num_threads, display_progress=True, display_table=False)

    with dspy.settings.context(lm=dspy.LM(model), adapter=adapter):
        start_time = time.time()
        score = evaluator(cot, metric=is_equiv_dspy)
        time_taken = time.time() - start_time

    print(f"""
- Score: {score}
- Time taken: {time_taken:.2f} seconds
========================================
""")

    return {"score": score}


def dspy_cot_chat_adapter(dataset, model, num_threads=1):
    return _dspy_cot(dataset, model, dspy.ChatAdapter(), num_threads)


def dspy_cot_json_adapter(dataset, model, num_threads=1):
    return _dspy_cot(dataset, model, dspy.JSONAdapter(), num_threads)


def _vanilla_sdk(dataset, model, num_threads=1, lm_kwargs=None):
    prompt = """You are a math expert. Solve the following math problem step by step, and provide the final answer.
Please make sure the answer only contains the final answer, no other text like reasoning steps or explanation should
be included. For example: if the answer is 10, the response should be "10" instead of "The answer is 10".

Question: {question}
Answer:"""

    start_time = time.time()
    lm_kwargs = lm_kwargs or {}

    print(f"""
========================================
Math Question Answering
========================================
- Runs with: Vanilla LM SDK
- Model: {model}
- Using structured output: {"response_format" in lm_kwargs}
----------------------------------------
Running benchmarking, this may take a while...
""")

    def process_example(example):
        lm_kwargs["caching"] = True
        response = litellm.completion(
            model=model, messages=[{"role": "user", "content": prompt.format(question=example.question)}], **lm_kwargs
        )
        if "response_format" in lm_kwargs:
            pred_answer = json.loads(response["choices"][0].message.content).get("answer", "")
        else:
            pred_answer = response["choices"][0].message.content

        return is_equiv(example.answer, pred_answer)

    scores = []
    with ThreadPoolExecutor(max_workers=num_threads) as executor:  # Adjust max_workers as needed
        futures = [executor.submit(process_example, example) for example in dataset]
        for future in tqdm(as_completed(futures), total=len(futures), desc="Evaluating", unit="ex"):
            scores.append(future.result())

    time_taken = time.time() - start_time

    score = sum(scores) * 100.0 / len(scores)  # percentage of problems judged equivalent to the reference answer

    print(f"""
- Score: {score}
- Time taken: {time_taken:.2f} seconds
========================================
""")
    return {"score": score}


def vanilla_sdk(dataset, model, num_threads=1):
    return _vanilla_sdk(dataset, model, num_threads)


def vanilla_sdk_with_structured_output(dataset, model, num_threads=1):
    class MathAnswer(pydantic.BaseModel):
        answer: str

    # Pass the pydantic model as the structured-output response format.
    lm_kwargs = {"response_format": MathAnswer}

    return _vanilla_sdk(dataset, model, num_threads, lm_kwargs)
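A minimal sketch, not part of the diff, of exercising the same MathQA program on a single question under one adapter; the model name and question are placeholders, the import path assumes the repository root, and the reasoning field is the one ChainOfThought adds on top of the signature:

import dspy
from benchmark.adapter.math_qa.program import MathQA

with dspy.settings.context(lm=dspy.LM("openai/gpt-4o-mini"), adapter=dspy.JSONAdapter()):
    cot = dspy.ChainOfThought(MathQA)
    pred = cot(question="What is 3 * 7?")  # placeholder question
    print(pred.reasoning)                  # intermediate reasoning added by ChainOfThought
    print(pred.answer)                     # final answer field from the MathQA signature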