Skip to content

fix(types): add full type annotations to oauth utils and exceptions (#1508) #1718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions google/auth/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,16 @@ def retryable(self):


class OAuthError(GoogleAuthError):
"""Used to indicate an error occurred during an OAuth related HTTP
request."""
"""Used to indicate an error occurred during an OAuth-related HTTP request."""

def __init__(
self,
message: Optional[str] = None,
response_body: Optional[str] = None,
**kwargs: Any,
) -> None:
super().__init__(message or "", **kwargs)
self.response_body = response_body


class ReauthFailError(RefreshError):
Expand Down
144 changes: 52 additions & 92 deletions google/oauth2/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,154 +14,114 @@

"""OAuth 2.0 Utilities.

This module provides implementations for various OAuth 2.0 utilities.
This includes `OAuth error handling`_ and
`Client authentication for OAuth flows`_.

OAuth error handling
--------------------
This will define interfaces for handling OAuth related error responses as
stated in `RFC 6749 section 5.2`_.
This will include a common function to convert these HTTP error responses to a
:class:`google.auth.exceptions.OAuthError` exception.


Client authentication for OAuth flows
-------------------------------------
We introduce an interface for defining client authentication credentials based
on `RFC 6749 section 2.3.1`_. This will expose the following
capabilities:

* Ability to support basic authentication via request header.
* Ability to support bearer token authentication via request header.
* Ability to support client ID / secret authentication via request body.

.. _RFC 6749 section 2.3.1: https://tools.ietf.org/html/rfc6749#section-2.3.1
.. _RFC 6749 section 5.2: https://tools.ietf.org/html/rfc6749#section-5.2
Provides OAuth error handling and client authentication utilities.
"""

import abc
import base64
import enum
import json
from typing import Mapping, Optional, MutableMapping, Any

from google.auth import exceptions


# OAuth client authentication based on
# https://tools.ietf.org/html/rfc6749#section-2.3.
class ClientAuthType(enum.Enum):
basic = 1
request_body = 2


class ClientAuthentication(object):
"""Defines the client authentication credentials for basic and request-body
types based on https://tools.ietf.org/html/rfc6749#section-2.3.1.
"""
class ClientAuthentication:
"""OAuth client authentication credentials.

def __init__(self, client_auth_type, client_id, client_secret=None):
"""Instantiates a client authentication object containing the client ID
and secret credentials for basic and response-body auth.
Args:
client_auth_type: The client authentication type.
client_id: The client ID.
client_secret: The client secret (optional).
"""

Args:
client_auth_type (google.oauth2.oauth_utils.ClientAuthType): The
client authentication type.
client_id (str): The client ID.
client_secret (Optional[str]): The client secret.
"""
def __init__(
self,
client_auth_type: ClientAuthType,
client_id: str,
client_secret: Optional[str] = None,
) -> None:
self.client_auth_type = client_auth_type
self.client_id = client_id
self.client_secret = client_secret


class OAuthClientAuthHandler(metaclass=abc.ABCMeta):
"""Abstract class for handling client authentication in OAuth-based
operations.
"""
"""Handles client authentication in OAuth flows."""

def __init__(self, client_authentication=None):
"""Instantiates an OAuth client authentication handler.

Args:
client_authentication (Optional[google.oauth2.utils.ClientAuthentication]):
The OAuth client authentication credentials if available.
"""
super(OAuthClientAuthHandler, self).__init__()
def __init__(self, client_authentication: Optional[ClientAuthentication] = None) -> None:
self._client_authentication = client_authentication

def apply_client_authentication_options(
self, headers, request_body=None, bearer_token=None
):
"""Applies client authentication on the OAuth request's headers or POST
body.
self,
headers: MutableMapping[str, str],
request_body: Optional[MutableMapping[str, str]] = None,
bearer_token: Optional[str] = None,
) -> None:
"""Applies authentication via headers or POST body.

Args:
headers (Mapping[str, str]): The HTTP request header.
request_body (Optional[Mapping[str, str]]): The HTTP request body
dictionary. For requests that do not support request body, this
is None and will be ignored.
bearer_token (Optional[str]): The optional bearer token.
headers: HTTP headers.
request_body: POST body dictionary (optional).
bearer_token: Bearer token (optional).
"""
# Inject authenticated header.
self._inject_authenticated_headers(headers, bearer_token)
# Inject authenticated request body.
if bearer_token is None:
self._inject_authenticated_request_body(request_body)

def _inject_authenticated_headers(self, headers, bearer_token=None):
def _inject_authenticated_headers(
self,
headers: MutableMapping[str, str],
bearer_token: Optional[str] = None,
) -> None:
if bearer_token is not None:
headers["Authorization"] = "Bearer %s" % bearer_token
headers["Authorization"] = f"Bearer {bearer_token}"
elif (
self._client_authentication is not None
and self._client_authentication.client_auth_type is ClientAuthType.basic
and self._client_authentication.client_auth_type == ClientAuthType.basic
):
username = self._client_authentication.client_id
password = self._client_authentication.client_secret or ""
credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
headers["Authorization"] = f"Basic {credentials}"

credentials = base64.b64encode(
("%s:%s" % (username, password)).encode()
).decode()
headers["Authorization"] = "Basic %s" % credentials

def _inject_authenticated_request_body(self, request_body):
def _inject_authenticated_request_body(
self,
request_body: Optional[MutableMapping[str, str]],
) -> None:
if (
self._client_authentication is not None
and self._client_authentication.client_auth_type
is ClientAuthType.request_body
and self._client_authentication.client_auth_type == ClientAuthType.request_body
):
if request_body is None:
raise exceptions.OAuthError(
"HTTP request does not support request-body"
)
else:
request_body["client_id"] = self._client_authentication.client_id
request_body["client_secret"] = (
self._client_authentication.client_secret or ""
)
raise exceptions.OAuthError("HTTP request does not support request-body")

request_body["client_id"] = self._client_authentication.client_id
request_body["client_secret"] = self._client_authentication.client_secret or ""


def handle_error_response(response_body):
"""Translates an error response from an OAuth operation into an
OAuthError exception.
def handle_error_response(response_body: str) -> None:
"""Converts OAuth error JSON response to an exception.

Args:
response_body (str): The decoded response data.
response_body: The decoded response data as string.

Raises:
google.auth.exceptions.OAuthError
OAuthError: A typed exception with error details.
"""
try:
error_components = []
error_data = json.loads(response_body)

error_components.append("Error code {}".format(error_data["error"]))
error_data: dict[str, Any] = json.loads(response_body)
error_components = [f"Error code {error_data['error']}"]
if "error_description" in error_data:
error_components.append(": {}".format(error_data["error_description"]))
error_components.append(f": {error_data['error_description']}")
if "error_uri" in error_data:
error_components.append(" - {}".format(error_data["error_uri"]))
error_components.append(f" - {error_data['error_uri']}")
error_details = "".join(error_components)
# If no details could be extracted, use the response data.
except (KeyError, ValueError):
error_details = response_body

Expand Down