Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions pinecone/grpc/index_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging
from typing import List, Any, Iterable, cast, Literal, Iterator, TYPE_CHECKING

from google.protobuf import json_format

from pinecone.utils.tqdm import tqdm
from pinecone.utils import require_kwargs
Expand All @@ -15,6 +14,7 @@
parse_fetch_response,
parse_fetch_by_metadata_response,
parse_query_response,
query_response_to_dict,
parse_stats_response,
parse_upsert_response,
parse_update_response,
Expand All @@ -41,6 +41,7 @@
from pinecone.core.grpc.protos.db_data_2025_10_pb2 import (
Vector as GRPCVector,
QueryVector as GRPCQueryVector,
QueryResponse as ProtoQueryResponse,
UpsertRequest,
DeleteRequest,
QueryRequest,
Expand Down Expand Up @@ -501,13 +502,13 @@ def _query(
include_metadata: bool | None = None,
sparse_vector: (SparseValues | GRPCSparseValues | SparseVectorTypedDict) | None = None,
**kwargs,
) -> tuple[dict[str, Any], dict[str, str] | None]:
) -> tuple[ProtoQueryResponse, dict[str, str] | None]:
"""
Low-level query method that returns raw JSON dict and initial metadata without parsing.
Low-level query method that returns protobuf Message and initial metadata without parsing.
Used internally by query() and query_namespaces() for performance.

Returns:
Tuple of (json_dict, initial_metadata). initial_metadata may be None.
Tuple of (protobuf_message, initial_metadata). initial_metadata may be None.
"""
if vector is not None and id is not None:
raise ValueError("Cannot specify both `id` and `vector`")
Expand Down Expand Up @@ -535,7 +536,7 @@ def _query(

timeout = kwargs.pop("timeout", None)
response, initial_metadata = self.runner.run(self.stub.Query, request, timeout=timeout)
return json_format.MessageToDict(response), initial_metadata
return response, initial_metadata

def query(
self,
Expand Down Expand Up @@ -626,8 +627,8 @@ def query(
future, result_transformer=parse_query_response, timeout=timeout
)
else:
# For sync requests, use _query to get raw dict and metadata, then parse it
json_response, initial_metadata = self._query(
# For sync requests, use _query to get protobuf Message and metadata, then parse it
response, initial_metadata = self._query(
vector=vector,
id=id,
namespace=namespace,
Expand All @@ -640,7 +641,7 @@ def query(
**kwargs,
)
return parse_query_response(
json_response, _check_type=False, initial_metadata=initial_metadata
response, _check_type=False, initial_metadata=initial_metadata
)

def query_namespaces(
Expand Down Expand Up @@ -681,8 +682,9 @@ def query_namespaces(

only_futures = cast(Iterable[Future], futures)
for response in as_completed(only_futures):
json_response, _ = response.result() # Ignore initial_metadata for query_namespaces
# Pass raw dict directly to aggregator - no parsing needed
proto_response, _ = response.result() # Ignore initial_metadata for query_namespaces
# Convert protobuf Message to dict format for aggregator using optimized helper
json_response = query_response_to_dict(proto_response)
aggregator.add_results(json_response)

final_results = aggregator.get_results()
Expand Down Expand Up @@ -946,8 +948,7 @@ def describe_index_stats(

request = DescribeIndexStatsRequest(**args_dict)
response, _ = self.runner.run(self.stub.DescribeIndexStats, request, timeout=timeout)
json_response = json_format.MessageToDict(response)
return parse_stats_response(json_response)
return parse_stats_response(response)

@require_kwargs
def create_namespace(
Expand Down
25 changes: 13 additions & 12 deletions pinecone/grpc/resources/vector_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging
from typing import Any, Iterable, cast, Literal

from google.protobuf import json_format

from pinecone.utils.tqdm import tqdm
from concurrent.futures import as_completed, Future
Expand All @@ -13,6 +12,7 @@
parse_fetch_response,
parse_fetch_by_metadata_response,
parse_query_response,
query_response_to_dict,
parse_stats_response,
parse_upsert_response,
parse_update_response,
Expand All @@ -32,6 +32,7 @@
from pinecone.db_control.models.list_response import ListResponse as SimpleListResponse, Pagination
from pinecone.core.grpc.protos.db_data_2025_10_pb2 import (
Vector as GRPCVector,
QueryResponse as ProtoQueryResponse,
UpsertRequest,
DeleteRequest,
QueryRequest,
Expand Down Expand Up @@ -444,13 +445,13 @@ def _query(
include_metadata: bool | None = None,
sparse_vector: (SparseValues | GRPCSparseValues | SparseVectorTypedDict) | None = None,
**kwargs,
) -> tuple[dict[str, Any], dict[str, str] | None]:
) -> tuple[ProtoQueryResponse, dict[str, str] | None]:
"""
Low-level query method that returns raw JSON dict and initial metadata without parsing.
Low-level query method that returns protobuf Message and initial metadata without parsing.
Used internally by query() and query_namespaces() for performance.

Returns:
Tuple of (json_dict, initial_metadata). initial_metadata may be None.
Tuple of (protobuf_message, initial_metadata). initial_metadata may be None.
"""
if vector is not None and id is not None:
raise ValueError("Cannot specify both `id` and `vector`")
Expand Down Expand Up @@ -478,7 +479,7 @@ def _query(

timeout = kwargs.pop("timeout", None)
response, initial_metadata = self._runner.run(self._stub.Query, request, timeout=timeout)
return json_format.MessageToDict(response), initial_metadata
return response, initial_metadata

def query(
self,
Expand Down Expand Up @@ -569,8 +570,8 @@ def query(
future, result_transformer=parse_query_response, timeout=timeout
)
else:
# For sync requests, use _query to get raw dict and metadata, then parse it
json_response, initial_metadata = self._query(
# For sync requests, use _query to get protobuf Message and metadata, then parse it
response, initial_metadata = self._query(
vector=vector,
id=id,
namespace=namespace,
Expand All @@ -583,7 +584,7 @@ def query(
**kwargs,
)
return parse_query_response(
json_response, _check_type=False, initial_metadata=initial_metadata
response, _check_type=False, initial_metadata=initial_metadata
)

def query_namespaces(
Expand Down Expand Up @@ -658,8 +659,9 @@ def query_namespaces(

only_futures = cast(Iterable[Future], futures)
for response in as_completed(only_futures):
json_response, _ = response.result() # Ignore initial_metadata for query_namespaces
# Pass raw dict directly to aggregator - no parsing needed
proto_response, _ = response.result() # Ignore initial_metadata for query_namespaces
# Convert protobuf Message to dict format for aggregator using optimized helper
json_response = query_response_to_dict(proto_response)
aggregator.add_results(json_response)

final_results = aggregator.get_results()
Expand Down Expand Up @@ -853,5 +855,4 @@ def describe_index_stats(

request = DescribeIndexStatsRequest(**args_dict)
response, _ = self._runner.run(self._stub.DescribeIndexStats, request, timeout=timeout)
json_response = json_format.MessageToDict(response)
return parse_stats_response(json_response)
return parse_stats_response(response)
Loading