Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KeepAliveClientRequest class for k8s async client #15220

Merged
merged 5 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import sys
from typing import Optional, TypeVar

from kubernetes_asyncio.client import ApiClient
from aiohttp import ClientResponse
from aiohttp.client_reqrep import ClientRequest
from aiohttp.connector import Connection
from slugify import slugify

# Note: `dict(str, str)` is the Kubernetes API convention for
Expand All @@ -14,34 +16,35 @@
V1KubernetesModel = TypeVar("V1KubernetesModel")


def enable_socket_keep_alive(client: ApiClient) -> None:
class KeepAliveClientRequest(ClientRequest):
"""
Setting the keep-alive flags on the kubernetes client object.
Unfortunately neither the kubernetes library nor the urllib3 library which
kubernetes is using internally offer the functionality to enable keep-alive
messages. Thus the flags are added to be used on the underlying sockets.
aiohttp only directly implements socket keepalive for incoming connections
in its RequestHandler. For client connections, we need to set the keepalive
ourselves.

Refer to https://github.com/aio-libs/aiohttp/issues/3904#issuecomment-759205696
"""

socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
async def send(self, conn: "Connection") -> "ClientResponse":
kevingrismore marked this conversation as resolved.
Show resolved Hide resolved
sock = conn.protocol.transport.get_extra_info("socket")
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

if hasattr(socket, "TCP_KEEPINTVL"):
socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30))
if hasattr(socket, "TCP_KEEPIDLE"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30)

if hasattr(socket, "TCP_KEEPCNT"):
socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 6))
if hasattr(socket, "TCP_KEEPINTVL"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 6)

if hasattr(socket, "TCP_KEEPIDLE"):
socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 6))
if hasattr(socket, "TCP_KEEPCNT"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 6)

if sys.platform == "darwin":
# TCP_KEEP_ALIVE not available on socket module in macOS, but defined in
# https://github.com/apple/darwin-xnu/blob/2ff845c2e033bd0ff64b5b6aa6063a1f8f65aa32/bsd/netinet/tcp.h#L215
TCP_KEEP_ALIVE = 0x10
socket_options.append((socket.IPPROTO_TCP, TCP_KEEP_ALIVE, 30))
if sys.platform == "darwin":
# TCP_KEEP_ALIVE not available on socket module in macOS, but defined in
# https://github.com/apple/darwin-xnu/blob/2ff845c2e033bd0ff64b5b6aa6063a1f8f65aa32/bsd/netinet/tcp.h#L215
TCP_KEEP_ALIVE = 0x10
sock.setsockopt(socket.IPPROTO_TCP, TCP_KEEP_ALIVE, 30)

client.rest_client.pool_manager.connection_pool_kw[
"socket_options"
] = socket_options
return await super().send(conn)


def _slugify_name(name: str, max_length: int = 45) -> Optional[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@
from prefect_kubernetes.credentials import KubernetesClusterConfig
from prefect_kubernetes.events import KubernetesEventsReplicator
from prefect_kubernetes.utilities import (
KeepAliveClientRequest,
_slugify_label_key,
_slugify_label_value,
_slugify_name,
enable_socket_keep_alive,
)

MAX_ATTEMPTS = 3
Expand Down Expand Up @@ -637,10 +637,6 @@ async def _get_configured_kubernetes_client(
Returns a configured Kubernetes client.
"""
client = None
if os.environ.get(
"PREFECT_KUBERNETES_WORKER_ADD_TCP_KEEPALIVE", "TRUE"
).strip().lower() in ("true", "1"):
enable_socket_keep_alive(client)

if configuration.cluster_config:
config_dict = configuration.cluster_config.config
Expand All @@ -657,6 +653,12 @@ async def _get_configured_kubernetes_client(
except config.ConfigException:
# If in-cluster config fails, load the local kubeconfig
client = await config.new_client_from_config()

if os.environ.get(
"PREFECT_KUBERNETES_WORKER_ADD_TCP_KEEPALIVE", "TRUE"
).strip().lower() in ("true", "1"):
client.rest_client.pool_manager._request_class = KeepAliveClientRequest

try:
yield client
finally:
Expand Down
Loading