Skip to content
11 changes: 7 additions & 4 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,9 @@
(_py_class_role, 'proxy.plugin.cache.store.base.CacheStore'),
(_py_class_role, 'proxy.core.pool.AcceptorPool'),
(_py_class_role, 'proxy.core.executors.ThreadlessPool'),
(_py_class_role, 'proxy.core.acceptor.threadless.T'),
(_py_class_role, 'proxy.core.acceptor.work.T'),
(_py_class_role, 'proxy.core.work.threadless.T'),
(_py_class_role, 'proxy.core.work.work.T'),
(_py_class_role, 'proxy.core.acceptor.threadless.Threadless'),
(_py_class_role, 'queue.Queue[Any]'),
(_py_class_role, 'SelectableEvents'),
(_py_class_role, 'TcpClientConnection'),
Expand All @@ -309,6 +310,8 @@
(_py_class_role, 'Url'),
(_py_class_role, 'WebsocketFrame'),
(_py_class_role, 'Work'),
(_py_obj_role, 'proxy.core.acceptor.threadless.T'),
(_py_obj_role, 'proxy.core.acceptor.work.T'),
(_py_class_role, 'proxy.core.acceptor.work.Work'),
(_py_class_role, 'connection.Connection'),
(_py_obj_role, 'proxy.core.work.threadless.T'),
(_py_obj_role, 'proxy.core.work.work.T'),
]
2 changes: 1 addition & 1 deletion examples/web_scraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from proxy import Proxy
from proxy.common.types import Readables, Writables, SelectableEvents
from proxy.core.acceptor import Work
from proxy.core.work import Work
from proxy.core.connection import TcpClientConnection


Expand Down
14 changes: 2 additions & 12 deletions proxy/core/acceptor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,12 @@

pre
"""
from .listener import Listener
from .acceptor import Acceptor
from .pool import AcceptorPool
from .work import Work
from .threadless import Threadless
from .remote import RemoteExecutor
from .local import LocalExecutor
from .executors import ThreadlessPool
from .listener import Listener

__all__ = [
'Listener',
'Acceptor',
'AcceptorPool',
'Work',
'Threadless',
'RemoteExecutor',
'LocalExecutor',
'ThreadlessPool',
'Listener',
]
11 changes: 5 additions & 6 deletions proxy/core/acceptor/acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@

from ..event import EventQueue

from .local import LocalExecutor
from .executors import ThreadlessPool
from ..work import LocalExecutor, delegate_work_to_pool, start_threaded_work

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -72,11 +71,11 @@ def __init__(
idd: int,
fd_queue: connection.Connection,
flags: argparse.Namespace,
lock: multiprocessing.synchronize.Lock,
lock: 'multiprocessing.synchronize.Lock',
# semaphore: multiprocessing.synchronize.Semaphore,
executor_queues: List[connection.Connection],
executor_pids: List[int],
executor_locks: List[multiprocessing.synchronize.Lock],
executor_locks: List['multiprocessing.synchronize.Lock'],
event_queue: Optional[EventQueue] = None,
) -> None:
super().__init__()
Expand Down Expand Up @@ -214,7 +213,7 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None:
# 1st workers. To randomize, we offset index by idd.
index = (self._total + self.idd) % self.flags.num_workers
thread = threading.Thread(
target=ThreadlessPool.delegate,
target=delegate_work_to_pool,
args=(
self.executor_pids[index],
self.executor_queues[index],
Expand All @@ -231,7 +230,7 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None:
),
)
else:
_, thread = ThreadlessPool.start_threaded_work(
_, thread = start_threaded_work(
self.flags,
conn,
addr,
Expand Down
6 changes: 3 additions & 3 deletions proxy/core/acceptor/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class AcceptorPool:
while True:
time.sleep(1)

`flags.work_klass` must implement `work.Work` class.
`flags.work_klass` must implement :py:class:`~proxy.core.work.Work` class.
"""

