Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include lifecycle policy tests #239

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions katdal/chunkstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ class StoreUnavailable(OSError, ChunkStoreError):
"""Could not access underlying storage medium (offline, auth failed, etc)."""


class UnsupportedStoreFeature(ChunkStoreError):
"""The underlying store does not support the requested operation
(e.g. minio doesn't support lifecycle policies on buckets)"""


class ChunkNotFound(KeyError, ChunkStoreError):
"""The store was accessible but a chunk with the given name was not found."""

Expand Down
38 changes: 29 additions & 9 deletions katdal/chunkstore_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
standard_library.install_aliases() # noqa: E402
from builtins import object
import future.utils
from future.utils import raise_, bytes_to_native_str
from future.utils import raise_, bytes_to_native_str, raise_from

import contextlib
import io
Expand Down Expand Up @@ -53,9 +53,11 @@
botocore = None

from .chunkstore import (ChunkStore, StoreUnavailable, ChunkNotFound, BadChunk,
npy_header_and_body)
UnsupportedStoreFeature, npy_header_and_body)
from .sensordata import to_str

from . import schemas


# Lifecycle policies unfortunately use XML encoding rather than JSON
# Following path of least resistance we simply .format() this string
Expand Down Expand Up @@ -180,6 +182,8 @@ def _raise_for_status(response):
except requests.HTTPError as error:
if response.status_code == 404:
raise ChunkNotFound(str(error))
elif response.status_code == 501:
raise_from(UnsupportedStoreFeature(str(error)), error)
else:
raise StoreUnavailable(str(error))

Expand Down Expand Up @@ -280,6 +284,11 @@ class S3ChunkStore(ChunkStore):
expiry_days : int
If set to a value greater than 0 will set a future expiry time in days
for any new buckets created.
validate_xml_policies : bool
If set to true, S3 operations that use XML policies will be validated
against the inbuilt schemas. Note that these are relatively minimal
and not a guarantee of operation success on passing validation.
Requires lxml


