
Haystack LLM and embedding wrapper #1901


Merged
merged 14 commits into from Feb 20, 2025
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -60,6 +60,8 @@ dev = [
"rapidfuzz",
"pandas",
"datacompy",
+ "haystack-ai",
+ "sacrebleu",
"r2r",
]
test = [
1 change: 1 addition & 0 deletions requirements/dev.txt
@@ -17,3 +17,4 @@ nltk
rapidfuzz
pandas
datacompy
+ haystack-ai
4 changes: 3 additions & 1 deletion src/ragas/embeddings/__init__.py
@@ -5,11 +5,13 @@
LlamaIndexEmbeddingsWrapper,
embedding_factory,
)
+ from ragas.embeddings.haystack_wrapper import HaystackEmbeddingsWrapper

__all__ = [
"BaseRagasEmbeddings",
+ "HaystackEmbeddingsWrapper",
+ "HuggingfaceEmbeddings",
"LangchainEmbeddingsWrapper",
"LlamaIndexEmbeddingsWrapper",
- "HuggingfaceEmbeddings",
"embedding_factory",
]
23 changes: 11 additions & 12 deletions src/ragas/embeddings/base.py
@@ -4,7 +4,6 @@
import typing as t
from abc import ABC, abstractmethod
from dataclasses import field
- from typing import List

import numpy as np
from langchain_core.embeddings import Embeddings
@@ -51,15 +50,15 @@ def __init__(self, cache: t.Optional[CacheInterface] = None):
self.aembed_documents
)

- async def embed_text(self, text: str, is_async=True) -> List[float]:
+ async def embed_text(self, text: str, is_async=True) -> t.List[float]:
"""
Embed a single text string.
"""
embs = await self.embed_texts([text], is_async=is_async)
return embs[0]

