An integration with tools like Haystack would be cool, e.g. a Vicinity-backed document store along the lines of https://docs.haystack.deepset.ai/docs/inmemorydocumentstore
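Below is a rough sketch of what that could look like, adapted from Haystack's `InMemoryDocumentStore`. The BM25 plumbing (`bm25_algorithm`, `bm25_algorithm_inst`) is assumed to be set up in `__init__` the way Haystack does it, and the Vicinity calls (`insert`, `delete`, `query`, `save`/`load`) are based on Vicinity's current API:

```python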
from typing import Any, Dict, Iterable, List, Optional

import numpy as np
from vicinity import Vicinity

from haystack import logging
from haystack.dataclasses import Document
from haystack.document_stores.errors import DuplicateDocumentError
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils import expit
from haystack.utils.filters import document_matches_filter

logger = logging.getLogger(__name__)

# Scaling constant used when scale_score is enabled, as in Haystack's InMemoryDocumentStore.
BM25_SCALING_FACTOR = 8
class VicinityDocumentStore:
    def __init__(self, backend_type: str = "basic", **kwargs):
        self.vicinity = Vicinity.from_vectors_and_items(
            vectors=[], items=[], backend_type=backend_type, store_vectors=True, **kwargs
        )

    def save_to_disk(self, path: str):
        self.vicinity.save(path)

    def load_from_disk(self, path: str):
        # Vicinity.load is a classmethod, so rebind the returned instance.
        self.vicinity = Vicinity.load(path)

    def save_to_hub(self, hub_name: str, hub_model_id: str):
        self.vicinity.save_to_hub(hub_name, hub_model_id)

    def load_from_hub(self, hub_name: str, hub_model_id: str):
        self.vicinity = Vicinity.load_from_hub(hub_name, hub_model_id)

    def count_documents(self) -> int:
        return len(self.vicinity.items)

    def add_documents(self, documents: List[Document]) -> None:
        # Plain bulk add; write_documents below is the policy-aware entry point.
        self.write_documents(documents)
    def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
        """
        Returns the documents that match the filters provided.

        For a detailed specification of the filters, refer to the DocumentStore.filter_documents() protocol
        documentation.

        :param filters: The filters to apply to the document list.
        :returns: A list of Documents that match the given filters.
        """
        if filters:
            if "operator" not in filters and "conditions" not in filters:
                raise ValueError(
                    "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
                )
            return [doc for doc in self.vicinity.items if document_matches_filter(filters=filters, document=doc)]
        return list(self.vicinity.items)
    def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
        """
        Refer to the DocumentStore.write_documents() protocol documentation.

        If `policy` is set to `DuplicatePolicy.NONE`, it defaults to `DuplicatePolicy.FAIL`.
        """
        if (
            not isinstance(documents, Iterable)
            or isinstance(documents, str)
            or any(not isinstance(doc, Document) for doc in documents)
        ):
            raise ValueError("Please provide a list of Documents.")

        if policy == DuplicatePolicy.NONE:
            policy = DuplicatePolicy.FAIL

        written_documents = len(documents)
        for document in documents:
            document_in_index = False
            for item in self.vicinity.items:
                if item.id == document.id:
                    document_in_index = True
                    break
            if policy != DuplicatePolicy.OVERWRITE and document_in_index:
                if policy == DuplicatePolicy.FAIL:
                    raise DuplicateDocumentError(f"ID '{document.id}' already exists.")
                if policy == DuplicatePolicy.SKIP:
                    logger.warning("ID '{document_id}' already exists", document_id=document.id)
                    written_documents -= 1
                    continue
            # Explicitly remove the existing document before inserting the new one,
            # so an overwrite does not leave a stale copy behind.
            if document_in_index:
                self.vicinity.delete([document])
            self.vicinity.insert([document], np.asarray([document.embedding]))
        return written_documents
    def delete_documents(self, document_ids: List[str]) -> None:
        """
        Deletes all documents with matching document_ids from the DocumentStore.

        :param document_ids: The IDs of the documents to delete.
        """
        items_to_delete = []
        for doc_id in document_ids:
            for item in self.vicinity.items:
                if item.id == doc_id:
                    items_to_delete.append(item)
        self.vicinity.delete(items_to_delete)
    def bm25_retrieval(
        self, query: str, filters: Optional[Dict[str, Any]] = None, top_k: int = 10, scale_score: bool = False
    ) -> List[Document]:
        """
        Retrieves the documents that are most relevant to the query using the BM25 algorithm.

        :param query: The query string.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved documents. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
        """
        if not query:
            raise ValueError("Query should be a non-empty string")

        content_type_filter = {
            "operator": "OR",
            "conditions": [
                {"field": "content", "operator": "!=", "value": None},
                {"field": "dataframe", "operator": "!=", "value": None},
            ],
        }
        if filters:
            if "operator" not in filters:
                raise ValueError(
                    "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
                )
            filters = {"operator": "AND", "conditions": [content_type_filter, filters]}
        else:
            filters = content_type_filter

        all_documents = self.filter_documents(filters=filters)
        if len(all_documents) == 0:
            logger.info("No documents found for BM25 retrieval. Returning empty list.")
            return []

        # bm25_algorithm / bm25_algorithm_inst would be configured in __init__,
        # mirroring Haystack's InMemoryDocumentStore.
        results = sorted(self.bm25_algorithm_inst(query, all_documents), key=lambda x: x[1], reverse=True)[:top_k]

        # BM25Okapi can return meaningful negative values, so they should not be filtered out when scale_score is False.
        # It's the only algorithm supported by rank_bm25 at the time of writing (2024) that can return negative scores.
        # See https://github.com/deepset-ai/haystack/pull/6889 for more context.
        negatives_are_valid = self.bm25_algorithm == "BM25Okapi" and not scale_score

        # Create documents with the BM25 score to return them
        return_documents = []
        for doc, score in results:
            if scale_score:
                score = expit(score / BM25_SCALING_FACTOR)
            if not negatives_are_valid and score <= 0.0:
                continue
            doc_fields = doc.to_dict()
            doc_fields["score"] = score
            return_document = Document.from_dict(doc_fields)
            return_documents.append(return_document)
        return return_documents
    def embedding_retrieval(  # pylint: disable=too-many-positional-arguments
        self,
        query_embedding: List[float],
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 10,
        scale_score: bool = False,
        return_embedding: bool = False,
    ) -> List[Document]:
        """
        Retrieves the documents that are most similar to the query embedding using a vector similarity metric.

        :param query_embedding: Embedding of the query.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved Documents. Default is False.
        :param return_embedding: Whether to return the embedding of the retrieved Documents. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
        """
        if len(query_embedding) == 0 or not isinstance(query_embedding[0], float):
            raise ValueError("query_embedding should be a non-empty list of floats.")

        # Vicinity returns a list of (item, score) pairs per query vector.
        k_nearest_neighbors = self.vicinity.query(np.array([query_embedding]), k=top_k)[0]
        all_documents = [result[0] for result in k_nearest_neighbors]
        distances = [result[1] for result in k_nearest_neighbors]

        if filters:
            candidates = [
                (doc, distance)
                for doc, distance in zip(all_documents, distances)
                if document_matches_filter(filters=filters, document=doc)
            ]
        else:
            candidates = list(zip(all_documents, distances))

        # Create Documents with the similarity score for the top k results
        top_documents = []
        for doc, score in sorted(candidates, key=lambda x: x[1], reverse=True)[:top_k]:
            doc_fields = doc.to_dict()
            doc_fields["score"] = score
            if return_embedding is False:
                doc_fields["embedding"] = None
            top_documents.append(Document.from_dict(doc_fields))
        return top_documents
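```

Usage would mirror the in-memory store. A minimal sketch, assuming the class above works as written and that documents arrive with precomputed embeddings (the 384-dim random vectors are just placeholders):

```python
import numpy as np
from haystack.dataclasses import Document

store = VicinityDocumentStore(backend_type="basic")

# Documents with precomputed (here random, placeholder) embeddings.
docs = [
    Document(content="Vicinity is a lightweight vector store.", embedding=np.random.rand(384).tolist()),
    Document(content="Haystack document stores expose write/filter/delete/count.", embedding=np.random.rand(384).tolist()),
]
store.write_documents(docs)
print(store.count_documents())  # 2

# Dense retrieval against a (placeholder) query embedding.
for doc in store.embedding_retrieval(query_embedding=np.random.rand(384).tolist(), top_k=1):
    print(doc.score, doc.content)
```

A full integration would presumably also need retriever components wrapping `bm25_retrieval` and `embedding_retrieval`, analogous to Haystack's `InMemoryBM25Retriever` and `InMemoryEmbeddingRetriever`.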