Skip to content

Commit e676a02

Browse files
committed
Adding a pure Python crypt sub-module.
Resurrects googleapis#1.
1 parent 9bd8522 commit e676a02

File tree

7 files changed

+382
-8
lines changed

7 files changed

+382
-8
lines changed

oauth2client/_pure_python_crypt.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
# Copyright 2016 Google Inc. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Pure Python crypto-related routines for oauth2client.
16+
17+
Uses the ``rsa``, ``pyasn1`` and ``pyasn1_modules`` packages
18+
to parse PEM files storing PKCS#1 or PKCS#8 keys as well as
19+
certificates.
20+
"""
21+
22+
from pyasn1.codec.der import decoder
23+
from pyasn1_modules import pem
24+
from pyasn1_modules.rfc2459 import Certificate
25+
from pyasn1_modules.rfc5208 import PrivateKeyInfo
26+
import rsa
27+
import six
28+
29+
from oauth2client._helpers import _from_bytes
30+
from oauth2client._helpers import _to_bytes
31+
32+
33+
_PKCS12_ERROR = r"""\
34+
PKCS12 format is not supported by the RSA library.
35+
Either install PyOpenSSL, or please convert .p12 format
36+
to .pem format:
37+
$ cat key.p12 | \
38+
> openssl pkcs12 -nodes -nocerts -passin pass:notasecret | \
39+
> openssl rsa > key.pem
40+
"""
41+
42+
_POW2 = (128, 64, 32, 16, 8, 4, 2, 1)
43+
_PKCS1_MARKER = ('-----BEGIN RSA PRIVATE KEY-----',
44+
'-----END RSA PRIVATE KEY-----')
45+
_PKCS8_MARKER = ('-----BEGIN PRIVATE KEY-----',
46+
'-----END PRIVATE KEY-----')
47+
_PKCS8_SPEC = PrivateKeyInfo()
48+
49+
50+
def _bit_list_to_bytes(bit_list):
51+
"""Converts an iterable of 1's and 0's to bytes.
52+
53+
Combines the list 8 at a time, treating each group of 8 bits
54+
as a single byte.
55+
"""
56+
num_bits = len(bit_list)
57+
byte_vals = bytearray()
58+
for start in six.moves.xrange(0, num_bits, 8):
59+
curr_bits = bit_list[start:start + 8]
60+
char_val = sum(val * digit
61+
for val, digit in zip(_POW2, curr_bits))
62+
byte_vals.append(char_val)
63+
return bytes(byte_vals)
64+
65+
66+
class RsaVerifier(object):
67+
"""Verifies the signature on a message.
68+
69+
Args:
70+
pubkey: rsa.key.PublicKey (or equiv), The public key to verify with.
71+
"""
72+
73+
def __init__(self, pubkey):
74+
self._pubkey = pubkey
75+
76+
def verify(self, message, signature):
77+
"""Verifies a message against a signature.
78+
79+
Args:
80+
message: string or bytes, The message to verify. If string, will be
81+
encoded to bytes as utf-8.
82+
signature: string or bytes, The signature on the message. If
83+
string, will be encoded to bytes as utf-8.
84+
85+
Returns:
86+
True if message was signed by the private key associated with the
87+
public key that this object was constructed with.
88+
"""
89+
message = _to_bytes(message, encoding='utf-8')
90+
try:
91+
return rsa.pkcs1.verify(message, signature, self._pubkey)
92+
except (ValueError, rsa.pkcs1.VerificationError):
93+
return False
94+
95+
@classmethod
96+
def from_string(cls, key_pem, is_x509_cert):
97+
"""Construct an RsaVerifier instance from a string.
98+
99+
Args:
100+
key_pem: string, public key in PEM format.
101+
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
102+
is expected to be an RSA key in PEM format.
103+
104+
Returns:
105+
RsaVerifier instance.
106+
107+
Raises:
108+
ValueError: if the key_pem can't be parsed. In either case, error
109+
will begin with 'No PEM start marker'. If
110+
``is_x509_cert`` is True, will fail to find the
111+
"-----BEGIN CERTIFICATE-----" error, otherwise fails
112+
to find "-----BEGIN RSA PUBLIC KEY-----".
113+
"""
114+
key_pem = _to_bytes(key_pem)
115+
if is_x509_cert:
116+
der = rsa.pem.load_pem(key_pem, 'CERTIFICATE')
117+
asn1_cert, remaining = decoder.decode(der, asn1Spec=Certificate())
118+
if remaining != b'':
119+
raise ValueError('Unused bytes', remaining)
120+
121+
cert_info = asn1_cert['tbsCertificate']['subjectPublicKeyInfo']
122+
key_bytes = _bit_list_to_bytes(cert_info['subjectPublicKey'])
123+
pubkey = rsa.PublicKey.load_pkcs1(key_bytes, 'DER')
124+
else:
125+
pubkey = rsa.PublicKey.load_pkcs1(key_pem, 'PEM')
126+
return cls(pubkey)
127+
128+
129+
class RsaSigner(object):
130+
"""Signs messages with a private key.
131+
132+
Args:
133+
pkey: rsa.key.PrivateKey (or equiv), The private key to sign with.
134+
"""
135+
136+
def __init__(self, pkey):
137+
self._key = pkey
138+
139+
def sign(self, message):
140+
"""Signs a message.
141+
142+
Args:
143+
message: bytes, Message to be signed.
144+
145+
Returns:
146+
string, The signature of the message for the given key.
147+
"""
148+
message = _to_bytes(message, encoding='utf-8')
149+
return rsa.pkcs1.sign(message, self._key, 'SHA-256')
150+
151+
@classmethod
152+
def from_string(cls, key, password='notasecret'):
153+
"""Construct an RsaSigner instance from a string.
154+
155+
Args:
156+
key: string, private key in PEM format.
157+
password: string, password for private key file. Unused for PEM
158+
files.
159+
160+
Returns:
161+
RsaSigner instance.
162+
163+
Raises:
164+
ValueError if the key cannot be parsed as PKCS#1 or PKCS#8 in
165+
PEM format.
166+
"""
167+
key = _from_bytes(key) # pem expects str in Py3
168+
marker_id, key_bytes = pem.readPemBlocksFromFile(
169+
six.StringIO(key), _PKCS1_MARKER, _PKCS8_MARKER)
170+
171+
if marker_id == 0:
172+
pkey = rsa.key.PrivateKey.load_pkcs1(key_bytes,
173+
format='DER')
174+
elif marker_id == 1:
175+
key_info, remaining = decoder.decode(
176+
key_bytes, asn1Spec=_PKCS8_SPEC)
177+
if remaining != b'':
178+
raise ValueError('Unused bytes', remaining)
179+
pkey_info = key_info.getComponentByName('privateKey')
180+
pkey = rsa.key.PrivateKey.load_pkcs1(pkey_info.asOctets(),
181+
format='DER')
182+
else:
183+
raise ValueError('No key could be detected.')
184+
185+
return cls(pkey)

oauth2client/crypt.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
from oauth2client._helpers import _to_bytes
2525
from oauth2client._helpers import _urlsafe_b64decode
2626
from oauth2client._helpers import _urlsafe_b64encode
27+
from oauth2client._pure_python_crypt import RsaSigner
28+
from oauth2client._pure_python_crypt import RsaVerifier
2729

2830

2931
CLOCK_SKEW_SECS = 300 # 5 minutes in seconds
@@ -65,8 +67,8 @@ def _bad_pkcs12_key_as_pem(*args, **kwargs):
6567
Signer = PyCryptoSigner
6668
Verifier = PyCryptoVerifier
6769
else: # pragma: NO COVER
68-
raise ImportError('No encryption library found. Please install either '
69-
'PyOpenSSL, or PyCrypto 2.6 or later')
70+
Signer = RsaSigner
71+
Verifier = RsaVerifier
7072

7173

7274
def make_signed_jwt(signer, payload):

tests/data/privatekey.pub

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-----BEGIN RSA PUBLIC KEY-----
2+
MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
3+
kdmM7oVK2OfgrSj/FCTkInKPqaCR0gD7K80q+mLBrN3PUkDrJQZpvRZIff3/xmVU
4+
1WeruQLFJjnFb2dqu0s/FY/2kWiJtBCakXvXEOb7zfbINuayL+MSsCGSdVYsSliS
5+
5qQpgyDap+8b5fpXZVJkq92hrcNtbkg7hCYUJczt8n9hcCTJCfUpApvaFQ18pe+z
6+
pyl4+WzkP66I28hniMQyUlA1hBiskT7qiouq0m8IOodhv2fagSZKjOTTU2xkSBc/
7+
/fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
8+
-----END RSA PUBLIC KEY-----
File renamed without changes.

tests/test__pure_python_crypt.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
# Copyright 2016 Google Inc. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Unit tests for oauth2client._pure_python_crypt."""
16+
17+
import os
18+
19+
import mock
20+
from pyasn1_modules import pem
21+
import rsa
22+
import six
23+
import unittest2
24+
25+
from oauth2client._helpers import _from_bytes
26+
from oauth2client import _pure_python_crypt
27+
from oauth2client.crypt import RsaSigner
28+
from oauth2client.crypt import RsaVerifier
29+
30+
31+
class TestRsaVerifier(unittest2.TestCase):
32+
33+
PUBLIC_KEY_FILENAME = os.path.join(os.path.dirname(__file__),
34+
'data', 'privatekey.pub')
35+
PUBLIC_CERT_FILENAME = os.path.join(os.path.dirname(__file__),
36+
'data', 'public_cert.pem')
37+
PRIVATE_KEY_FILENAME = os.path.join(os.path.dirname(__file__),
38+
'data', 'privatekey.pem')
39+
40+
def _load_public_key_bytes(self):
41+
with open(self.PUBLIC_KEY_FILENAME, 'rb') as fh:
42+
return fh.read()
43+
44+
def _load_public_cert_bytes(self):
45+
with open(self.PUBLIC_CERT_FILENAME, 'rb') as fh:
46+
return fh.read()
47+
48+
def _load_private_key_bytes(self):
49+
with open(self.PRIVATE_KEY_FILENAME, 'rb') as fh:
50+
return fh.read()
51+
52+
def test_verify_success(self):
53+
to_sign = b'foo'
54+
signer = RsaSigner.from_string(self._load_private_key_bytes())
55+
actual_signature = signer.sign(to_sign)
56+
57+
verifier = RsaVerifier.from_string(self._load_public_key_bytes(),
58+
is_x509_cert=False)
59+
self.assertTrue(verifier.verify(to_sign, actual_signature))
60+
61+
def test_verify_unicode_success(self):
62+
to_sign = u'foo'
63+
signer = RsaSigner.from_string(self._load_private_key_bytes())
64+
actual_signature = signer.sign(to_sign)
65+
66+
verifier = RsaVerifier.from_string(self._load_public_key_bytes(),
67+
is_x509_cert=False)
68+
self.assertTrue(verifier.verify(to_sign, actual_signature))
69+
70+
def test_verify_failure(self):
71+
verifier = RsaVerifier.from_string(self._load_public_key_bytes(),
72+
is_x509_cert=False)
73+
bad_signature1 = b''
74+
self.assertFalse(verifier.verify(b'foo', bad_signature1))
75+
bad_signature2 = b'a'
76+
self.assertFalse(verifier.verify(b'foo', bad_signature2))
77+
78+
def test_from_string_pub_key(self):
79+
public_key = self._load_public_key_bytes()
80+
verifier = RsaVerifier.from_string(public_key, is_x509_cert=False)
81+
self.assertIsInstance(verifier, RsaVerifier)
82+
self.assertIsInstance(verifier._pubkey, rsa.key.PublicKey)
83+
84+
def test_from_string_pub_key_unicode(self):
85+
public_key = _from_bytes(self._load_public_key_bytes())
86+
verifier = RsaVerifier.from_string(public_key, is_x509_cert=False)
87+
self.assertIsInstance(verifier, RsaVerifier)
88+
self.assertIsInstance(verifier._pubkey, rsa.key.PublicKey)
89+
90+
def test_from_string_pub_cert(self):
91+
public_cert = self._load_public_cert_bytes()
92+
verifier = RsaVerifier.from_string(public_cert, is_x509_cert=True)
93+
self.assertIsInstance(verifier, RsaVerifier)
94+
self.assertIsInstance(verifier._pubkey, rsa.key.PublicKey)
95+
96+
def test_from_string_pub_cert_unicode(self):
97+
public_cert = _from_bytes(self._load_public_cert_bytes())
98+
verifier = RsaVerifier.from_string(public_cert, is_x509_cert=True)
99+
self.assertIsInstance(verifier, RsaVerifier)
100+
self.assertIsInstance(verifier._pubkey, rsa.key.PublicKey)
101+
102+
def test_from_string_pub_cert_failure(self):
103+
cert_bytes = self._load_public_cert_bytes()
104+
true_der = rsa.pem.load_pem(cert_bytes, 'CERTIFICATE')
105+
with mock.patch('rsa.pem.load_pem',
106+
return_value=true_der + b'extra') as load_pem:
107+
with self.assertRaises(ValueError):
108+
RsaVerifier.from_string(cert_bytes, is_x509_cert=True)
109+
load_pem.assert_called_once_with(cert_bytes, 'CERTIFICATE')
110+
111+
112+
class TestRsaSigner(unittest2.TestCase):
113+
114+
PKCS1_KEY_FILENAME = os.path.join(os.path.dirname(__file__),
115+
'data', 'privatekey.pem')
116+
PKCS8_KEY_FILENAME = os.path.join(os.path.dirname(__file__),
117+
'data', 'pem_from_pkcs12.pem')
118+
PKCS12_KEY_FILENAME = os.path.join(os.path.dirname(__file__),
119+
'data', 'privatekey.p12')
120+
121+
def _load_pkcs1_key_bytes(self):
122+
with open(self.PKCS1_KEY_FILENAME, 'rb') as fh:
123+
return fh.read()
124+
125+
def _load_pkcs8_key_bytes(self):
126+
with open(self.PKCS8_KEY_FILENAME, 'rb') as fh:
127+
return fh.read()
128+
129+
def _load_pkcs12_key_bytes(self):
130+
with open(self.PKCS12_KEY_FILENAME, 'rb') as fh:
131+
return fh.read()
132+
133+
def test_from_string_pkcs1(self):
134+
key_bytes = self._load_pkcs1_key_bytes()
135+
signer = RsaSigner.from_string(key_bytes)
136+
self.assertIsInstance(signer, RsaSigner)
137+
self.assertIsInstance(signer._key, rsa.key.PrivateKey)
138+
139+
def test_from_string_pkcs1_unicode(self):
140+
key_bytes = _from_bytes(self._load_pkcs1_key_bytes())
141+
signer = RsaSigner.from_string(key_bytes)
142+
self.assertIsInstance(signer, RsaSigner)
143+
self.assertIsInstance(signer._key, rsa.key.PrivateKey)
144+
145+
def test_from_string_pkcs8(self):
146+
key_bytes = self._load_pkcs8_key_bytes()
147+
signer = RsaSigner.from_string(key_bytes)
148+
self.assertIsInstance(signer, RsaSigner)
149+
self.assertIsInstance(signer._key, rsa.key.PrivateKey)
150+
151+
def test_from_string_pkcs8_extra_bytes(self):
152+
key_bytes = self._load_pkcs8_key_bytes()
153+
_, pem_bytes = pem.readPemBlocksFromFile(
154+
six.StringIO(_from_bytes(key_bytes)),
155+
_pure_python_crypt._PKCS8_MARKER)
156+
157+
with mock.patch('pyasn1.codec.der.decoder.decode') as mock_decode:
158+
key_info, remaining = None, 'extra'
159+
mock_decode.return_value = (key_info, remaining)
160+
with self.assertRaises(ValueError):
161+
RsaSigner.from_string(key_bytes)
162+
# Verify mock was called.
163+
mock_decode.assert_called_once_with(
164+
pem_bytes, asn1Spec=_pure_python_crypt._PKCS8_SPEC)
165+
166+
def test_from_string_pkcs8_unicode(self):
167+
key_bytes = _from_bytes(self._load_pkcs8_key_bytes())
168+
signer = RsaSigner.from_string(key_bytes)
169+
self.assertIsInstance(signer, RsaSigner)
170+
self.assertIsInstance(signer._key, rsa.key.PrivateKey)
171+
172+
def test_from_string_pkcs12(self):
173+
key_bytes = self._load_pkcs12_key_bytes()
174+
with self.assertRaises(ValueError):
175+
RsaSigner.from_string(key_bytes)
176+
177+
178+
if __name__ == '__main__': # pragma: NO COVER
179+
unittest2.main()

0 commit comments

Comments
 (0)