async def embed_texts(
- self, texts: List[str], is_async: bool = True
+ self, texts: t.List[str], is_async: bool = True
) -> t.List[t.List[float]]:
"""
Embed multiple texts.
@@ -77,10 +76,10 @@ async def embed_texts
return await loop.run_in_executor(None, embed_documents_with_retry, texts)

@abstractmethod
- async def aembed_query(self, text: str) -> List[float]: ...
+ async def aembed_query(self, text: str) -> t.List[float]: ...

@abstractmethod
- async def aembed_documents(self, texts: List[str]) -> t.List[t.List[float]]: ...
+ async def aembed_documents(self, texts: t.List[str]) -> t.List[t.List[float]]: ...

def set_run_config(self, run_config: RunConfig):
"""
@@ -117,25 +116,25 @@ def __init__(
run_config = RunConfig()
self.set_run_config(run_config)

- def embed_query(self, text: str) -> List[float]:
+ def embed_query(self, text: str) -> t.List[float]:
"""
Embed a single query text.
"""
return self.embeddings.embed_query(text)

- def embed_documents(self, texts: List[str]) -> List[List[float]]:
+ def embed_documents(self, texts: t.List[str]) -> t.List[t.List[float]]:
"""
Embed multiple documents.
"""
return self.embeddings.embed_documents(texts)

- async def aembed_query(self, text: str) -> List[float]:
+ async def aembed_query(self, text: str) -> t.List[float]:
"""
Asynchronously embed a single query text.
"""
return await self.embeddings.aembed_query(text)

- async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
+ async def aembed_documents(self, texts: t.List[str]) -> t.List[t.List[float]]:
"""
Asynchronously embed multiple documents.
"""
@@ -256,13 +255,13 @@ def __post_init__(self):
if self.cache is not None:
self.predict = cacher(cache_backend=self.cache)(self.predict)

- def embed_query(self, text: str) -> List[float]:
+ def embed_query(self, text: str) -> t.List[float]:
"""
Embed a single query text.
"""
return self.embed_documents([text])[0]

- def embed_documents(self, texts: List[str]) -> List[List[float]]:
+ def embed_documents(self, texts: t.List[str]) -> t.List[t.List[float]]:
"""
Embed multiple documents.
"""
@@ -279,7 +278,7 @@ def embed_documents(self, texts: List[str]) -> List[List[float]]:
assert isinstance(embeddings, Tensor)
return embeddings.tolist()

- def predict(self, texts: List[List[str]]) -> List[List[float]]:
+ def predict(self, texts: t.List[t.List[str]]) -> t.List[t.List[float]]:
"""
Make predictions using a cross-encoder model.
"""
113 changes: 113 additions & 0 deletions src/ragas/embeddings/haystack_wrapper.py
@@ -0,0 +1,113 @@
import asyncio
import typing as t

from ragas.cache import CacheInterface
from ragas.embeddings.base import BaseRagasEmbeddings
from ragas.run_config import RunConfig


class HaystackEmbeddingsWrapper(BaseRagasEmbeddings):
"""
A wrapper for using Haystack embedders within the Ragas framework.

This class allows you to use both synchronous and asynchronous methods
(`embed_query`/`embed_documents` and `aembed_query`/`aembed_documents`)
for generating embeddings through a Haystack embedder.

Parameters
----------
embedder : AzureOpenAITextEmbedder | HuggingFaceAPITextEmbedder | OpenAITextEmbedder | SentenceTransformersTextEmbedder
An instance of a supported Haystack embedder class.
run_config : RunConfig, optional
A configuration object to manage embedding execution settings, by default None.
cache : CacheInterface, optional
A cache instance for storing and retrieving embedding results, by default None.
"""

def __init__(
self,
embedder: t.Any,
run_config: t.Optional[RunConfig] = None,
cache: t.Optional[CacheInterface] = None,
):
super().__init__(cache=cache)

# Lazy import of the required Haystack components
try:
from haystack import AsyncPipeline
from haystack.components.embedders import (
AzureOpenAITextEmbedder,
HuggingFaceAPITextEmbedder,
OpenAITextEmbedder,
SentenceTransformersTextEmbedder,
)
except ImportError as exc:
raise ImportError(
"Haystack is not installed. Please install it with `pip install haystack-ai`."
) from exc

# Validate embedder type
if not isinstance(
embedder,
(
AzureOpenAITextEmbedder,
HuggingFaceAPITextEmbedder,
OpenAITextEmbedder,
SentenceTransformersTextEmbedder,
),
):
raise TypeError(
"Expected 'embedder' to be one of: AzureOpenAITextEmbedder, "
"HuggingFaceAPITextEmbedder, OpenAITextEmbedder, or "
f"SentenceTransformersTextEmbedder, but got {type(embedder).__name__}."
)

self.embedder = embedder

# Initialize an asynchronous pipeline and add the embedder component
self.async_pipeline = AsyncPipeline()
self.async_pipeline.add_component("embedder", self.embedder)

# Set or create the run configuration
if run_config is None:
run_config = RunConfig()
self.set_run_config(run_config)

def embed_query(self, text: str) -> t.List[float]:
result = self.embedder.run(text=text)
return result["embedding"]

def embed_documents(self, texts: t.List[str]) -> t.List[t.List[float]]:
return [self.embed_query(text) for text in texts]

async def aembed_query(self, text: str) -> t.List[float]:
# Run the async pipeline with the input text
output = await self.async_pipeline.run_async({"embedder": {"text": text}})
return output.get("embedder", {}).get("embedding", [])

async def aembed_documents(self, texts: t.List[str]) -> t.List[t.List[float]]:
tasks = (self.aembed_query(text) for text in texts)
results = await asyncio.gather(*tasks)
return results

def __repr__(self) -> str:
try:
from haystack.components.embedders import (
AzureOpenAITextEmbedder,
HuggingFaceAPITextEmbedder,
OpenAITextEmbedder,
SentenceTransformersTextEmbedder,
)
except ImportError:
return f"{self.__class__.__name__}(embeddings=Unknown(...))"

if isinstance(self.embedder, (OpenAITextEmbedder, SentenceTransformersTextEmbedder)): # type: ignore
model_info = self.embedder.model
elif isinstance(self.embedder, AzureOpenAITextEmbedder): # type: ignore
model_info = self.embedder.azure_deployment
elif isinstance(self.embedder, HuggingFaceAPITextEmbedder): # type: ignore
model_info = self.embedder.api_params
else:
model_info = "Unknown"

return f"{self.__class__.__name__}(embeddings={model_info}(...))"
2 changes: 2 additions & 0 deletions src/ragas/llms/__init__.py
@@ -4,9 +4,11 @@
LlamaIndexLLMWrapper,
llm_factory,
)
+ from ragas.llms.haystack_wrapper import HaystackLLMWrapper