Raises
Expand All @@ -288,7 +297,7 @@ class S3ChunkStore(ChunkStore):
If requests is not installed (it's an optional dependency otherwise)
"""

def __init__(self, session_factory, url, public_read=False, expiry_days=0):
def __init__(self, session_factory, url, public_read=False, expiry_days=0, validate_xml_policies=False):
try:
# Quick smoke test to see if the S3 server is available, by listing
# buckets. Depending on the server in use, this may return a 403
Expand All @@ -309,9 +318,10 @@ def __init__(self, session_factory, url, public_read=False, expiry_days=0):
self._url = to_str(url)
self.public_read = public_read
self.expiry_days = int(expiry_days)
self.validate_xml_policies = validate_xml_policies

@classmethod
def _from_url(cls, url, timeout, token, credentials, public_read, expiry_days):
def _from_url(cls, url, timeout, token, credentials, public_read, expiry_days, validate_xml_policies):
"""Construct S3 chunk store from endpoint URL (see :meth:`from_url`)."""
if token is not None:
parsed = urllib.parse.urlparse(url)
Expand All @@ -333,12 +343,12 @@ def session_factory():
session.mount(url, adapter)
return session

return cls(session_factory, url, public_read, expiry_days)
return cls(session_factory, url, public_read, expiry_days, validate_xml_policies)

@classmethod
def from_url(cls, url, timeout=300, extra_timeout=10,
token=None, credentials=None, public_read=False,
expiry_days=0, **kwargs):
expiry_days=0, validate_xml_policies=False, **kwargs):
"""Construct S3 chunk store from endpoint URL.

Parameters
Expand All @@ -360,6 +370,11 @@ def from_url(cls, url, timeout=300, extra_timeout=10,
expiry_days : int
If set to a value greater than 0 will set a future expiry time in days
for any new buckets created.
validate_xml_policies : bool
If set to true, S3 operations that use XML policies will be validated
against the inbuilt schemas. Note that these are relatively minimal
and not a guarantee of operation success on passing validation.
Requires lxml
kwargs : dict
Extra keyword arguments (unused)

Expand All @@ -375,15 +390,16 @@ def from_url(cls, url, timeout=300, extra_timeout=10,
# (avoiding extra dependency on Python 2, revisit when Python 3 only)
q = queue.Queue()

def _from_url(url, timeout, token, credentials, public_read, expiry_days):
def _from_url(url, timeout, token, credentials, public_read, expiry_days, validate_xml_policies):
"""Construct chunk store and return it (or exception) via queue."""
try:
q.put(cls._from_url(url, timeout, token, credentials, public_read, expiry_days))
q.put(cls._from_url(url, timeout, token, credentials, public_read, expiry_days, validate_xml_policies))
except BaseException:
q.put(sys.exc_info())

thread = threading.Thread(target=_from_url,
args=(url, timeout, token, credentials, public_read, expiry_days))
args=(url, timeout, token, credentials, public_read, expiry_days,
validate_xml_policies))
thread.daemon = True
thread.start()
if timeout is not None:
Expand Down Expand Up @@ -467,6 +483,10 @@ def create_array(self, array_name):

if self.expiry_days > 0:
xml_payload = _BASE_LIFECYCLE_POLICY.format(self.expiry_days)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So xml_payload is always _BASE_LIFECYCLE_POLICY? When would it not validate - a bad value for expiry_days? It seems like a lot of effort to solve future problems.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The grand scheme is that you will sometimes want to override the base for very user specific things - like managing storage migration say. I can see us ending up with quite a number of policies as future versions of CEPH support more elaborate management via lifecycle policies.

if self.validate_xml_policies:
if not schemas.has_lxml:
raise ImportError("XML schema validation requires lxml to be installed.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to link to the original error via raise_from. See e.g. the RDB reader handling in katsdptelstate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I agree - I prefer the very unambiguous nature of just telling the user that you need lxml if you want to use validators.

schemas.validate('MINIMAL_LIFECYCLE_POLICY', xml_payload)
b64_md5 = base64.b64encode(hashlib.md5(xml_payload.encode('utf-8')).digest()).decode('utf-8')
lifecycle_headers = {'Content-Type': 'text/xml', 'Content-MD5': b64_md5}
with self.request(None, 'PUT', url, params='lifecycle', data=xml_payload, headers=lifecycle_headers):
Expand Down
58 changes: 58 additions & 0 deletions katdal/schemas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Makes packaged XSD schemas available as validators."""

import pkg_resources
from future.utils import raise_from

has_lxml = False
validators = {}

try:
from lxml import etree
has_lxml = True
except ImportError:
pass


class ValidatorWithLog(object):
def __init__(self, validator):
self.validator = validator

def validate(self, xml_string):
"""Validates a supplied XML string against the instantiated validator.

Parameters
---------
xml_string : str
String representation of the XML to be turned into a document
and validated.

Raises
------
etree.DocumentInvalid
if `xml_string` does not validate against the XSD schema
ValueError
if `xml_string` cannot be parsed into a valid XML document
"""
try:
xml_doc = etree.fromstring(bytes(bytearray(xml_string, encoding='utf-8')))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why bytes and bytesarray?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python2.7 doesn't allow encoding types in bytes, but does in bytearray. Kludge to keep 2/3 compatible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be something in future.utils to force things one way or another.

But also, why encode at all? lxml.etree.fromstring seems perfectly happy to take Unicode.

except etree.XMLSyntaxError as e:
raise_from(ValueError("Supplied string cannot be parsed as XML"), e)
if not self.validator.validate(xml_doc):
log = self.validator.error_log
raise etree.DocumentInvalid(log.last_error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are converting etree.XMLSyntaxError to ValueError but leaving etree.DocumentInvalid as is... This seems a bit inconsistent. Will the end user typically be expected to catch invalid document exceptions? If that's the case, I'd probably also transmogrify it for the user's convenience.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simply because the XMLSyntaxError is quite messy. I also wanted to easily catch the difference between an XML syntax error and an invalid document. If I didn't use DocumentInvalid, then ValueError would be the sensible standard exception but wouldn't allow this differentiation.

return True


def validate(validator_name, string_to_validate):
try:
return validators[validator_name].validate(string_to_validate)
except KeyError:
raise_from(KeyError("Specified validator {} doesn't map to an installed"
" schema.".format(validator_name)), None)


for name in pkg_resources.resource_listdir(__name__, '.'):
if name.endswith('.xsd') and has_lxml:
xmlschema_doc = etree.parse(pkg_resources.resource_stream(__name__, name))
xml_validator = etree.XMLSchema(xmlschema_doc)
validators[name[:-4].upper()] = ValidatorWithLog(xml_validator)
38 changes: 38 additions & 0 deletions katdal/schemas/minimal_lifecycle_policy.xsd
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="LifecycleConfiguration">
<xs:complexType>
<xs:sequence>
<xs:element name="Rule" maxOccurs="unbounded">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:string" name="ID"/>
<xs:element name="Filter">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:string" name="Prefix" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element type="xs:string" name="Status"/>
<xs:element name="Transition" minOccurs="0">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:short" name="Days"/>
<xs:element type="xs:string" name="StorageClass"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="Expiration">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:short" name="Days"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
31 changes: 30 additions & 1 deletion katdal/test/test_chunkstore_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import contextlib
import io
import warnings
import lxml

import numpy as np
from nose import SkipTest
Expand All @@ -53,9 +54,15 @@
import requests

from katdal.chunkstore_s3 import S3ChunkStore, _AWSAuth, read_array
from katdal.chunkstore import StoreUnavailable
from katdal.chunkstore import StoreUnavailable, UnsupportedStoreFeature
from katdal.test.test_chunkstore import ChunkStoreTestBase

# No expiration rule included
_INVALID_LIFECYCLE_POLICY = """<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration><Rule>
<ID>katdal_expiry_{0}_days</ID><Filter></Filter><Status>Enabled</Status>
</Rule></LifecycleConfiguration>"""


def gethostbyname_slow(host):
"""Mock DNS lookup that is meant to be slow."""
Expand Down Expand Up @@ -231,6 +238,28 @@ def test_public_read(self):
y = reader.get_chunk('public', slices, x.dtype)
np.testing.assert_array_equal(x, y)

def test_bucket_expiry(self):
# NOTE: Minimum bucket expiry time is 1 day so real world testing is impractical.
# We expect not supported since minio doesn't allow lifecycle policies
test_store = self.from_url(self.url, expiry_days=1)
assert_raises(UnsupportedStoreFeature, test_store.create_array, 'test-expiry')

def test_bucket_expiry_with_validation(self):
test_store = self.from_url(self.url, expiry_days=1, validate_xml_policies=True)
assert_raises(UnsupportedStoreFeature, test_store.create_array, 'test-expiry')

@mock.patch('katdal.chunkstore_s3._BASE_LIFECYCLE_POLICY', _INVALID_LIFECYCLE_POLICY)
def test_bucket_expiry_invalid_schema(self):
# Now test with an invalid policy
test_store = self.from_url(self.url, expiry_days=1, validate_xml_policies=True)
assert_raises(lxml.etree.DocumentInvalid, test_store.create_array, 'test-expiry')

@mock.patch('katdal.chunkstore_s3._BASE_LIFECYCLE_POLICY', "<XML?>")
def test_bucket_expiry_not_xml(self):
# Code path coverage to test a policy that is not even valid XML
test_store = self.from_url(self.url, expiry_days=1, validate_xml_policies=True)
assert_raises(ValueError, test_store.create_array, 'test-expiry')

@timed(0.1 + 0.05)
def test_store_unavailable_invalid_url(self):
# Ensure that timeouts work
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
author='Ludwig Schwardt',
author_email='ludwig@ska.ac.za',
packages=find_packages(),
package_data={'katdal': ['schemas/*']},
include_package_data=True,
scripts=[
'scripts/h5list.py',
'scripts/h5toms.py',
Expand Down
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
coverage
funcsigs
lxml==4.3.3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want this in requirements.txt as well, to be used on site?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope - the import checking handles this being missing for production systems and lets you know that validation wont be used if it is missing.

mock
nose
pbr