Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
ae45464
PYTHON-5517 Updates to connection pool backoff
blink1073 Oct 22, 2025
a4dd0f1
wip add tests
blink1073 Oct 23, 2025
25ab418
update tests
blink1073 Oct 23, 2025
58602c7
update sdam tests
blink1073 Oct 24, 2025
d3a4958
wip update tests
blink1073 Oct 24, 2025
1895e00
Merge branch 'backpressure' of github.com:mongodb/mongo-python-driver…
blink1073 Oct 24, 2025
76c4ee6
Revert "Merge branch 'backpressure' of github.com:mongodb/mongo-pytho…
blink1073 Oct 24, 2025
546976d
Revert "wip update tests"
blink1073 Oct 24, 2025
e52ecdf
wip update tests
blink1073 Oct 24, 2025
5ef7656
update to branch
blink1073 Oct 24, 2025
6d8369f
wip
blink1073 Oct 24, 2025
24542c9
fix backoff logic
blink1073 Oct 24, 2025
5e64aa9
fix race condition
blink1073 Oct 24, 2025
f67195e
update to use durationms
blink1073 Oct 24, 2025
873d1f1
add test that transitions from backoff to clear
blink1073 Oct 24, 2025
02aec91
clean up the tests
blink1073 Oct 24, 2025
73ff3d6
update logging test
blink1073 Oct 24, 2025
c70b66c
fix typing
blink1073 Oct 24, 2025
f20cc0a
add final test
blink1073 Oct 25, 2025
e905b9b
fix ready condition
blink1073 Oct 25, 2025
d228f08
wip incorporate design changes
blink1073 Oct 27, 2025
73e78b6
update tests
blink1073 Oct 28, 2025
adc1375
PYTHON-5627 - Update feedback link (#2601)
NoahStapp Oct 24, 2025
d6d43e7
fix tests
blink1073 Oct 28, 2025
36d4490
fix tests
blink1073 Oct 28, 2025
f936b1b
update backoff logic and fix test
blink1073 Oct 28, 2025
714bc31
fix test
blink1073 Oct 28, 2025
2748749
address failure
blink1073 Oct 28, 2025
2c3c9ad
revert changes to lb test
blink1073 Oct 28, 2025
84f3b68
more test cleanup
blink1073 Oct 28, 2025
ca6c981
more test cleanup
blink1073 Oct 28, 2025
94bc9a3
fix load balancer test
blink1073 Oct 28, 2025
85d2a6b
fix load balancer test
blink1073 Oct 28, 2025
c75ea23
clean up tests
blink1073 Oct 29, 2025
b2b4507
try pypy 3.11
blink1073 Oct 29, 2025
7411be6
add logic for multiple pending connections
blink1073 Oct 29, 2025
c2c8d40
fix race condition in tests
blink1073 Oct 29, 2025
d0aa7c7
undo change to flaky condition
blink1073 Oct 29, 2025
65eb2dc
fix test format
blink1073 Oct 30, 2025
063238d
update schema version
blink1073 Oct 30, 2025
0e9c29a
formatting
blink1073 Oct 30, 2025
a2aec86
update tests
blink1073 Oct 30, 2025
fd63597
fix supported schema version
blink1073 Oct 30, 2025
09647f0
address review
blink1073 Oct 31, 2025
1e6973b
add error label check and add runOnRequirement
blink1073 Oct 31, 2025
d72f279
update retry behavior and tests
blink1073 Nov 3, 2025
476c373
add connection-logging-pool-backoff tests
blink1073 Nov 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip incorporate design changes
  • Loading branch information
blink1073 committed Oct 27, 2025
commit d228f08eeafadf6749f07c97def316970e807ad6
47 changes: 23 additions & 24 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,8 +820,7 @@ def __init__(
async def ready(self) -> None:
# Take the lock to avoid the race condition described in PYTHON-2699.
async with self.lock:
# Do not set the pool as ready if in backoff.
if self._backoff:
if self.state == PoolState.BACKOFF:
return
if self.state != PoolState.READY:
self.state = PoolState.READY
Expand All @@ -847,14 +846,11 @@ async def _reset(
pause: bool = True,
service_id: Optional[ObjectId] = None,
interrupt_connections: bool = False,
from_server_description: bool = False,
) -> None:
old_state = self.state
async with self.size_cond:
if self.closed:
return
if from_server_description and self.state == PoolState.BACKOFF:
return
# Clear the backoff amount.
self._backoff = 0
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
Expand Down Expand Up @@ -954,16 +950,12 @@ async def update_is_writable(self, is_writable: Optional[bool]) -> None:
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]

async def reset(
self,
service_id: Optional[ObjectId] = None,
interrupt_connections: bool = False,
from_server_description: bool = False,
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
) -> None:
await self._reset(
close=False,
service_id=service_id,
interrupt_connections=interrupt_connections,
from_server_description=from_server_description,
)

async def reset_without_pause(self) -> None:
Expand Down Expand Up @@ -1044,27 +1036,30 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
self.requests -= 1
self.size_cond.notify()

def _handle_connection_error(self, error: BaseException, phase: str) -> None:
async def _handle_connection_error(self, error: BaseException, phase: str) -> None:
# Handle system overload condition for non-sdam pools.
# Look for an AutoReconnect or NetworkTimeout error.
# If found, set backoff and add error labels.
if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout):
return
error._add_error_label("SystemOverloadedError") # type:ignore[attr-defined]
error._add_error_label("RetryableError") # type:ignore[attr-defined]
self.backoff()
await self.backoff()