__all__ = [
"BaseRagasLLM",
+ "HaystackLLMWrapper",
"LangchainLLMWrapper",
"LlamaIndexLLMWrapper",
"llm_factory",
1 change: 1 addition & 0 deletions src/ragas/llms/base.py
@@ -24,6 +24,7 @@
from langchain_core.prompt_values import PromptValue
from llama_index.core.base.llms.base import BaseLLM


logger = logging.getLogger(__name__)

MULTIPLE_COMPLETION_SUPPORTED = [
141 changes: 141 additions & 0 deletions src/ragas/llms/haystack_wrapper.py
@@ -0,0 +1,141 @@
import typing as t

from langchain_core.callbacks import Callbacks
from langchain_core.outputs import Generation, LLMResult
from langchain_core.prompt_values import PromptValue

from ragas.cache import CacheInterface
from ragas.llms import BaseRagasLLM
from ragas.run_config import RunConfig


class HaystackLLMWrapper(BaseRagasLLM):
"""
A wrapper class for using Haystack LLM generators within the Ragas framework.

This class integrates Haystack's LLM components (e.g., `OpenAIGenerator`,
`HuggingFaceAPIGenerator`, etc.) into Ragas, enabling both synchronous and
asynchronous text generation.

Parameters
----------
haystack_generator : AzureOpenAIGenerator | HuggingFaceAPIGenerator | HuggingFaceLocalGenerator | OpenAIGenerator
An instance of a Haystack generator.
run_config : RunConfig, optional
Configuration object to manage LLM execution settings, by default None.
cache : CacheInterface, optional
A cache instance for storing results, by default None.
"""

def __init__(
self,
haystack_generator: t.Any,
run_config: t.Optional[RunConfig] = None,
cache: t.Optional[CacheInterface] = None,
):
super().__init__(cache=cache)

# Lazy import of the required Haystack components
try:
from haystack import AsyncPipeline
from haystack.components.generators import (
AzureOpenAIGenerator,
HuggingFaceAPIGenerator,
HuggingFaceLocalGenerator,
OpenAIGenerator,
)
except ImportError as exc:
raise ImportError(
"Haystack is not installed. Please install it using `pip install haystack-ai`."
) from exc

# Validate haystack_generator type
if not isinstance(
haystack_generator,
(
AzureOpenAIGenerator,
HuggingFaceAPIGenerator,
HuggingFaceLocalGenerator,
OpenAIGenerator,
),
):
raise TypeError(
"Expected 'haystack_generator' to be one of: "
"AzureOpenAIGenerator, HuggingFaceAPIGenerator, "
"HuggingFaceLocalGenerator, or OpenAIGenerator, but received "
f"{type(haystack_generator).__name__}."
)

# Set up Haystack pipeline and generator
self.generator = haystack_generator
self.async_pipeline = AsyncPipeline()
self.async_pipeline.add_component("llm", self.generator)

if run_config is None:
run_config = RunConfig()
self.set_run_config(run_config)

def is_finished(self, response: LLMResult) -> bool:
# Haystack replies carry no finish-reason information through this
# wrapper, so every response is treated as complete.
return True

def generate_text(
self,
prompt: PromptValue,
n: int = 1,
temperature: float = 1e-8,
stop: t.Optional[t.List[str]] = None,
callbacks: t.Optional[Callbacks] = None,
) -> LLMResult:

component_output: t.Dict[str, t.Any] = self.generator.run(prompt.to_string())
# A generator component returns a flat dict such as {"replies": [...]};
# only pipeline outputs are keyed by component name.
replies = component_output.get("replies", [])
output_text = replies[0] if replies else ""

return LLMResult(generations=[[Generation(text=output_text)]])

async def agenerate_text(
self,
prompt: PromptValue,
n: int = 1,
temperature: t.Optional[float] = None,
stop: t.Optional[t.List[str]] = None,
callbacks: t.Optional[Callbacks] = None,
) -> LLMResult:
# Prepare input parameters for the LLM component; forward temperature
# only when explicitly set, so the generator's own defaults are not
# overridden with None.
llm_input: t.Dict[str, t.Any] = {"prompt": prompt.to_string()}
if temperature is not None:
llm_input["generation_kwargs"] = {"temperature": temperature}

# Run the async pipeline with the LLM input
pipeline_output = await self.async_pipeline.run_async(data={"llm": llm_input})
replies = pipeline_output.get("llm", {}).get("replies", [])
output_text = replies[0] if replies else ""

return LLMResult(generations=[[Generation(text=output_text)]])

def __repr__(self) -> str:
try:
from haystack.components.generators import (
AzureOpenAIGenerator,
HuggingFaceAPIGenerator,
HuggingFaceLocalGenerator,
OpenAIGenerator,
)
except ImportError:
return f"{self.__class__.__name__}(llm=Unknown(...))"

generator = self.generator

if isinstance(generator, OpenAIGenerator):
model_info = generator.model
elif isinstance(generator, HuggingFaceLocalGenerator):
model_info = generator.huggingface_pipeline_kwargs.get("model")
elif isinstance(generator, HuggingFaceAPIGenerator):
model_info = generator.api_params.get("model")
elif isinstance(generator, AzureOpenAIGenerator):
model_info = generator.azure_deployment
else:
model_info = "Unknown"

return f"{self.__class__.__name__}(llm={model_info}(...))"