File tree Expand file tree Collapse file tree 1 file changed +4
-1
lines changed 
src/apify/storage_clients/_apify Expand file tree Collapse file tree 1 file changed +4
-1
lines changed Original file line number Diff line number Diff line change @@ -86,7 +86,7 @@ def __init__(
8686        """The number of requests we assume have been handled (tracked manually for this instance).""" 
8787
8888        self ._fetch_lock  =  asyncio .Lock ()
89-         """Fetch lock to minimize race conditions when communicationg  with API.""" 
89+         """Fetch lock to minimize race conditions when communicating  with API.""" 
9090
9191    @override  
9292    async  def  get_metadata (self ) ->  RequestQueueMetadata :
@@ -397,6 +397,7 @@ async def reclaim_request(
397397        if  request .was_already_handled :
398398            request .handled_at  =  None 
399399
400+         # Reclaim with lock to prevent race conditions that could lead to double processing of same the request. 
400401        async  with  self ._fetch_lock :
401402            try :
402403                # Update the request in the API. 
@@ -439,6 +440,8 @@ async def is_empty(self) -> bool:
439440        Returns: 
440441            True if the queue is empty, False otherwise. 
441442        """ 
443+         # Check _list_head and self._queue_has_locked_requests with lock to make sure they are consistent. 
444+         # Without the lock the `is_empty` is prone to falsely report True with some low probability race condition. 
442445        async  with  self ._fetch_lock :
443446            head  =  await  self ._list_head (limit = 1 , lock_time = None )
444447            return  len (head .items ) ==  0  and  not  self ._queue_has_locked_requests 
 
 
   
 
     
   
   
          
    
    
     
    
      
     
     
    You can’t perform that action at this time.
  
 
    
  
    
      
        
     
       
      
     
   
 
    
    
  
 
  
 
     
    
0 commit comments