Skip to content
This repository was archived by the owner on Oct 3, 2020. It is now read-only.

Overridable HTTP bits #56

Merged
merged 2 commits into from
Mar 29, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 49 additions & 28 deletions pykube/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import posixpath
import shlex
import subprocess
from typing import Optional

try:
import google.auth
Expand Down Expand Up @@ -88,21 +89,45 @@ def send(self, request, **kwargs):
config = self.kube_config

_retry_attempt = kwargs.pop("_retry_attempt", 0)
retry_func = None
retry_func = self._setup_request_auth(config, request, kwargs)
self._setup_request_certificates(config, request, kwargs)

# setup cluster API authentication
response = self._do_send(request, **kwargs)

_retry_status_codes = {HTTPStatus.UNAUTHORIZED}

if (
response.status_code in _retry_status_codes
and retry_func
and _retry_attempt < 2
):
send_kwargs = {"_retry_attempt": _retry_attempt + 1, "kube_config": config}
send_kwargs.update(kwargs)
return retry_func(send_kwargs=send_kwargs)

return response

def _setup_request_auth(self, config, request, kwargs):
"""
Set up authorization for the request.

Return an optional function to use as a retry manager if the initial request fails
with an unauthorized error.
"""
if "Authorization" in request.headers:
# request already has some auth header (e.g. Bearer token)
# don't modify/overwrite it
pass
elif "token" in config.user and config.user["token"]:
return None

if config.user.get("token"):
request.headers["Authorization"] = "Bearer {}".format(config.user["token"])
return None

elif "exec" in config.user:
if "exec" in config.user:
exec_conf = config.user["exec"]

if exec_conf["apiVersion"] == "client.authentication.k8s.io/v1alpha1":
api_version = exec_conf["apiVersion"]
if api_version == "client.authentication.k8s.io/v1alpha1":
cmd_env_vars = dict(os.environ)
for env_var in exec_conf.get("env") or []:
cmd_env_vars[env_var["name"]] = env_var["value"]
Expand All @@ -113,10 +138,17 @@ def send(self, request, **kwargs):

parsed_out = json.loads(output)
token = parsed_out["status"]["token"]
else:
raise NotImplementedError(f'auth exec api version {api_version} not implemented')

request.headers["Authorization"] = "Bearer {}".format(token)
return None

if config.user.get("username") and config.user.get("password"):
request.prepare_auth((config.user["username"], config.user["password"]))
return None

elif "auth-provider" in config.user:
if "auth-provider" in config.user:
auth_provider = config.user["auth-provider"]
if auth_provider.get("name") == "gcp":
dependencies = [google_auth_installed, jsonpath_installed]
Expand All @@ -143,55 +175,42 @@ def send(self, request, **kwargs):
auth_config.get("expiry"),
config,
)
return retry_func
elif auth_provider.get("name") == "oidc":
auth_config = auth_provider.get("config", {})
# @@@ support token refresh
if "id-token" in auth_config:
request.headers["Authorization"] = "Bearer {}".format(
auth_config["id-token"]
)
elif config.user.get("username") and config.user.get("password"):
request.prepare_auth((config.user["username"], config.user["password"]))
return None

def _setup_request_certificates(self, config, request, kwargs):
if "client-certificate" in config.user:
kwargs["cert"] = (
config.user["client-certificate"].filename(),
config.user["client-key"].filename(),
)

# setup certificate verification

if "certificate-authority" in config.cluster:
kwargs["verify"] = config.cluster["certificate-authority"].filename()
elif "insecure-skip-tls-verify" in config.cluster:
kwargs["verify"] = not config.cluster["insecure-skip-tls-verify"]

response = self._do_send(request, **kwargs)

_retry_status_codes = {HTTPStatus.UNAUTHORIZED}

if (
response.status_code in _retry_status_codes
and retry_func
and _retry_attempt < 2
):
send_kwargs = {"_retry_attempt": _retry_attempt + 1, "kube_config": config}
send_kwargs.update(kwargs)
return retry_func(send_kwargs=send_kwargs)

return response


class HTTPClient:
"""
Client for interfacing with the Kubernetes API.
"""

http_adapter_cls = KubernetesHTTPAdapter

def __init__(
self,
config: KubeConfig,
timeout: float = DEFAULT_HTTP_TIMEOUT,
dry_run: bool = False,
http_adapter: Optional[requests.adapters.HTTPAdapter] = None,
):
"""
Creates a new instance of the HTTPClient.
Expand All @@ -206,8 +225,10 @@ def __init__(

session = requests.Session()
session.headers["User-Agent"] = f"pykube-ng/{__version__}"
session.mount("https://", KubernetesHTTPAdapter(self.config))
session.mount("http://", KubernetesHTTPAdapter(self.config))
if not http_adapter:
http_adapter = self.http_adapter_cls(self.config)
session.mount("https://", http_adapter)
session.mount("http://", http_adapter)
self.session = session

@property
Expand Down