@@ -1,100 +1,25 @@
 from __future__ import annotations
 
-import asyncio
-import threading
 import traceback
-from concurrent import futures
 from logging import getLogger
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
 
 from scrapy import Spider
 from scrapy.core.scheduler import BaseScheduler
 from scrapy.utils.reactor import is_asyncio_reactor_installed
 
+from ._async_thread import AsyncThread
+from .requests import to_apify_request, to_scrapy_request
 from apify import Configuration
 from apify.apify_storage_client import ApifyStorageClient
-from apify.scrapy.requests import to_apify_request, to_scrapy_request
 from apify.storages import RequestQueue
 
 if TYPE_CHECKING:
-    from collections.abc import Coroutine
-
     from scrapy.http.request import Request
     from twisted.internet.defer import Deferred
 
 logger = getLogger(__name__)
 
-_TIMEOUT = 60
-"""The timeout for waiting on asyncio coroutines to finish."""
-
-
-def _start_event_loop(eventloop: asyncio.AbstractEventLoop) -> None:
-    """Set and run the event loop until it is stopped.
-
-    Args:
-        eventloop: The asyncio event loop to run.
-    """
-    asyncio.set_event_loop(eventloop)
-    try:
-        eventloop.run_forever()
-    finally:
-        eventloop.close()
-        logger.debug('Asyncio event loop has been closed.')
-
-
-def _run_async_coro(eventloop: asyncio.AbstractEventLoop, coro: Coroutine) -> Any:
-    """Run a coroutine on the given loop in our separate thread, waiting for its result.
-
-    Args:
-        eventloop: The asyncio event loop to run the coroutine on.
-        coro: The coroutine to run.
-
-    Returns:
-        The result of the coroutine.
-    """
-    if not eventloop.is_running():
-        logger.warning('Event loop is not running! Ignoring coroutine execution.')
-        return None
-
-    future = asyncio.run_coroutine_threadsafe(coro, eventloop)
-    try:
-        return future.result(timeout=_TIMEOUT)
-    except futures.TimeoutError as exc:
-        logger.exception('Coroutine execution timed out.', exc_info=exc)
-        raise
-    except Exception as exc:
-        logger.exception('Coroutine execution raised an exception.', exc_info=exc)
-        raise
-
-
-async def _shutdown_async_tasks(eventloop: asyncio.AbstractEventLoop) -> None:
-    """Cancel and wait for all pending tasks on the current event loop.
-
-    Args:
-        eventloop: The asyncio event loop to cancel tasks on.
-    """
-    tasks = [task for task in asyncio.all_tasks(eventloop) if task is not asyncio.current_task()]
-    if not tasks:
-        return
-    for task in tasks:
-        task.cancel()
-    await asyncio.gather(*tasks, return_exceptions=True)
-
-
-def _force_exit_event_loop(eventloop: asyncio.AbstractEventLoop, thread: threading.Thread) -> None:
-    """Forcefully shut down the event loop and its thread.
-
-    Args:
-        eventloop: The asyncio event loop to stop.
-        thread: The thread running the event loop.
-    """
-    try:
-        logger.info('Forced shutdown of the event loop and its thread...')
-        eventloop.call_soon_threadsafe(eventloop.stop)
-        thread.join(timeout=5)
-    except Exception as exc:
-        logger.exception('Exception occurred during forced event loop shutdown.', exc_info=exc)
-
 
 class ApifyScheduler(BaseScheduler):
     """A Scrapy scheduler that uses the Apify request queue to manage requests.
@@ -112,10 +37,8 @@ def __init__(self) -> None:
         self._rq: RequestQueue | None = None
         self.spider: Spider | None = None
 
-        # Create a new event loop and run it in a separate thread.
-        self._eventloop = asyncio.new_event_loop()
-        self._thread = threading.Thread(target=lambda: _start_event_loop(self._eventloop), daemon=True)
-        self._thread.start()
+        # A thread with the asyncio event loop to run coroutines on.
+        self._async_thread = AsyncThread()
 
     def open(self, spider: Spider) -> Deferred[None] | None:
         """Open the scheduler.
@@ -133,7 +56,7 @@ async def open_rq() -> RequestQueue:
             return await RequestQueue.open()
 
         try:
-            self._rq = _run_async_coro(self._eventloop, open_rq())
+            self._rq = self._async_thread.run_coro(open_rq())
        except Exception:
             traceback.print_exc()
             raise
@@ -150,20 +73,7 @@ def close(self, reason: str) -> None:
15073 """
15174 logger .debug (f'Closing { self .__class__ .__name__ } due to { reason } ...' )
15275 try :
153- if self ._eventloop .is_running ():
154- # Cancel all pending tasks in the event loop.
155- _run_async_coro (self ._eventloop , _shutdown_async_tasks (self ._eventloop ))
156-
157- # Stop the event loop.
158- self ._eventloop .call_soon_threadsafe (self ._eventloop .stop )
159-
160- # Wait for the event loop thread to exit.
161- self ._thread .join (timeout = _TIMEOUT )
162-
163- # If the thread is still alive, execute a forced shutdown.
164- if self ._thread .is_alive ():
165- logger .warning ('Event loop thread did not exit cleanly! Forcing shutdown...' )
166- _force_exit_event_loop (self ._eventloop , self ._thread )
76+ self ._async_thread .close ()
16777
16878 except KeyboardInterrupt :
16979 logger .warning ('Shutdown interrupted by KeyboardInterrupt!' )
@@ -184,7 +94,7 @@ def has_pending_requests(self) -> bool:
             raise TypeError('self._rq must be an instance of the RequestQueue class')
 
         try:
-            is_finished = _run_async_coro(self._eventloop, self._rq.is_finished())
+            is_finished = self._async_thread.run_coro(self._rq.is_finished())
         except Exception:
             traceback.print_exc()
             raise
@@ -217,7 +127,7 @@ def enqueue_request(self, request: Request) -> bool:
             raise TypeError('self._rq must be an instance of the RequestQueue class')
 
         try:
-            result = _run_async_coro(self._eventloop, self._rq.add_request(apify_request))
+            result = self._async_thread.run_coro(self._rq.add_request(apify_request))
         except Exception:
             traceback.print_exc()
             raise
@@ -236,7 +146,7 @@ def next_request(self) -> Request | None:
             raise TypeError('self._rq must be an instance of the RequestQueue class')
 
         try:
-            apify_request = _run_async_coro(self._eventloop, self._rq.fetch_next_request())
+            apify_request = self._async_thread.run_coro(self._rq.fetch_next_request())
         except Exception:
             traceback.print_exc()
             raise
@@ -248,10 +158,10 @@ def next_request(self) -> Request | None:
         if not isinstance(self.spider, Spider):
             raise TypeError('self.spider must be an instance of the Spider class')
 
-        # Let the request queue know that the request is being handled. Every request should be marked as handled,
-        # retrying is handled by the Scrapy's RetryMiddleware.
+        # Let the request queue know that the request is being handled. Every request should
+        # be marked as handled, retrying is handled by the Scrapy's RetryMiddleware.
         try:
-            _run_async_coro(self._eventloop, self._rq.mark_request_as_handled(apify_request))
+            self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request))
         except Exception:
             traceback.print_exc()
             raise
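
The new AsyncThread helper imported from ._async_thread is not part of this diff. For orientation only, here is a minimal sketch of a class with the run_coro()/close() interface the scheduler now calls, pieced together from the removed helpers above; the method bodies, the daemon-thread choice, and the 60-second default timeout are assumptions, not the actual implementation.

# Hypothetical sketch only: the real AsyncThread lives in apify/scrapy/_async_thread.py
# and may differ in details such as timeouts and error handling.
from __future__ import annotations

import asyncio
import threading
from collections.abc import Coroutine
from typing import Any


class AsyncThread:
    """Run an asyncio event loop in a daemon thread and execute coroutines on it."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run_coro(self, coro: Coroutine, timeout: float = 60) -> Any:
        # Submit the coroutine to the loop thread and block until it produces a result.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout=timeout)

    def close(self, timeout: float = 60) -> None:
        # Cancel pending tasks on the loop, stop it, and wait for the thread to exit.
        self.run_coro(self._cancel_pending(), timeout=timeout)
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=timeout)

    async def _cancel_pending(self) -> None:
        # Runs on the loop thread via run_coro, so current_task() is this coroutine's task.
        tasks = [t for t in asyncio.all_tasks(self._loop) if t is not asyncio.current_task()]
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

Wrapping the event loop this way keeps Scrapy's synchronous scheduler API intact while all RequestQueue calls stay asynchronous on a single background loop.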