2 changes: 1 addition & 1 deletion pyproject.toml
@@ -37,7 +37,7 @@ mistralai = ["mistralai>=1.0.0"]
openai = ["openai>=1.1.0"]
nltk = ["nltk>=3.8.1,<4"]
cohere = ["cohere>=4.44"]
voyageai = ["voyageai>=0.2.2"]
voyageai = ["voyageai>=0.3.5"]
sentence-transformers = ["sentence-transformers>=3.4.0,<4"]
vertexai = [
"google-cloud-aiplatform>=1.26,<2.0.0",
255 changes: 204 additions & 51 deletions redisvl/utils/vectorize/text/voyageai.py
@@ -14,31 +14,55 @@
# ignore that voyageai isn't imported
# mypy: disable-error-code="name-defined"

# Token limits for different VoyageAI models
VOYAGE_TOTAL_TOKEN_LIMITS = {
"voyage-context-3": 32_000,
"voyage-3.5-lite": 1_000_000,
"voyage-3.5": 320_000,
"voyage-2": 320_000,
"voyage-3-large": 120_000,
"voyage-code-3": 120_000,
"voyage-large-2-instruct": 120_000,
"voyage-finance-2": 120_000,
"voyage-multilingual-2": 120_000,
"voyage-law-2": 120_000,
"voyage-large-2": 120_000,
"voyage-3": 120_000,
"voyage-3-lite": 120_000,
"voyage-code-2": 120_000,
"voyage-3-m-exp": 120_000,
"voyage-multimodal-3": 120_000,
}


class VoyageAITextVectorizer(BaseVectorizer):
"""The VoyageAITextVectorizer class utilizes VoyageAI's API to generate
embeddings for text data.

This vectorizer is designed to interact with VoyageAI's /embed API,
This vectorizer is designed to interact with VoyageAI's /embed API and
/contextualized_embed API (for context models like voyage-context-3),
requiring an API key for authentication. The key can be provided
directly in the `api_config` dictionary or through the `VOYAGE_API_KEY`
environment variable. Users must obtain an API key from VoyageAI's website
(https://dash.voyageai.com/). Additionally, the `voyageai` python
client must be installed with `pip install voyageai`.

The vectorizer supports both synchronous and asynchronous operations, allows for batch
processing of texts and flexibility in handling preprocessing tasks.
processing of texts and flexibility in handling preprocessing tasks. It automatically
detects and handles contextualized embedding models (like voyage-context-3) which
generate embeddings that are aware of the surrounding context within a document.

You can optionally enable caching to improve performance when generating
embeddings for repeated text inputs.
embeddings for repeated text inputs. The vectorizer also provides token counting
capabilities to help manage API usage and optimize batching strategies.

.. code-block:: python

from redisvl.utils.vectorize import VoyageAITextVectorizer

# Basic usage
vectorizer = VoyageAITextVectorizer(
model="voyage-large-2",
model="voyage-3.5",
api_config={"api_key": "your-voyageai-api-key"} # OR set VOYAGE_API_KEY in your env
)
query_embedding = vectorizer.embed(
@@ -55,7 +79,7 @@ class VoyageAITextVectorizer(BaseVectorizer):
cache = EmbeddingsCache(name="voyageai_embeddings_cache")

vectorizer = VoyageAITextVectorizer(
model="voyage-large-2",
model="voyage-3.5",
api_config={"api_key": "your-voyageai-api-key"},
cache=cache
)
@@ -72,13 +96,30 @@
input_type="query"
)

# Using contextualized embeddings (voyage-context-3)
context_vectorizer = VoyageAITextVectorizer(
model="voyage-context-3",
api_config={"api_key": "your-voyageai-api-key"}
)
# Context models automatically use contextualized_embed API
# which generates context-aware embeddings for document chunks
context_embeddings = context_vectorizer.embed_many(
texts=["chunk 1 of document", "chunk 2 of document", "chunk 3 of document"],
input_type="document"
)

# Token counting for API usage management
token_counts = vectorizer.count_tokens(["text one", "text two"])
print(f"Token counts: {token_counts}")
print(f"Model token limit: {VOYAGE_TOTAL_TOKEN_LIMITS.get(vectorizer.model, 120_000)}")

"""

model_config = ConfigDict(arbitrary_types_allowed=True)

def __init__(
self,
model: str = "voyage-large-2",
model: str,

Copilot AI Oct 24, 2025


Removing the default model value is a breaking API change. Existing code that instantiates VoyageAITextVectorizer without specifying a model will fail. Consider adding a deprecation warning in a previous release or documenting this as a breaking change in the release notes.

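A hypothetical sketch of the reviewer's suggestion, not part of this PR: keep an optional model argument for one release and emit a DeprecationWarning instead of failing outright. The fallback model name below is an assumption.

import warnings
from typing import Optional

_FALLBACK_MODEL = "voyage-3.5"  # assumed interim default; not confirmed by this PR


class VoyageAITextVectorizerShim:
    """Illustrative only: soften the removal of the default model value."""

    def __init__(self, model: Optional[str] = None):
        if model is None:
            warnings.warn(
                "Constructing the vectorizer without an explicit model is deprecated; "
                f"falling back to '{_FALLBACK_MODEL}' for now.",
                DeprecationWarning,
                stacklevel=2,
            )
            model = _FALLBACK_MODEL
        self.model = model


VoyageAITextVectorizerShim()                    # warns, uses the fallback
VoyageAITextVectorizerShim("voyage-context-3")  # explicit model, no warning
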
api_config: Optional[Dict] = None,
dtype: str = "float32",
cache: Optional["EmbeddingsCache"] = None,
@@ -89,7 +130,7 @@ def __init__(
Visit https://docs.voyageai.com/docs/embeddings to learn about embeddings and check the available models.

Args:
model (str): Model to use for embedding. Defaults to "voyage-large-2".
model (str): Model to use for embedding (e.g., "voyage-3.5", "voyage-context-3").
api_config (Optional[Dict], optional): Dictionary containing the API key.
Defaults to None.
dtype (str): the default datatype to use when embedding text as byte arrays.
@@ -172,22 +213,6 @@ def _set_model_dims(self) -> int:
# fall back (TODO get more specific)
raise ValueError(f"Error setting embedding model dimensions: {str(e)}")

def _get_batch_size(self) -> int:
"""
Determine the appropriate batch size based on the model being used.

Returns:
int: Recommended batch size for the current model
"""
if self.model in ["voyage-2", "voyage-02"]:
return 72
elif self.model in ["voyage-3-lite", "voyage-3.5-lite"]:
return 30
elif self.model in ["voyage-3", "voyage-3.5"]:
return 10
else:
return 7 # Default for other models

def _validate_input(
self, texts: List[str], input_type: Optional[str], truncation: Optional[bool]
):
@@ -244,10 +269,12 @@ def _embed_many(
"""
Generate vector embeddings for a batch of texts using the VoyageAI API.

Uses token-aware batching to respect model token limits and optimize API calls.

Args:
texts: List of texts to embed
batch_size: Number of texts to process in each API call
**kwargs: Additional parameters to pass to the VoyageAI API
batch_size: Deprecated. Token-aware batching is now always used.

Copilot AI Oct 24, 2025


The batch_size parameter is marked as deprecated in the docstring, but the method signature still accepts it without any deprecation warning being raised. Consider adding a runtime deprecation warning when batch_size is provided, or remove the parameter entirely if it's no longer used.

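A hypothetical sketch of the reviewer's suggestion, not part of this PR: warn at runtime when the now-ignored batch_size argument is supplied, so callers learn about the change without breaking.

import warnings
from typing import Optional


def _warn_on_deprecated_batch_size(batch_size: Optional[int]) -> None:
    """Illustrative helper that _embed_many / _aembed_many could call first."""
    if batch_size is not None:
        warnings.warn(
            "batch_size is deprecated and ignored; token-aware batching is always used.",
            DeprecationWarning,
            stacklevel=3,
        )


_warn_on_deprecated_batch_size(32)    # emits a DeprecationWarning
_warn_on_deprecated_batch_size(None)  # silent
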
**kwargs: Additional parameters to pass to the VoyageAI API.

Returns:
List[List[float]]: List of vector embeddings as lists of floats
@@ -262,21 +289,35 @@
# Validate inputs
self._validate_input(texts, input_type, truncation)

# Determine batch size if not provided
if batch_size is None:
batch_size = self._get_batch_size()
# Use token-aware batching
batches = self._build_token_aware_batches(texts)

try:
embeddings: List = []
for batch in self.batchify(texts, batch_size):
response = self._client.embed(
texts=batch,
model=self.model,
input_type=input_type,
truncation=truncation,
**kwargs,
)
embeddings.extend(response.embeddings)

# Use contextualized embed API for context models
if self._is_context_model():
for batch in batches:
# Context models expect inputs as a list of lists
response = self._client.contextualized_embed(
inputs=[batch],
model=self.model,
input_type=input_type,
**kwargs,
)
# Extract embeddings from the first (and only) result
embeddings.extend(response.results[0].embeddings)
else:
# Use regular embed API for standard models
for batch in batches:
response = self._client.embed(
texts=batch,
model=self.model,
input_type=input_type,
truncation=truncation, # type: ignore[assignment]
**kwargs,
)
embeddings.extend(response.embeddings) # type: ignore[attr-defined]
return embeddings
except Exception as e:
raise ValueError(f"Embedding texts failed: {e}")
@@ -311,10 +352,12 @@ async def _aembed_many(
"""
Asynchronously generate vector embeddings for a batch of texts using the VoyageAI API.

Uses token-aware batching to respect model token limits and optimize API calls.

Args:
texts: List of texts to embed
batch_size: Number of texts to process in each API call
**kwargs: Additional parameters to pass to the VoyageAI API
batch_size: Deprecated. Token-aware batching is now always used.
**kwargs: Additional parameters to pass to the VoyageAI API.

Returns:
List[List[float]]: List of vector embeddings as lists of floats
@@ -329,25 +372,135 @@
# Validate inputs
self._validate_input(texts, input_type, truncation)

# Determine batch size if not provided
if batch_size is None:
batch_size = self._get_batch_size()
# Use token-aware batching (synchronous - tokenization is sync-only)
batches = self._build_token_aware_batches(texts)

try:
embeddings: List = []
for batch in self.batchify(texts, batch_size):
response = await self._aclient.embed(
texts=batch,
model=self.model,
input_type=input_type,
truncation=truncation,
**kwargs,
)
embeddings.extend(response.embeddings)

# Use contextualized embed API for context models
if self._is_context_model():
for batch in batches:
# Context models expect inputs as a list of lists
response = await self._aclient.contextualized_embed(
inputs=[batch],
model=self.model,
input_type=input_type,
**kwargs,
)
# Extract embeddings from the first (and only) result
embeddings.extend(response.results[0].embeddings)
else:
# Use regular embed API for standard models
for batch in batches:
response = await self._aclient.embed(
texts=batch,
model=self.model,
input_type=input_type,
truncation=truncation, # type: ignore[assignment]
**kwargs,
)
embeddings.extend(response.embeddings) # type: ignore[attr-defined]
return embeddings
except Exception as e:
raise ValueError(f"Embedding texts failed: {e}")

def count_tokens(self, texts: List[str]) -> List[int]:
"""
Count tokens for the given texts using VoyageAI's tokenization API.

Args:
texts: List of texts to count tokens for.

Returns:
List[int]: List of token counts for each text.

Raises:
ValueError: If tokenization fails.

Example:
>>> vectorizer = VoyageAITextVectorizer(model="voyage-3.5")
>>> token_counts = vectorizer.count_tokens(["Hello world", "Another text"])
>>> print(token_counts) # [2, 2]
"""
if not texts:
return []

try:
# Use the VoyageAI tokenize API to get token counts
token_lists = self._client.tokenize(texts, model=self.model)
return [len(token_list) for token_list in token_lists]
except Exception as e:
raise ValueError(f"Token counting failed: {e}")

def _is_context_model(self) -> bool:
"""
Check if the current model is a contextualized embedding model.

Contextualized models (like voyage-context-3) use a different API
endpoint and expect inputs formatted differently.

Returns:
bool: True if the model is a context model, False otherwise.
"""
return "context" in self.model

def _build_token_aware_batches(
self, texts: List[str], max_batch_size: int = 1000
) -> List[List[str]]:
"""
Generate batches of texts based on token limits and batch size constraints.

This method uses VoyageAI's tokenization API to count tokens for all texts
in a single call, then creates batches that respect both the model's token
limit and a maximum batch size.

Args:
texts: List of texts to batch.
max_batch_size: Maximum number of texts per batch (default: 1000).

Returns:
List[List[str]]: List of batches, where each batch is a list of texts.

Raises:
ValueError: If tokenization fails.
"""
if not texts:
return []

max_tokens_per_batch = VOYAGE_TOTAL_TOKEN_LIMITS.get(self.model, 120_000)
batches = []
current_batch: List[str] = []
current_batch_tokens = 0

# Tokenize all texts in one API call for efficiency
try:
token_counts = self.count_tokens(texts)
except Exception as e:
raise ValueError(f"Failed to count tokens for batching: {e}")
Comment on lines +476 to +480

Copilot AI Oct 24, 2025


The token-aware batching makes an additional API call to count_tokens() for every embed operation. This adds latency and API usage overhead. Consider adding a parameter to allow users to opt out of token-aware batching or implement local tokenization if the VoyageAI client supports it.

Contributor Author


count_token() is a low-latency local function.
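A minimal sketch, not part of this diff, to sanity-check that claim: it times client.tokenize() over a batch of texts, assuming a valid VOYAGE_API_KEY is set and that tokenize() runs locally as the author states.

import time

import voyageai

# Hypothetical benchmark: times the tokenize() call the batching code relies on.
vo = voyageai.Client()  # assumes VOYAGE_API_KEY is set in the environment
texts = ["a chunk of document text"] * 200

start = time.perf_counter()
token_lists = vo.tokenize(texts, model="voyage-3.5")
elapsed = time.perf_counter() - start

total_tokens = sum(len(tokens) for tokens in token_lists)
print(f"tokenized {len(texts)} texts ({total_tokens} tokens) in {elapsed:.3f}s")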


for i, text in enumerate(texts):
n_tokens = token_counts[i]

# Check if adding this text would exceed limits
if current_batch and (
len(current_batch) >= max_batch_size
or (current_batch_tokens + n_tokens > max_tokens_per_batch)
):
# Save the current batch and start a new one
batches.append(current_batch)
current_batch = []
current_batch_tokens = 0

current_batch.append(text)
current_batch_tokens += n_tokens

# Add the last batch if it has any texts
if current_batch:
batches.append(current_batch)

return batches

@property
def type(self) -> str:
return "voyageai"
7 changes: 4 additions & 3 deletions tests/integration/test_rerankers.py
@@ -13,14 +13,15 @@
@pytest.fixture(
params=[
CohereReranker,
VoyageAIReranker,
(VoyageAIReranker, "rerank-lite-1"),
(VoyageAIReranker, "rerank-2.5"),
]
)
def reranker(request):
if request.param == CohereReranker:
return request.param()
elif request.param == VoyageAIReranker:
return request.param(model="rerank-lite-1")
elif isinstance(request.param, tuple) and request.param[0] == VoyageAIReranker:
return request.param[0](model=request.param[1])


@pytest.fixture