11
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
- from typing import Generic , List , TypeVar
14
+ from typing import Generic , Hashable , List , Set , TypeVar
15
15
16
- T = TypeVar ("T" )
16
+ T = TypeVar ("T" , bound = Hashable )
17
17
18
18
19
19
class _Entry (Generic [T ]):
20
20
__slots__ = ["end_key" , "queue" ]
21
21
22
22
def __init__ (self , end_key : int ) -> None :
23
23
self .end_key : int = end_key
24
- self .queue : List [T ] = []
24
+
25
+ # We use a set here as otherwise we can end up with a lot of duplicate
26
+ # entries.
27
+ self .queue : Set [T ] = set ()
25
28
26
29
27
30
class WheelTimer (Generic [T ]):
@@ -55,7 +58,7 @@ def insert(self, now: int, obj: T, then: int) -> None:
55
58
56
59
if then_key <= max_key :
57
60
# The max here is to protect against inserts for times in the past
58
- self .entries [max (min_key , then_key ) - min_key ].queue .append (obj )
61
+ self .entries [max (min_key , then_key ) - min_key ].queue .add (obj )
59
62
return
60
63
61
64
next_key = int (now / self .bucket_size ) + 1
@@ -71,7 +74,7 @@ def insert(self, now: int, obj: T, then: int) -> None:
71
74
# to insert. This ensures there are no gaps.
72
75
self .entries .extend (_Entry (key ) for key in range (last_key , then_key + 1 ))
73
76
74
- self .entries [- 1 ].queue .append (obj )
77
+ self .entries [- 1 ].queue .add (obj )
75
78
76
79
def fetch (self , now : int ) -> List [T ]:
77
80
"""Fetch any objects that have timed out
@@ -84,7 +87,7 @@ def fetch(self, now: int) -> List[T]:
84
87
"""
85
88
now_key = int (now / self .bucket_size )
86
89
87
- ret = []
90
+ ret : List [ T ] = []
88
91
while self .entries and self .entries [0 ].end_key <= now_key :
89
92
ret .extend (self .entries .pop (0 ).queue )
90
93
0 commit comments