def backoff(self) -> None:
async def backoff(self) -> None:
"""Set/increase backoff mode."""
self._backoff += 1
backoff_duration_sec = _backoff(self._backoff)
backoff_duration_ms = int(backoff_duration_sec * 1000)
if self.state != PoolState.BACKOFF:
self.state = PoolState.BACKOFF
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_backoff(self.address, backoff_duration_ms)
self._backoff_connection_time = backoff_duration_sec + time.monotonic()
async with self.lock:
self._backoff += 1
backoff_duration_sec = _backoff(self._backoff)
backoff_duration_ms = int(backoff_duration_sec * 1000)
if self.state != PoolState.BACKOFF:
self.state = PoolState.BACKOFF
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_backoff(
self.address, self._backoff, backoff_duration_ms
)
self._backoff_connection_time = backoff_duration_sec + time.monotonic()

# Log the pool backoff message.
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
Expand All @@ -1074,6 +1069,7 @@ def backoff(self) -> None:
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
attempt=self._backoff,
durationMS=backoff_duration_ms,
reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF),
error=ConnectionClosedReason.POOL_BACKOFF,
Expand Down Expand Up @@ -1136,7 +1132,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
error=ConnectionClosedReason.ERROR,
)
if context["has_created_socket"]:
self._handle_connection_error(error, "handshake")
await self._handle_connection_error(error, "handshake")
if isinstance(error, (IOError, OSError, *SSLErrors)):
details = _get_timeout_details(self.opts)
_raise_connection_failure(self.address, error, timeout_details=details)
Expand All @@ -1148,9 +1144,11 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
self.active_contexts.discard(tmp_context)
if tmp_context.cancelled:
conn.cancel_context.cancel()
has_completed_hello = False
try:
if not self.is_sdam:
await conn.hello()
has_completed_hello = True
self.is_writable = conn.is_writable
if handler:
handler.contribute_socket(conn, completed_handshake=False)
Expand All @@ -1160,7 +1158,8 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
except BaseException as e:
async with self.lock:
self.active_contexts.discard(conn.cancel_context)
self._handle_connection_error(e, "hello")
if not has_completed_hello:
await self._handle_connection_error(e, "hello")
await conn.close_conn(ConnectionClosedReason.ERROR)
raise

