44from typing import Any , Dict , List , Optional , Sequence , Tuple
55
66import jmespath
7- from pymongo .cursor import Cursor as MongoCursor
87from pymongo .errors import PyMongoError
98
109from .common import CursorIterator
@@ -20,28 +19,24 @@ class ResultSet(CursorIterator):
2019 def __init__ (
2120 self ,
2221 command_result : Optional [Dict [str , Any ]] = None ,
23- mongo_cursor : Optional [MongoCursor ] = None ,
2422 execution_plan : ExecutionPlan = None ,
2523 arraysize : int = None ,
24+ database : Optional [Any ] = None ,
2625 ** kwargs ,
2726 ) -> None :
2827 super ().__init__ (arraysize = arraysize or self .DEFAULT_FETCH_SIZE , ** kwargs )
2928
30- # Handle both command results and legacy mongo cursor for backward compatibility
29+ # Handle command results from db.command
3130 if command_result is not None :
3231 self ._command_result = command_result
33- self ._mongo_cursor = None
32+ self ._database = database
3433 # Extract cursor info from command result
3534 self ._result_cursor = command_result .get ("cursor" , {})
35+ self ._cursor_id = self ._result_cursor .get ("id" , 0 ) # 0 means no more results
3636 self ._raw_results = self ._result_cursor .get ("firstBatch" , [])
3737 self ._cached_results : List [Sequence [Any ]] = []
38- elif mongo_cursor is not None :
39- self ._mongo_cursor = mongo_cursor
40- self ._command_result = None
41- self ._raw_results = []
42- self ._cached_results : List [Sequence [Any ]] = []
4338 else :
44- raise ProgrammingError ("Either command_result or mongo_cursor must be provided" )
39+ raise ProgrammingError ("command_result must be provided" )
4540
4641 self ._execution_plan = execution_plan
4742 self ._is_closed = False
@@ -53,14 +48,22 @@ def __init__(
5348
5449 # Process firstBatch immediately if available (after all attributes are set)
5550 if command_result is not None and self ._raw_results :
56- processed_batch = [self ._process_document (doc ) for doc in self ._raw_results ]
57- # Convert dictionaries to sequences for DB API 2.0 compliance
58- sequence_batch = [self ._dict_to_sequence (doc ) for doc in processed_batch ]
59- self ._cached_results .extend (sequence_batch )
51+ self ._process_and_cache_batch (self ._raw_results )
6052
6153 # Build description from projection
6254 self ._build_description ()
6355
56+ def _process_and_cache_batch (self , batch : List [Dict [str , Any ]]) -> None :
57+ """Process and cache a batch of documents"""
58+ if not batch :
59+ return
60+ # Process results through projection mapping
61+ processed_batch = [self ._process_document (doc ) for doc in batch ]
62+ # Convert dictionaries to output format (sequence or dict)
63+ formatted_batch = [self ._format_result (doc ) for doc in processed_batch ]
64+ self ._cached_results .extend (formatted_batch )
65+ self ._total_fetched += len (batch )
66+
6467 def _build_description (self ) -> None :
6568 """Build column description from execution plan projection"""
6669 if not self ._execution_plan .projection_stage :
@@ -85,37 +88,37 @@ def _ensure_results_available(self, count: int = 1) -> None:
8588 if self ._cache_exhausted :
8689 return
8790
88- if self ._command_result is not None :
89- # For command results, we already have all data in firstBatch
90- # No additional fetching needed
91- self ._cache_exhausted = True
92- return
91+ # Fetch more results if needed and cursor has more data
92+ while len (self ._cached_results ) < count and self ._cursor_id != 0 :
93+ try :
94+ # Use getMore to fetch next batch
95+ if self ._database and self ._execution_plan .collection :
96+ getmore_cmd = {
97+ "getMore" : self ._cursor_id ,
98+ "collection" : self ._execution_plan .collection ,
99+ }
100+ result = self ._database .command (getmore_cmd )
101+
102+ # Extract and process next batch
103+ cursor_info = result .get ("cursor" , {})
104+ next_batch = cursor_info .get ("nextBatch" , [])
105+ self ._process_and_cache_batch (next_batch )
106+
107+ # Update cursor ID for next iteration
108+ self ._cursor_id = cursor_info .get ("id" , 0 )
109+ else :
110+ # No database access, mark as exhausted
111+ self ._cache_exhausted = True
112+ break
113+
114+ except PyMongoError as e :
115+ self ._errors .append ({"error" : str (e ), "type" : type (e ).__name__ })
116+ self ._cache_exhausted = True
117+ raise DatabaseError (f"Error fetching more results: { e } " )
93118
94- elif self ._mongo_cursor is not None :
95- # Fetch more results if needed (legacy mongo cursor support)
96- while len (self ._cached_results ) < count and not self ._cache_exhausted :
97- try :
98- # Iterate through cursor without calling limit() again
99- batch = []
100- for i , doc in enumerate (self ._mongo_cursor ):
101- if i >= self .arraysize :
102- break
103- batch .append (doc )
104-
105- if not batch :
106- self ._cache_exhausted = True
107- break
108-
109- # Process results through projection mapping
110- processed_batch = [self ._process_document (doc ) for doc in batch ]
111- # Convert dictionaries to sequences for DB API 2.0 compliance
112- sequence_batch = [self ._dict_to_sequence (doc ) for doc in processed_batch ]
113- self ._cached_results .extend (sequence_batch )
114- self ._total_fetched += len (batch )
115-
116- except PyMongoError as e :
117- self ._errors .append ({"error" : str (e ), "type" : type (e ).__name__ })
118- raise DatabaseError (f"Error fetching results: { e } " )
119+ # Mark as exhausted if no more results available
120+ if self ._cursor_id == 0 :
121+ self ._cache_exhausted = True
119122
120123 def _process_document (self , doc : Dict [str , Any ]) -> Dict [str , Any ]:
121124 """Process a MongoDB document according to projection mapping"""
@@ -136,7 +139,10 @@ def _process_document(self, doc: Dict[str, Any]) -> Dict[str, Any]:
136139 return processed
137140
138141 def _mongo_to_bracket_key (self , field_path : str ) -> str :
139- """Convert Mongo dot-index notation to bracket notation for display keys.
142+ """Convert Mongo dot-index notation to bracket notation.
143+
144+ Transforms numeric dot segments into bracket indices for both display keys
145+ and JMESPath-compatible field paths.
140146
141147 Examples:
142148 items.0 -> items[0]
@@ -147,15 +153,6 @@ def _mongo_to_bracket_key(self, field_path: str) -> str:
147153 # Replace .<number> with [<number>]
148154 return re .sub (r"\.(\d+)" , r"[\1]" , field_path )
149155
150- def _mongo_to_jmespath (self , field_path : str ) -> str :
151- """Convert Mongo-style field path to JMESPath-compatible path.
152-
153- This mainly transforms numeric dot segments into bracket indices.
154- """
155- if not isinstance (field_path , str ):
156- return field_path
157- return self ._mongo_to_bracket_key (field_path )
158-
159156 def _get_nested_value (self , doc : Dict [str , Any ], field_path : str ) -> Any :
160157 """Extract nested field value from document using JMESPath
161158
@@ -170,16 +167,16 @@ def _get_nested_value(self, doc: Dict[str, Any], field_path: str) -> Any:
170167 if "." not in field_path and "[" not in field_path :
171168 return doc .get (field_path )
172169
173- # Convert normalized Mongo-style numeric segments to JMESPath bracket notation
174- jmes_field = self ._mongo_to_jmespath (field_path )
170+ # Convert normalized Mongo-style numeric segments to bracket notation
171+ normalized_field = self ._mongo_to_bracket_key (field_path )
175172 # Use jmespath for complex paths
176- return jmespath .search (jmes_field , doc )
173+ return jmespath .search (normalized_field , doc )
177174 except Exception as e :
178175 _logger .debug (f"Error extracting field '{ field_path } ': { e } " )
179176 return None
180177
181- def _dict_to_sequence (self , doc : Dict [str , Any ]) -> Tuple [Any , ...]:
182- """Convert document dictionary to sequence according to column order """
178+ def _format_result (self , doc : Dict [str , Any ]) -> Tuple [Any , ...]:
179+ """Format processed document to output format (tuple for DB API 2.0 compliance) """
183180 if self ._column_names is None :
184181 # First time - establish column order
185182 self ._column_names = list (doc .keys ())
@@ -259,33 +256,12 @@ def fetchall(self) -> List[Sequence[Any]]:
259256 all_results = []
260257
261258 try :
262- if self ._command_result is not None :
263- # Handle command result (db.command)
264- if not self ._cache_exhausted :
265- # Results are already processed in constructor, just extend
266- all_results .extend (self ._cached_results )
267- self ._total_fetched += len (self ._cached_results )
268- self ._cache_exhausted = True
269-
270- elif self ._mongo_cursor is not None :
271- # Handle legacy mongo cursor (for backward compatibility)
272- # Add cached results
259+ # Handle command result (db.command)
260+ if not self ._cache_exhausted :
261+ # Results are already processed in constructor, just extend
273262 all_results .extend (self ._cached_results )
274- self ._cached_results .clear ()
275-
276- # Fetch remaining from cursor
277- if not self ._cache_exhausted :
278- # Iterate through all remaining documents in the cursor
279- remaining_docs = list (self ._mongo_cursor )
280- if remaining_docs :
281- # Process results through projection mapping
282- processed_docs = [self ._process_document (doc ) for doc in remaining_docs ]
283- # Convert dictionaries to sequences for DB API 2.0 compliance
284- sequence_docs = [self ._dict_to_sequence (doc ) for doc in processed_docs ]
285- all_results .extend (sequence_docs )
286- self ._total_fetched += len (remaining_docs )
287-
288- self ._cache_exhausted = True
263+ self ._total_fetched += len (self ._cached_results )
264+ self ._cache_exhausted = True
289265
290266 except PyMongoError as e :
291267 self ._errors .append ({"error" : str (e ), "type" : type (e ).__name__ })
@@ -303,17 +279,10 @@ def is_closed(self) -> bool:
def close(self) -> None:
    """Close the result set and release client- and server-side resources.

    Does nothing when the result set is already closed. Otherwise, if the
    cursor id is still non-zero (the server is holding an open cursor) and a
    database handle plus a collection name are available, a ``killCursors``
    command is sent so the server can free the cursor immediately instead of
    waiting for it to expire. That call is best-effort: a failure is logged
    and never propagated. Local state is then dropped: the closed flag is
    set, the cursor id is zeroed, the command-result and database references
    are released and the row cache is emptied.
    """
    if self._is_closed:
        return

    # Flag first so a repeated or re-entrant close() is a no-op.
    self._is_closed = True

    cursor_id = getattr(self, "_cursor_id", 0)
    database = getattr(self, "_database", None)
    collection = getattr(getattr(self, "_execution_plan", None), "collection", None)

    try:
        # Compare with None explicitly: PyMongo Database objects raise
        # NotImplementedError when truth-tested.
        if cursor_id and database is not None and collection:
            database.command({"killCursors": collection, "cursors": [cursor_id]})
    except Exception as exc:  # best-effort cleanup; never fail the caller's close()
        _logger.warning(f"Error closing MongoDB cursor: {exc}")
    finally:
        self._cursor_id = 0
        self._command_result = None
        self._database = None
        self._cached_results.clear()
317286
318287 def __enter__ (self ):
319288 return self
@@ -322,5 +291,13 @@ def __exit__(self, exc_type, exc_val, exc_tb):
322291 self .close ()
323292
324293
class DictResultSet(ResultSet):
    """``ResultSet`` variant whose rows are dictionaries rather than sequences."""

    def _format_result(self, doc: Dict[str, Any]) -> Dict[str, Any]:
        """Return the processed document as-is; the same object is handed back."""
        return doc
300+
301+
325302# For backward compatibility
326303MongoResultSet = ResultSet
0 commit comments