Skip to content
This repository was archived by the owner on Jan 18, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions oauth2client/_openssl_crypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,17 @@ def from_string(key, password=b'notasecret'):
return OpenSSLSigner(pkey)


def pkcs12_key_as_pem(private_key_text, private_key_password):
"""Convert the contents of a PKCS12 key to PEM using OpenSSL.
def pkcs12_key_as_pem(private_key_bytes, private_key_password):
"""Convert the contents of a PKCS#12 key to PEM using pyOpenSSL.

Args:
private_key_text: String. Private key.
private_key_password: String. Password for PKCS12.
private_key_bytes: Bytes. PKCS#12 key in DER format.
private_key_password: String. Password for PKCS#12 key.

Returns:
String. PEM contents of ``private_key_text``.
String. PEM contents of ``private_key_bytes``.
"""
decoded_body = base64.b64decode(private_key_text)
private_key_password = _to_bytes(private_key_password)

pkcs12 = crypto.load_pkcs12(decoded_body, private_key_password)
pkcs12 = crypto.load_pkcs12(private_key_bytes, private_key_password)
return crypto.dump_privatekey(crypto.FILETYPE_PEM,
pkcs12.get_privatekey())
119 changes: 11 additions & 108 deletions oauth2client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,20 +298,20 @@ def to_json(self):
return self._to_json(self.NON_SERIALIZED_MEMBERS)

@classmethod
def new_from_json(cls, s):
def new_from_json(cls, json_data):
"""Utility class method to instantiate a Credentials subclass from JSON.

Expects the JSON string to have been produced by to_json().

Args:
s: string or bytes, JSON from to_json().
json_data: string or bytes, JSON from to_json().

Returns:
An instance of the subclass of Credentials that was serialized with
to_json().
"""
json_string_as_unicode = _from_bytes(s)
data = json.loads(json_string_as_unicode)
json_data_as_unicode = _from_bytes(json_data)
data = json.loads(json_data_as_unicode)
# Find and call the right classmethod from_json() to restore
# the object.
module_name = data['_module']
Expand All @@ -326,8 +326,7 @@ def new_from_json(cls, s):
module_obj = __import__(module_name,
fromlist=module_name.split('.')[:-1])
kls = getattr(module_obj, data['_class'])
from_json = getattr(kls, 'from_json')
return from_json(json_string_as_unicode)
return kls.from_json(json_data_as_unicode)

@classmethod
def from_json(cls, unused_data):
Expand Down Expand Up @@ -710,19 +709,18 @@ def retrieve_scopes(self, http):
return self.scopes

@classmethod
def from_json(cls, s):
def from_json(cls, json_data):
"""Instantiate a Credentials object from a JSON description of it.

The JSON should have been produced by calling .to_json() on the object.

Args:
data: dict, A deserialized JSON object.
json_data: string or bytes, JSON to deserialize.

Returns:
An instance of a Credentials subclass.
"""
s = _from_bytes(s)
data = json.loads(s)
data = json.loads(_from_bytes(json_data))
if (data.get('token_expiry') and
not isinstance(data['token_expiry'], datetime.datetime)):
try:
Expand Down Expand Up @@ -1070,8 +1068,8 @@ def __init__(self, access_token, user_agent, revoke_uri=None):
revoke_uri=revoke_uri)

@classmethod
def from_json(cls, s):
data = json.loads(_from_bytes(s))
def from_json(cls, json_data):
data = json.loads(_from_bytes(json_data))
retval = AccessTokenCredentials(
data['access_token'],
data['user_agent'])
Expand Down Expand Up @@ -1190,7 +1188,7 @@ class GoogleCredentials(OAuth2Credentials):
NON_SERIALIZED_MEMBERS = (
frozenset(['_private_key']) |
OAuth2Credentials.NON_SERIALIZED_MEMBERS)

"""Members that aren't serialized when object is converted to JSON."""