Expand Down
7 changes: 3 additions & 4 deletions pymongo/asynchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
_SDAMStatusMessage,
_ServerSelectionStatusMessage,
)
from pymongo.pool import PoolState
from pymongo.pool_options import PoolOptions
from pymongo.server_description import ServerDescription
from pymongo.server_selectors import (
Expand Down Expand Up @@ -485,7 +486,7 @@ async def _process_change(
server_description.is_server_type_known and new_td.topology_type == TOPOLOGY_TYPE.Single
):
server = self._servers.get(server_description.address)
if server:
if server and server.pool.state != PoolState.BACKOFF:
await server.pool.ready()

suppress_event = sd_old == server_description
Expand Down Expand Up @@ -555,9 +556,7 @@ async def on_change(
if reset_pool:
server = self._servers.get(server_description.address)
if server:
await server.pool.reset(
interrupt_connections=interrupt_connections, from_server_description=True
)
await server.pool.reset(interrupt_connections=interrupt_connections)

async def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None:
"""Process a new seedlist on an opened topology.
Expand Down
15 changes: 11 additions & 4 deletions pymongo/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,17 +931,24 @@ class PoolBackoffEvent(_PoolEvent):

:param address: The address (host, port) pair of the server this Pool is
attempting to connect to.
:param attempt: The backoff attempt.
:param duration_ms: The backoff duration in ms.

.. versionadded:: 4.16
"""

__slots__ = ("__duration_ms",)
__slots__ = ("__attempt", "__duration_ms")

def __init__(self, address: _Address, duration_ms: int) -> None:
def __init__(self, address: _Address, attempt: int, duration_ms: int) -> None:
super().__init__(address)
self.__attempt = attempt
self.__duration_ms = duration_ms

@property
def attempt(self) -> int:
"""The backoff attempt."""
return self.__attempt

@property
def duration_ms(self) -> int:
"""The backoff duration in ms."""
Expand Down Expand Up @@ -1864,9 +1871,9 @@ def publish_pool_closed(self, address: _Address) -> None:
except Exception:
_handle_exception()

def publish_pool_backoff(self, address: _Address, attempt: int) -> None:
def publish_pool_backoff(self, address: _Address, attempt: int, duration_ms: int) -> None:
"""Publish a :class:`PoolBackoffEvent` to all pool listeners."""
event = PoolBackoffEvent(address, attempt)
event = PoolBackoffEvent(address, attempt, duration_ms)
for subscriber in self.__cmap_listeners:
try:
subscriber.pool_backoff(event)
Expand Down
39 changes: 19 additions & 20 deletions pymongo/synchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,8 +818,7 @@ def __init__(
def ready(self) -> None:
# Take the lock to avoid the race condition described in PYTHON-2699.
with self.lock:
# Do not set the pool as ready if in backoff.
if self._backoff:
if self.state == PoolState.BACKOFF:
return
if self.state != PoolState.READY:
self.state = PoolState.READY
Expand All @@ -845,14 +844,11 @@ def _reset(
pause: bool = True,
service_id: Optional[ObjectId] = None,
interrupt_connections: bool = False,
from_server_description: bool = False,
) -> None:
old_state = self.state
with self.size_cond:
if self.closed:
return
if from_server_description and self.state == PoolState.BACKOFF:
return
# Clear the backoff amount.
self._backoff = 0
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
Expand Down Expand Up @@ -952,16 +948,12 @@ def update_is_writable(self, is_writable: Optional[bool]) -> None:
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]

def reset(
self,
service_id: Optional[ObjectId] = None,
interrupt_connections: bool = False,
from_server_description: bool = False,
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
) -> None:
self._reset(
close=False,
service_id=service_id,
interrupt_connections=interrupt_connections,
from_server_description=from_server_description,
)

def reset_without_pause(self) -> None:
Expand Down Expand Up @@ -1054,15 +1046,18 @@ def _handle_connection_error(self, error: BaseException, phase: str) -> None:

def backoff(self) -> None:
"""Set/increase backoff mode."""
self._backoff += 1
backoff_duration_sec = _backoff(self._backoff)
backoff_duration_ms = int(backoff_duration_sec * 1000)
if self.state != PoolState.BACKOFF:
self.state = PoolState.BACKOFF
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_backoff(self.address, backoff_duration_ms)
self._backoff_connection_time = backoff_duration_sec + time.monotonic()
with self.lock:
self._backoff += 1
backoff_duration_sec = _backoff(self._backoff)
backoff_duration_ms = int(backoff_duration_sec * 1000)
if self.state != PoolState.BACKOFF:
self.state = PoolState.BACKOFF
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_backoff(
self.address, self._backoff, backoff_duration_ms
)
self._backoff_connection_time = backoff_duration_sec + time.monotonic()

# Log the pool backoff message.
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
Expand All @@ -1072,6 +1067,7 @@ def backoff(self) -> None:
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
attempt=self._backoff,
durationMS=backoff_duration_ms,
reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF),
error=ConnectionClosedReason.POOL_BACKOFF,
Expand Down Expand Up @@ -1146,9 +1142,11 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
self.active_contexts.discard(tmp_context)
if tmp_context.cancelled:
conn.cancel_context.cancel()
has_completed_hello = False
try:
if not self.is_sdam:
conn.hello()
has_completed_hello = True
self.is_writable = conn.is_writable
if handler:
handler.contribute_socket(conn, completed_handshake=False)
Expand All @@ -1158,7 +1156,8 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
except BaseException as e:
with self.lock:
self.active_contexts.discard(conn.cancel_context)
self._handle_connection_error(e, "hello")
if not has_completed_hello:
self._handle_connection_error(e, "hello")
conn.close_conn(ConnectionClosedReason.ERROR)
raise

Expand Down
7 changes: 3 additions & 4 deletions pymongo/synchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
_SDAMStatusMessage,
_ServerSelectionStatusMessage,
)
from pymongo.pool import PoolState
from pymongo.pool_options import PoolOptions
from pymongo.server_description import ServerDescription
from pymongo.server_selectors import (
Expand Down Expand Up @@ -485,7 +486,7 @@ def _process_change(
server_description.is_server_type_known and new_td.topology_type == TOPOLOGY_TYPE.Single
):
server = self._servers.get(server_description.address)
if server:
if server and server.pool.state != PoolState.BACKOFF:
server.pool.ready()

suppress_event = sd_old == server_description
Expand Down Expand Up @@ -555,9 +556,7 @@ def on_change(
if reset_pool:
server = self._servers.get(server_description.address)
if server:
server.pool.reset(
interrupt_connections=interrupt_connections, from_server_description=True
)
server.pool.reset(interrupt_connections=interrupt_connections)

def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None:
"""Process a new seedlist on an opened topology.
Expand Down
Loading