Skip to content

Commit dd2476f

Browse files
authored
Refactor into separate Work module (#977)
* work module * Fix imports * String based typing for multiprocessing.synchronize * Fix `test_accepts_client_from_server_socket` * Move staticmethod outside of threadless pool class * Fix doc build * Fix test mock * mp grouped together * pylint happy * import only for type checking * doc build * wrong import order
1 parent f0d39eb commit dd2476f

File tree

17 files changed

+148
-92
lines changed

17 files changed

+148
-92
lines changed

docs/conf.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,9 @@
296296
(_py_class_role, 'proxy.plugin.cache.store.base.CacheStore'),
297297
(_py_class_role, 'proxy.core.pool.AcceptorPool'),
298298
(_py_class_role, 'proxy.core.executors.ThreadlessPool'),
299-
(_py_class_role, 'proxy.core.acceptor.threadless.T'),
300-
(_py_class_role, 'proxy.core.acceptor.work.T'),
299+
(_py_class_role, 'proxy.core.work.threadless.T'),
300+
(_py_class_role, 'proxy.core.work.work.T'),
301+
(_py_class_role, 'proxy.core.acceptor.threadless.Threadless'),
301302
(_py_class_role, 'queue.Queue[Any]'),
302303
(_py_class_role, 'SelectableEvents'),
303304
(_py_class_role, 'TcpClientConnection'),
@@ -309,6 +310,8 @@
309310
(_py_class_role, 'Url'),
310311
(_py_class_role, 'WebsocketFrame'),
311312
(_py_class_role, 'Work'),
312-
(_py_obj_role, 'proxy.core.acceptor.threadless.T'),
313-
(_py_obj_role, 'proxy.core.acceptor.work.T'),
313+
(_py_class_role, 'proxy.core.acceptor.work.Work'),
314+
(_py_class_role, 'connection.Connection'),
315+
(_py_obj_role, 'proxy.core.work.threadless.T'),
316+
(_py_obj_role, 'proxy.core.work.work.T'),
314317
]

examples/web_scraper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
from proxy import Proxy
1414
from proxy.common.types import Readables, Writables, SelectableEvents
15-
from proxy.core.acceptor import Work
15+
from proxy.core.work import Work
1616
from proxy.core.connection import TcpClientConnection
1717

1818

proxy/core/acceptor/__init__.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,12 @@
1212
1313
pre
1414
"""
15+
from .listener import Listener
1516
from .acceptor import Acceptor
1617
from .pool import AcceptorPool
17-
from .work import Work
18-
from .threadless import Threadless
19-
from .remote import RemoteExecutor
20-
from .local import LocalExecutor
21-
from .executors import ThreadlessPool
22-
from .listener import Listener
2318

2419
__all__ = [
20+
'Listener',
2521
'Acceptor',
2622
'AcceptorPool',
27-
'Work',
28-
'Threadless',
29-
'RemoteExecutor',
30-
'LocalExecutor',
31-
'ThreadlessPool',
32-
'Listener',
3323
]

proxy/core/acceptor/acceptor.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@
3232

3333
from ..event import EventQueue
3434

35-
from .local import LocalExecutor
36-
from .executors import ThreadlessPool
35+
from ..work import LocalExecutor, delegate_work_to_pool, start_threaded_work
3736

3837
logger = logging.getLogger(__name__)
3938

@@ -72,11 +71,11 @@ def __init__(
7271
idd: int,
7372
fd_queue: connection.Connection,
7473
flags: argparse.Namespace,
75-
lock: multiprocessing.synchronize.Lock,
74+
lock: 'multiprocessing.synchronize.Lock',
7675
# semaphore: multiprocessing.synchronize.Semaphore,
7776
executor_queues: List[connection.Connection],
7877
executor_pids: List[int],
79-
executor_locks: List[multiprocessing.synchronize.Lock],
78+
executor_locks: List['multiprocessing.synchronize.Lock'],
8079
event_queue: Optional[EventQueue] = None,
8180
) -> None:
8281
super().__init__()
@@ -214,7 +213,7 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None:
214213
# 1st workers. To randomize, we offset index by idd.
215214
index = (self._total + self.idd) % self.flags.num_workers
216215
thread = threading.Thread(
217-
target=ThreadlessPool.delegate,
216+
target=delegate_work_to_pool,
218217
args=(
219218
self.executor_pids[index],
220219
self.executor_queues[index],
@@ -231,7 +230,7 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None:
231230
),
232231
)
233232
else:
234-
_, thread = ThreadlessPool.start_threaded_work(
233+
_, thread = start_threaded_work(
235234
self.flags,
236235
conn,
237236
addr,

proxy/core/acceptor/pool.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class AcceptorPool:
5959
while True:
6060
time.sleep(1)
6161
62-
`flags.work_klass` must implement `work.Work` class.
62+
`flags.work_klass` must implement :py:class:`~proxy.core.work.Work` class.
6363
"""
6464

