Skip to content

Commit 17f2547

Browse files
Fix #2227 async implementation
1 parent 74ad422 commit 17f2547

File tree

1 file changed

+22
-0
lines changed

1 file changed

+22
-0
lines changed

src/snowflake/connector/aio/_result_set.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from snowflake.connector.result_set import ResultSet as ResultSetSync
3232

3333
from .. import NotSupportedError
34+
from ..errors import Error
3435
from ..options import pyarrow as pa
3536
from ..result_batch import DownloadMetrics
3637
from ..telemetry import TelemetryField
@@ -55,6 +56,7 @@ def __init__(
5556
**kw: Any,
5657
) -> None:
5758
self._is_fetch_all = kw.pop("is_fetch_all", False)
59+
self._cursor = kw.pop("cursor", None)
5860
self._first_batch_iter = first_batch_iter
5961
self._unfetched_batches = unfetched_batches
6062
self._final = final
@@ -75,12 +77,31 @@ async def _download_batch_and_convert_to_list(self, result_batch):
7577

7678
async def fetch_all_data(self):
7779
rets = list(self._first_batch_iter)
80+
# Check for exceptions in the first batch
81+
connection = self._kw.get("connection")
82+
83+
for item in rets:
84+
if isinstance(item, Exception):
85+
Error.errorhandler_wrapper_from_ready_exception(
86+
connection,
87+
self._cursor,
88+
item,
89+
)
90+
7891
tasks = [
7992
self._download_batch_and_convert_to_list(result_batch)
8093
for result_batch in self._unfetched_batches
8194
]
8295
batches = await asyncio.gather(*tasks)
8396
for batch in batches:
97+
# Check for exceptions in each batch before extending
98+
for item in batch:
99+
if isinstance(item, Exception):
100+
Error.errorhandler_wrapper_from_ready_exception(
101+
connection,
102+
self._cursor,
103+
item,
104+
)
84105
rets.extend(batch)
85106
# yield to avoid blocking the event loop for too long when processing large result sets
86107
# await asyncio.sleep(0)
@@ -195,6 +216,7 @@ async def _create_iter(
195216
unfetched_batches,
196217
self._finish_iterating,
197218
self.prefetch_thread_num,
219+
cursor=self._cursor,
198220
is_fetch_all=is_fetch_all,
199221
**kwargs,
200222
)

0 commit comments

Comments
 (0)