|
7 | 7 | # ----------------------------------------------------------------------------- |
8 | 8 |
|
9 | 9 | from unittest import TestCase, main |
10 | | -from tempfile import mkstemp |
| 10 | +from tempfile import mkstemp, NamedTemporaryFile, TemporaryFile |
11 | 11 | from os import close, remove, makedirs, mkdir |
12 | 12 | from os.path import join, exists, basename |
13 | 13 | from shutil import rmtree |
14 | 14 | from datetime import datetime |
15 | 15 | from functools import partial |
16 | 16 | from string import punctuation |
17 | | - |
| 17 | +import h5py |
| 18 | +from six import StringIO, BytesIO |
18 | 19 | import pandas as pd |
19 | 20 |
|
20 | 21 | from qiita_core.util import qiita_test_checker |
@@ -869,5 +870,77 @@ def test_get_artifacts_information(self): |
869 | 870 | self.assertItemsEqual(obs, exp) |
870 | 871 |
|
871 | 872 |
|
| 873 | +class TestFilePathOpening(TestCase): |
| 874 | + """Tests adapted from scikit-bio's skbio.io.util tests""" |
| 875 | + def test_is_string_or_bytes(self): |
| 876 | + self.assertTrue(qdb.util._is_string_or_bytes('foo')) |
| 877 | + self.assertTrue(qdb.util._is_string_or_bytes(u'foo')) |
| 878 | + self.assertTrue(qdb.util._is_string_or_bytes(b'foo')) |
| 879 | + self.assertFalse(qdb.util._is_string_or_bytes(StringIO('bar'))) |
| 880 | + self.assertFalse(qdb.util._is_string_or_bytes([1])) |
| 881 | + |
| 882 | + def test_file_closed(self): |
| 883 | + """File gets closed in decorator""" |
| 884 | + f = NamedTemporaryFile('r') |
| 885 | + filepath = f.name |
| 886 | + with qdb.util.open_file(filepath) as fh: |
| 887 | + pass |
| 888 | + self.assertTrue(fh.closed) |
| 889 | + |
| 890 | + def test_file_closed_harder(self): |
| 891 | + """File gets closed in decorator, even if exceptions happen.""" |
| 892 | + f = NamedTemporaryFile('r') |
| 893 | + filepath = f.name |
| 894 | + try: |
| 895 | + with qdb.util.open_file(filepath) as fh: |
| 896 | + raise TypeError |
| 897 | + except TypeError: |
| 898 | + self.assertTrue(fh.closed) |
| 899 | + else: |
| 900 | + # If we're here, no exceptions have been raised inside the |
| 901 | + # try clause, so the context manager swallowed them. No |
| 902 | + # good. |
| 903 | + raise Exception("`open_file` didn't propagate exceptions") |
| 904 | + |
| 905 | + def test_filehandle(self): |
| 906 | + """Filehandles slip through untouched""" |
| 907 | + with TemporaryFile('r') as fh: |
| 908 | + with qdb.util.open_file(fh) as ffh: |
| 909 | + self.assertTrue(fh is ffh) |
| 910 | + # And it doesn't close the file-handle |
| 911 | + self.assertFalse(fh.closed) |
| 912 | + |
| 913 | + def test_StringIO(self): |
| 914 | + """StringIO (useful e.g. for testing) slips through.""" |
| 915 | + f = StringIO("File contents") |
| 916 | + with qdb.util.open_file(f) as fh: |
| 917 | + self.assertTrue(fh is f) |
| 918 | + |
| 919 | + def test_BytesIO(self): |
| 920 | + """BytesIO (useful e.g. for testing) slips through.""" |
| 921 | + f = BytesIO(b"File contents") |
| 922 | + with qdb.util.open_file(f) as fh: |
| 923 | + self.assertTrue(fh is f) |
| 924 | + |
| 925 | + def test_hdf5IO(self): |
| 926 | + f = h5py.File('test', driver='core', backing_store=False) |
| 927 | + with qdb.util.open_file(f) as fh: |
| 928 | + self.assertTrue(fh is f) |
| 929 | + |
| 930 | + def test_hdf5IO_open(self): |
| 931 | + name = None |
| 932 | + with NamedTemporaryFile(delete=False) as fh: |
| 933 | + name = fh.name |
| 934 | + fh.close() |
| 935 | + |
| 936 | + h5file = h5py.File(name, 'w') |
| 937 | + h5file.close() |
| 938 | + |
| 939 | + with qdb.util.open_file(name) as fh_inner: |
| 940 | + self.assertTrue(isinstance(fh_inner, h5py.File)) |
| 941 | + |
| 942 | + remove(name) |
| 943 | + |
| 944 | + |
872 | 945 | if __name__ == '__main__': |
873 | 946 | main() |
0 commit comments