6565
def __init__(
@@ -68,7 +68,7 @@ def __init__(
6868
listener: Listener,
6969
executor_queues: List[connection.Connection],
7070
executor_pids: List[int],
71-
executor_locks: List[multiprocessing.synchronize.Lock],
71+
executor_locks: List['multiprocessing.synchronize.Lock'],
7272
event_queue: Optional[EventQueue] = None,
7373
) -> None:
7474
self.flags = flags
@@ -77,7 +77,7 @@ def __init__(
7777
# Available executors
7878
self.executor_queues: List[connection.Connection] = executor_queues
7979
self.executor_pids: List[int] = executor_pids
80-
self.executor_locks: List[multiprocessing.synchronize.Lock] = executor_locks
80+
self.executor_locks: List['multiprocessing.synchronize.Lock'] = executor_locks
8181
# Eventing core queue
8282
self.event_queue: Optional[EventQueue] = event_queue
8383
# Acceptor process instances

proxy/core/base/tcp_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from abc import abstractmethod
1919
from typing import Any, Optional
2020

21-
from ...core.acceptor import Work
21+
from ...core.work import Work
2222
from ...core.connection import TcpClientConnection
2323
from ...common.types import Readables, SelectableEvents, Writables
2424

proxy/core/connection/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from ...common.flag import flags
2222
from ...common.types import Readables, SelectableEvents, Writables
2323

24-
from ..acceptor.work import Work
24+
from ..work import Work
2525

2626
from .server import TcpServerConnection
2727

proxy/core/work/__init__.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
11+
.. spelling::
12+
13+
pre
14+
"""
15+
from .work import Work
16+
from .threadless import Threadless
17+
from .remote import RemoteExecutor
18+
from .local import LocalExecutor
19+
from .pool import ThreadlessPool
20+
from .delegate import delegate_work_to_pool
21+
from .threaded import start_threaded_work
22+
23+
__all__ = [
24+
'Work',
25+
'Threadless',
26+
'RemoteExecutor',
27+
'LocalExecutor',
28+
'ThreadlessPool',
29+
'delegate_work_to_pool',
30+
'start_threaded_work',
31+
]

proxy/core/work/delegate.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
from typing import TYPE_CHECKING, Optional, Tuple
12+
from multiprocessing.reduction import send_handle
13+
14+
if TYPE_CHECKING:
15+
import socket
16+
import multiprocessing
17+
from multiprocessing import connection
18+
19+
20+
def delegate_work_to_pool(
21+
worker_pid: int,
22+
work_queue: 'connection.Connection',
23+
work_lock: 'multiprocessing.synchronize.Lock',
24+
conn: 'socket.socket',
25+
addr: Optional[Tuple[str, int]],
26+
unix_socket_path: Optional[str] = None,
27+
) -> None:
28+
"""Utility method to delegate a work to threadless executor pool."""
29+
with work_lock:
30+
# Accepted client address is empty string for
31+
# unix socket domain, avoid sending empty string
32+
# for optimization.
33+
if not unix_socket_path:
34+
work_queue.send(addr)
35+
send_handle(
36+
work_queue,
37+
conn.fileno(),
38+
worker_pid,
39+
)
40+
conn.close()
File renamed without changes.

0 commit comments

Comments
 (0)