Skip to content

Commit

Permalink
[IMP] test impprove HTTPCase opener
Browse files Browse the repository at this point in the history
Simplify website crawler using the generic HTTPCase opener
  • Loading branch information
antonylesuisse committed Jun 29, 2014
1 parent f9e24e1 commit 78e044b
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 197 deletions.
2 changes: 1 addition & 1 deletion addons/website/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
import test_converter
import test_requests
import test_crawl
import test_ui
import test_views
35 changes: 0 additions & 35 deletions addons/website/tests/cases.py

This file was deleted.

84 changes: 84 additions & 0 deletions addons/website/tests/test_crawl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
import logging
import urlparse
import unittest2
import urllib2
import time
import werkzeug.urls

import lxml.html

import openerp
from openerp import tools

_logger = logging.getLogger(__name__)

class Crawler(openerp.tests.HttpCase):
""" Test suite crawling an openerp CMS instance and checking that all
internal links lead to a 200 response.
If a username and a password are provided, authenticates the user before
starting the crawl
"""

at_install = False
post_install = True

def crawl(self, url, seen=None, msg=''):
if seen == None:
seen = set()
if url in seen:
return seen
else:
seen.add(url)

_logger.info("%s %s", msg, url)
r = self.url_open(url)
code = r.getcode()
self.assertIn( code, xrange(200, 300), "%s Fetching %s returned error response (%d)" % (msg, url, code))

if r.info().gettype() == 'text/html':
doc = lxml.html.fromstring(r.read())
for link in doc.xpath('//a[@href]'):
href = link.get('href')

parts = urlparse.urlsplit(href)
# href with any fragment removed
href = urlparse.urlunsplit((
parts.scheme,
parts.netloc,
parts.path,
parts.query,
''
))

# FIXME: handle relative link (not parts.path.startswith /)
if parts.netloc or \
not parts.path.startswith('/') or \
parts.path == '/web' or\
parts.path.startswith('/web/') or \
parts.path.startswith('/en_US/') or \
(parts.scheme and parts.scheme not in ('http', 'https')):
continue

self.crawl(href, seen, msg)
return seen


def test_10_crawl_public(self):
t0 = time.time()
seen = self.crawl('/', msg='Anonymous Coward')
_logger.log(25, "public crawled %s urls in %.2fs", len(seen) ,time.time() - t0)

def test_20_crawl_demo(self):
t0 = time.time()
self.authenticate('demo', 'demo')
seen = self.crawl('/', msg='demo')
_logger.log(25, "demo crawled %s urls in %.2fs", len(seen), time.time() - t0)

def test_30_crawl_admin(self):
t0 = time.time()
self.authenticate('admin', 'admin')
seen = self.crawl('/', msg='admin')
_logger.log(25, "admin crawled %s urls in %.2fs", len(seen), time.time() - t0)

155 changes: 0 additions & 155 deletions addons/website/tests/test_requests.py

This file was deleted.

4 changes: 2 additions & 2 deletions openerp/modules/loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,12 @@ def _load_data(cr, module_name, idref, mode, kind):
registry._init_modules.add(package.name)
cr.commit()

_logger.log(25, "%s modules loaded in %.2fs", len(graph), time.time() - ta0)
_logger.log(25, "%s modules loaded in %.2fs", len(graph), time.time() - ta0)

# The query won't be valid for models created later (i.e. custom model
# created after the registry has been loaded), so empty its result.
registry.fields_by_model = None

cr.commit()

return loaded_modules, processed_modules
Expand Down
3 changes: 2 additions & 1 deletion openerp/modules/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,8 @@ def run_unit_tests(module_name, dbname, position=runs_at_install):
tm0 = time.time()
_logger.info('%s running tests.', m.__name__)
result = unittest2.TextTestRunner(verbosity=2, stream=TestStream(m.__name__)).run(suite)
_logger.log(25, "%s tested in %.2fs", m.__name__, time.time() - tm0)
if time.time() - tm0 > 5:
_logger.log(25, "%s tested in %.2fs", m.__name__, time.time() - tm0)
if not result.wasSuccessful():
r = False
_logger.error("Module %s: %d failures, %d errors", module_name, len(result.failures), len(result.errors))
Expand Down
37 changes: 34 additions & 3 deletions openerp/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,25 @@ def tearDownClass(cls):
cls.cr.rollback()
cls.cr.close()

class RedirectHandler(urllib2.HTTPRedirectHandler):
"""
HTTPRedirectHandler is predicated upon HTTPErrorProcessor being used and
works by intercepting 3xy "errors".
Inherit from it to handle 3xy non-error responses instead, as we're not
using the error processor
"""

def http_response(self, request, response):
code, msg, hdrs = response.code, response.msg, response.info()

if 300 <= code < 400:
return self.parent.error(
'http', request, response, code, msg, hdrs)

return response

https_response = http_response

class HttpCase(TransactionCase):
""" Transactionnal HTTP TestCase with url_open and phantomjs helpers.
Expand All @@ -152,18 +171,30 @@ def setUp(self):
self.session.db = DB
openerp.http.root.session_store.save(self.session)
self.localstorage_path = mkdtemp()
# setup an url opener helper
self.opener = urllib2.OpenerDirector()
self.opener.add_handler(urllib2.UnknownHandler())
self.opener.add_handler(urllib2.HTTPHandler())
self.opener.add_handler(urllib2.HTTPSHandler())
self.opener.add_handler(urllib2.HTTPCookieProcessor())
self.opener.add_handler(RedirectHandler())
self.opener.addheaders.append(('Cookie', 'session_id=%s' % self.session_id))

def tearDown(self):
rmtree(self.localstorage_path)
self.registry.leave_test_mode()
super(HttpCase, self).tearDown()

def url_open(self, url, data=None, timeout=10):
opener = urllib2.build_opener()
opener.addheaders.append(('Cookie', 'session_id=%s' % self.session_id))
if url.startswith('/'):
url = "http://localhost:%s%s" % (PORT, url)
return opener.open(url, data, timeout)
return self.opener.open(url, data, timeout)

def authenticate(self, user, password):
if user is not None:
url = '/login?%s' % werkzeug.urls.url_encode({'db': DB,'login': user, 'key': password})
auth = self.url_open(url)
assert auth.getcode() < 400, "Auth failure %d" % auth.getcode()

def phantom_poll(self, phantom, timeout):
""" Phantomjs Test protocol.
Expand Down

0 comments on commit 78e044b

Please sign in to comment.