def __init__(self, access_token, client_id, client_secret, refresh_token,
token_expiry, token_uri, user_agent,
Expand Down Expand Up @@ -1630,101 +1628,6 @@ def _RequireCryptoOrDie():
raise CryptoUnavailableError('No crypto library available')


class SignedJwtAssertionCredentials(AssertionCredentials):
"""Credentials object used for OAuth 2.0 Signed JWT assertion grants.

This credential does not require a flow to instantiate because it
represents a two legged flow, and therefore has all of the required
information to generate and refresh its own access tokens.

SignedJwtAssertionCredentials requires either PyOpenSSL, or PyCrypto
2.6 or later. For App Engine you may also consider using
AppAssertionCredentials.
"""

MAX_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds

@util.positional(4)
def __init__(self,
service_account_name,
private_key,
scope,
private_key_password='notasecret',
user_agent=None,
token_uri=GOOGLE_TOKEN_URI,
revoke_uri=GOOGLE_REVOKE_URI,
**kwargs):
"""Constructor for SignedJwtAssertionCredentials.

Args:
service_account_name: string, id for account, usually an email
address.
private_key: string or bytes, private key in PKCS12 or PEM format.
scope: string or iterable of strings, scope(s) of the credentials
being requested.
private_key_password: string, password for private_key, unused if
private_key is in PEM format.
user_agent: string, HTTP User-Agent to provide for this
application.
token_uri: string, URI for token endpoint. For convenience defaults
to Google's endpoints but any OAuth 2.0 provider can be
used.
revoke_uri: string, URI for revoke endpoint.
kwargs: kwargs, Additional parameters to add to the JWT token, for
example sub=joe@xample.org.

Raises:
CryptoUnavailableError if no crypto library is available.
"""
_RequireCryptoOrDie()
super(SignedJwtAssertionCredentials, self).__init__(
None,
user_agent=user_agent,
token_uri=token_uri,
revoke_uri=revoke_uri,
)

self.scope = util.scopes_to_string(scope)

# Keep base64 encoded so it can be stored in JSON.
self.private_key = base64.b64encode(_to_bytes(private_key))
self.private_key_password = private_key_password
self.service_account_name = service_account_name
self.kwargs = kwargs

@classmethod
def from_json(cls, s):
data = json.loads(_from_bytes(s))
retval = SignedJwtAssertionCredentials(
data['service_account_name'],
base64.b64decode(data['private_key']),
data['scope'],
private_key_password=data['private_key_password'],
user_agent=data['user_agent'],
token_uri=data['token_uri'],
**data['kwargs']
)
retval.invalid = data['invalid']
retval.access_token = data['access_token']
return retval

def _generate_assertion(self):
"""Generate the assertion that will be used in the request."""
now = int(time.time())
payload = {
'aud': self.token_uri,
'scope': self.scope,
'iat': now,
'exp': now + SignedJwtAssertionCredentials.MAX_TOKEN_LIFETIME_SECS,
'iss': self.service_account_name
}
payload.update(self.kwargs)
logger.debug(str(payload))

private_key = base64.b64decode(self.private_key)
return crypt.make_signed_jwt(crypt.Signer.from_string(
private_key, self.private_key_password), payload)

# Only used in verify_id_token(), which is always calling to the same URI
# for the certs.
_cached_http = httplib2.Http(MemoryCache())
Expand Down
3 changes: 1 addition & 2 deletions oauth2client/service_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ def from_json(cls, json_data):
# state.
pkcs12_val = base64.b64decode(pkcs12_val)
password = json_data['_private_key_password']
signer = crypt.Signer.from_string(private_key_pkcs12,
private_key_password)
signer = crypt.Signer.from_string(pkcs12_val, password)

credentials = cls(
json_data['_service_account_email'],
Expand Down
5 changes: 5 additions & 0 deletions tests/test__pure_python_crypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ def test_from_string_pkcs12(self):
with self.assertRaises(ValueError):
RsaSigner.from_string(key_bytes)

def test_from_string_bogus_key(self):
key_bytes = 'bogus-key'
with self.assertRaises(ValueError):
RsaSigner.from_string(key_bytes)


if __name__ == '__main__': # pragma: NO COVER
unittest2.main()
14 changes: 11 additions & 3 deletions tests/test__pycrypto_crypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
"""Unit tests for oauth2client._pycrypto_crypt."""

import os
import unittest
import unittest2

from oauth2client.crypt import PyCryptoSigner
from oauth2client.crypt import PyCryptoVerifier


class TestPyCryptoVerifier(unittest.TestCase):
class TestPyCryptoVerifier(unittest2.TestCase):

PUBLIC_CERT_FILENAME = os.path.join(os.path.dirname(__file__),
'data', 'public_cert.pem')
Expand Down Expand Up @@ -63,5 +63,13 @@ def test_from_string_unicode_key(self):
self.assertTrue(isinstance(verifier, PyCryptoVerifier))


class TestPyCryptoSigner(unittest2.TestCase):

def test_from_string_bad_key(self):
key_bytes = 'definitely-not-pem-format'
with self.assertRaises(NotImplementedError):
PyCryptoSigner.from_string(key_bytes)


if __name__ == '__main__': # pragma: NO COVER
unittest.main()
unittest2.main()
41 changes: 18 additions & 23 deletions tests/test_crypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@

from oauth2client import _helpers
from oauth2client.client import HAS_OPENSSL
from oauth2client.client import SignedJwtAssertionCredentials
from oauth2client import crypt
from oauth2client.service_account import ServiceAccountCredentials


def data_filename(filename):
return os.path.join(os.path.dirname(__file__), 'data', filename)


def datafile(filename):
f = open(os.path.join(os.path.dirname(__file__), 'data', filename), 'rb')
data = f.read()
f.close()
return data
with open(data_filename(filename), 'rb') as file_obj:
return file_obj.read()


class Test__bad_pkcs12_key_as_pem(unittest.TestCase):
Expand All @@ -39,23 +41,23 @@ def test_fails(self):

class Test_pkcs12_key_as_pem(unittest.TestCase):

def _make_signed_jwt_creds(self, private_key_file='privatekey.p12',
private_key=None):
private_key = private_key or datafile(private_key_file)
return SignedJwtAssertionCredentials(
def _make_svc_account_creds(self, private_key_file='privatekey.p12'):
filename = data_filename(private_key_file)
credentials = ServiceAccountCredentials.from_p12_keyfile(
'some_account@example.com',
private_key,
scope='read+write',
sub='joe@example.org')
filename,
scopes='read+write')
credentials._kwargs['sub'] ='joe@example.org'
return credentials

def _succeeds_helper(self, password=None):
self.assertEqual(True, HAS_OPENSSL)

credentials = self._make_signed_jwt_creds()
credentials = self._make_svc_account_creds()
if password is None:
password = credentials.private_key_password
pem_contents = crypt.pkcs12_key_as_pem(credentials.private_key,
password)
password = credentials._private_key_password
pem_contents = crypt.pkcs12_key_as_pem(
credentials._private_key_pkcs12, password)
pkcs12_key_as_pem = datafile('pem_from_pkcs12.pem')
pkcs12_key_as_pem = _helpers._parse_pem_key(pkcs12_key_as_pem)
alternate_pem = datafile('pem_from_pkcs12_alternate.pem')
Expand All @@ -68,13 +70,6 @@ def test_succeeds_with_unicode_password(self):
password = u'notasecret'
self._succeeds_helper(password)

def test_with_nonsense_key(self):
from OpenSSL import crypto
credentials = self._make_signed_jwt_creds(private_key=b'NOT_A_KEY')
self.assertRaises(crypto.Error, crypt.pkcs12_key_as_pem,
credentials.private_key,
credentials.private_key_password)


class Test__verify_signature(unittest.TestCase):

Expand Down
Loading