Skip to content

Commit

Permalink
Add additional tests for the pool keys.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremycline committed Apr 4, 2016
1 parent a8021b5 commit ccacc96
Showing 1 changed file with 100 additions and 18 deletions.
118 changes: 100 additions & 18 deletions test/test_poolmanager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import unittest

from urllib3.poolmanager import PoolManager
from urllib3.poolmanager import (
PoolManager,
pool_keys_by_scheme,
HTTPPoolKey,
HTTPSPoolKey,
)
from urllib3 import connection_from_url
from urllib3.exceptions import (
ClosedPoolError,
LocationValueError,
)
from urllib3.util import retry, timeout


class TestPoolManager(unittest.TestCase):
Expand Down Expand Up @@ -87,36 +93,108 @@ def test_contextmanager(self):

self.assertEqual(len(p.pools), 0)

def test_pools_keyed_on_ssl_arguments_from_host(self):
ssl_kw = {
def test_http_pool_key_fields(self):
"""Assert the HTTPPoolKey fields are honored when selecting a pool."""
connection_pool_kw = {
'timeout': timeout.Timeout(3.14),
'retries': retry.Retry(total=6, connect=2),
'block': True,
'strict': True,
}
p = PoolManager()
conn_pools = [
p.connection_from_url('http://example.com/'),
p.connection_from_url('http://example.com:8000/'),
p.connection_from_url('http://other.example.com/'),
]

for key, value in connection_pool_kw.items():
p.connection_pool_kw[key] = value
conn_pools.append(p.connection_from_url('http://example.com/'))

self.assertTrue(
all(
x is not y
for i, x in enumerate(conn_pools)
for j, y in enumerate(conn_pools)
if i != j
)
)
self.assertTrue(
all(
isinstance(key, HTTPPoolKey)
for key in p.pools.keys())
)

def test_http_pool_key_extra_kwargs(self):
"""Assert non-HTTPPoolKey fields are ignored when selecting a pool."""
p = PoolManager()
conn_pool = p.connection_from_url('http://example.com/')
p.connection_pool_kw['some_kwarg'] = 'that should be ignored'
other_conn_pool = p.connection_from_url('http://example.com/')

self.assertTrue(conn_pool is other_conn_pool)

def test_https_pool_key_fields(self):
"""Assert the HTTPSPoolKey fields are honored when selecting a pool."""
connection_pool_kw = {
'timeout': timeout.Timeout(3.14),
'retries': retry.Retry(total=6, connect=2),
'block': True,
'strict': True,
'key_file': '/root/totally_legit.key',
'cert_file': '/root/totally_legit.crt',
'cert_reqs': 'CERT_REQUIRED',
'ca_certs': '/root/path_to_pem',
'ssl_version': 'SSLv23_METHOD',
}
p = PoolManager(5, **ssl_kw)
conns = []
conns.append(
p.connection_from_host('example.com', 443, scheme='https')
)
p = PoolManager()
conn_pools = [
p.connection_from_url('https://example.com/'),
p.connection_from_url('https://example.com:4333/'),
p.connection_from_url('https://other.example.com/'),
]
# Asking for a connection pool with the same key should give us an
# existing pool.
dup_pools = []

for k in ssl_kw:
p.connection_pool_kw[k] = 'newval'
conns.append(
p.connection_from_host('example.com', 443, scheme='https')
)
for key, value in connection_pool_kw.items():
p.connection_pool_kw[key] = value
conn_pools.append(p.connection_from_url('https://example.com/'))
dup_pools.append(p.connection_from_url('https://example.com/'))

self.assertTrue(
all(
x is not y
for i, x in enumerate(conns)
for j, y in enumerate(conns)
for i, x in enumerate(conn_pools)
for j, y in enumerate(conn_pools)
if i != j
)
)
self.assertTrue(all(pool in conn_pools for pool in dup_pools))
self.assertTrue(
all(
isinstance(key, HTTPSPoolKey)
for key in p.pools.keys())
)

def test_https_pool_key_extra_kwargs(self):
"""Assert non-HTTPSPoolKey fields are ignored when selecting a pool."""
p = PoolManager()
conn_pool = p.connection_from_url('https://example.com/')
p.connection_pool_kw['some_kwarg'] = 'that should be ignored'
other_conn_pool = p.connection_from_url('https://example.com/')

self.assertTrue(conn_pool is other_conn_pool)

def test_pools_keyed_on_ssl_arguments_from_url(self):
def test_default_pool_keys_copy(self):
"""Assert each PoolManager gets a copy of ``pool_keys_by_scheme``."""
p = PoolManager()
self.assertEqual(p.pool_keys_by_scheme, pool_keys_by_scheme)
self.assertFalse(p.pool_keys_by_scheme is pool_keys_by_scheme)

def test_pools_keyed_with_from_host(self):
"""Assert pools are still keyed correctly with connection_from_host."""
ssl_kw = {
'key_file': '/root/totally_legit.key',
'cert_file': '/root/totally_legit.crt',
Expand All @@ -126,11 +204,15 @@ def test_pools_keyed_on_ssl_arguments_from_url(self):
}
p = PoolManager(5, **ssl_kw)
conns = []
conns.append(p.connection_from_url('https://example.com/'))
conns.append(
p.connection_from_host('example.com', 443, scheme='https')
)

for k in ssl_kw:
p.connection_pool_kw[k] = 'newval'
conns.append(p.connection_from_url('https://example.com/'))
conns.append(
p.connection_from_host('example.com', 443, scheme='https')
)

self.assertTrue(
all(
Expand Down

0 comments on commit ccacc96

Please sign in to comment.