def __init__(
Expand All @@ -68,7 +68,7 @@ def __init__(
listener: Listener,
executor_queues: List[connection.Connection],
executor_pids: List[int],
executor_locks: List[multiprocessing.synchronize.Lock],
executor_locks: List['multiprocessing.synchronize.Lock'],
event_queue: Optional[EventQueue] = None,
) -> None:
self.flags = flags
Expand All @@ -77,7 +77,7 @@ def __init__(
# Available executors
self.executor_queues: List[connection.Connection] = executor_queues
self.executor_pids: List[int] = executor_pids
self.executor_locks: List[multiprocessing.synchronize.Lock] = executor_locks
self.executor_locks: List['multiprocessing.synchronize.Lock'] = executor_locks
# Eventing core queue
self.event_queue: Optional[EventQueue] = event_queue
# Acceptor process instances
Expand Down
2 changes: 1 addition & 1 deletion proxy/core/base/tcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from abc import abstractmethod
from typing import Any, Optional

from ...core.acceptor import Work
from ...core.work import Work
from ...core.connection import TcpClientConnection
from ...common.types import Readables, SelectableEvents, Writables

Expand Down
2 changes: 1 addition & 1 deletion proxy/core/connection/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from ...common.flag import flags
from ...common.types import Readables, SelectableEvents, Writables

from ..acceptor.work import Work
from ..work import Work

from .server import TcpServerConnection

Expand Down
31 changes: 31 additions & 0 deletions proxy/core/work/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.

.. spelling::

pre
"""
from .work import Work
from .threadless import Threadless
from .remote import RemoteExecutor
from .local import LocalExecutor
from .pool import ThreadlessPool
from .delegate import delegate_work_to_pool
from .threaded import start_threaded_work

__all__ = [
'Work',
'Threadless',
'RemoteExecutor',
'LocalExecutor',
'ThreadlessPool',
'delegate_work_to_pool',
'start_threaded_work',
]
40 changes: 40 additions & 0 deletions proxy/core/work/delegate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
from typing import TYPE_CHECKING, Optional, Tuple
from multiprocessing.reduction import send_handle

if TYPE_CHECKING:
import socket
import multiprocessing
from multiprocessing import connection


def delegate_work_to_pool(
worker_pid: int,
work_queue: 'connection.Connection',
work_lock: 'multiprocessing.synchronize.Lock',
conn: 'socket.socket',
addr: Optional[Tuple[str, int]],
unix_socket_path: Optional[str] = None,
) -> None:
"""Utility method to delegate a work to threadless executor pool."""
with work_lock:
# Accepted client address is empty string for
# unix socket domain, avoid sending empty string
# for optimization.
if not unix_socket_path:
work_queue.send(addr)
send_handle(
work_queue,
conn.fileno(),
worker_pid,
)
conn.close()
File renamed without changes.
64 changes: 3 additions & 61 deletions proxy/core/acceptor/executors.py → proxy/core/work/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,17 @@
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import socket
import logging
import argparse
import threading
import multiprocessing

from multiprocessing import connection
from multiprocessing.reduction import send_handle

from typing import Any, Optional, List, Tuple
from typing import Any, Optional, List

from .work import Work
from .remote import RemoteExecutor

from ..connection import TcpClientConnection
from ..event import EventQueue, eventNames
from ..event import EventQueue

from ...common.flag import flags
from ...common.constants import DEFAULT_NUM_WORKERS, DEFAULT_THREADLESS
Expand Down Expand Up @@ -83,7 +78,7 @@ def __init__(
# Threadless worker communication states
self.work_queues: List[connection.Connection] = []
self.work_pids: List[int] = []
self.work_locks: List[multiprocessing.synchronize.Lock] = []
self.work_locks: List['multiprocessing.synchronize.Lock'] = []
# List of threadless workers
self._workers: List[RemoteExecutor] = []
self._processes: List[multiprocessing.Process] = []
Expand All @@ -95,59 +90,6 @@ def __enter__(self) -> 'ThreadlessPool':
def __exit__(self, *args: Any) -> None:
self.shutdown()

@staticmethod
def delegate(
worker_pid: int,
work_queue: connection.Connection,
work_lock: multiprocessing.synchronize.Lock,
conn: socket.socket,
addr: Optional[Tuple[str, int]],
unix_socket_path: Optional[str] = None,
) -> None:
"""Utility method to delegate a work to threadless executor pool."""
with work_lock:
# Accepted client address is empty string for
# unix socket domain, avoid sending empty string
# for optimization.
if not unix_socket_path:
work_queue.send(addr)
send_handle(
work_queue,
conn.fileno(),
worker_pid,
)
conn.close()

@staticmethod
def start_threaded_work(
flags: argparse.Namespace,
conn: socket.socket,
addr: Optional[Tuple[str, int]],
event_queue: Optional[EventQueue] = None,
publisher_id: Optional[str] = None,
) -> Tuple[Work[TcpClientConnection], threading.Thread]:
"""Utility method to start a work in a new thread."""
work = flags.work_klass(
TcpClientConnection(conn, addr),
flags=flags,
event_queue=event_queue,
upstream_conn_pool=None,
)
# TODO: Keep reference to threads and join during shutdown.
# This will ensure connections are not abruptly closed on shutdown
# for threaded execution mode.
thread = threading.Thread(target=work.run)
thread.daemon = True
thread.start()
work.publish_event(
event_name=eventNames.WORK_STARTED,
event_payload={'fileno': conn.fileno(), 'addr': addr},
publisher_id=publisher_id or 'thread#{0}'.format(
thread.ident,
),
)
return (work, thread)

def setup(self) -> None:
"""Setup threadless processes."""
if self.flags.threadless:
Expand Down
File renamed without changes.
49 changes: 49 additions & 0 deletions proxy/core/work/threaded.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import socket
import argparse
import threading
from typing import Optional, Tuple

from .work import Work

from ..connection import TcpClientConnection
from ..event import EventQueue, eventNames


def start_threaded_work(
flags: argparse.Namespace,
conn: socket.socket,
addr: Optional[Tuple[str, int]],
event_queue: Optional[EventQueue] = None,
publisher_id: Optional[str] = None,
) -> Tuple[Work[TcpClientConnection], threading.Thread]:
"""Utility method to start a work in a new thread."""
work = flags.work_klass(
TcpClientConnection(conn, addr),
flags=flags,
event_queue=event_queue,
upstream_conn_pool=None,
)
# TODO: Keep reference to threads and join during shutdown.
# This will ensure connections are not abruptly closed on shutdown
# for threaded execution mode.
thread = threading.Thread(target=work.run)
thread.daemon = True
thread.start()
work.publish_event(
event_name=eventNames.WORK_STARTED,
event_payload={'fileno': conn.fileno(), 'addr': addr},
publisher_id=publisher_id or 'thread#{0}'.format(
thread.ident,
),
)
return (work, thread)
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ async def _selected_events(self) -> Tuple[
Returned boolean value indicates whether there is
a newly accepted work waiting to be received and
queued for processing. This is only applicable when
:class:`~proxy.core.acceptor.threadless.Threadless.work_queue_fileno`
:class:`~proxy.core.work.threadless.Threadless.work_queue_fileno`
returns a valid fd.
"""
assert self.selector is not None
Expand Down
File renamed without changes.
4 changes: 3 additions & 1 deletion proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

from typing import List, Optional, Any

from .core.acceptor import AcceptorPool, ThreadlessPool, Listener
from .core.work import ThreadlessPool
from .core.event import EventManager
from .core.acceptor import AcceptorPool, Listener

from .common.utils import bytes_
from .common.flag import FlagParser, flags
from .common.constants import DEFAULT_LOCAL_EXECUTOR, DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_LOG_LEVEL, IS_WINDOWS
Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_continues_when_no_events(
sock.accept.assert_not_called()
self.flags.work_klass.assert_not_called()

@mock.patch('proxy.core.acceptor.executors.TcpClientConnection')
@mock.patch('proxy.core.work.threaded.TcpClientConnection')
@mock.patch('threading.Thread')
@mock.patch('selectors.DefaultSelector')
@mock.patch('socket.fromfd')
Expand Down