@@ -53,14 +53,15 @@ def test_create(self):
53
53
self .assertNotIn (key , [source .uuid for source in sources ])
54
54
55
55
def test_fetch_none (self ):
56
- source = CustomSource (uuid = str (uuid4 ()),
57
- name = 'Test Source' ,
58
- enabled = True ,
59
- playlist_uri = None ,
60
- playlist_type = CustomSource .TYPE_NONE ,
61
- epg_uri = None ,
62
- epg_type = CustomSource .TYPE_NONE ,
63
- )
56
+ source = CustomSource (
57
+ uuid = str (uuid4 ()),
58
+ name = 'Test Source' ,
59
+ enabled = True ,
60
+ playlist_uri = None ,
61
+ playlist_type = CustomSource .TYPE_NONE ,
62
+ epg_uri = None ,
63
+ epg_type = CustomSource .TYPE_NONE ,
64
+ )
64
65
65
66
channels = source .get_channels ()
66
67
self .assertEqual (channels , '' )
@@ -69,20 +70,30 @@ def test_fetch_none(self):
69
70
self .assertEqual (epg , '' )
70
71
71
72
def test_fetch_file (self ):
72
- source = CustomSource (uuid = str (uuid4 ()),
73
- name = 'Test Source' ,
74
- enabled = True ,
75
- playlist_uri = os .path .realpath ('tests/data/custom_playlist.m3u' ),
76
- playlist_type = CustomSource .TYPE_FILE ,
77
- epg_uri = os .path .realpath ('tests/data/custom_epg.xml' ),
78
- epg_type = CustomSource .TYPE_FILE ,
79
- )
73
+ source = CustomSource (
74
+ uuid = str (uuid4 ()),
75
+ name = 'Test Source' ,
76
+ enabled = True ,
77
+ playlist_uri = os .path .realpath ('tests/data/custom_playlist.m3u' ),
78
+ playlist_type = CustomSource .TYPE_FILE ,
79
+ epg_uri = os .path .realpath ('tests/data/custom_epg.xml' ),
80
+ epg_type = CustomSource .TYPE_FILE ,
81
+ )
82
+ expected_channels = Source ._extract_m3u (open ('tests/data/custom_playlist.m3u' , 'r' ).read ())
83
+ expected_epg = Source ._extract_xmltv (open ('tests/data/custom_epg.xml' , 'r' ).read ())
84
+
85
+ # Test channels
86
+ channels = source .get_channels ()
87
+ self .assertEqual (channels .replace ('\r \n ' , '\n ' ), expected_channels )
80
88
89
+ # Test channels (gzip)
90
+ source .playlist_uri = os .path .realpath ('tests/data/custom_playlist.m3u.gz' )
81
91
channels = source .get_channels ()
82
- self .assertEqual (channels , Source . _extract_m3u ( open ( 'tests/data/custom_playlist.m3u' ). read ()) )
92
+ self .assertEqual (channels . replace ( ' \r \n ' , ' \n ' ), expected_channels )
83
93
94
+ # Test EPG
84
95
epg = source .get_epg ()
85
- self .assertEqual (epg , Source . _extract_xmltv ( open ( 'tests/data/custom_epg.xml' ). read ()) )
96
+ self .assertEqual (epg . replace ( ' \r \n ' , ' \n ' ), expected_epg )
86
97
87
98
def test_fetch_url (self ):
88
99
@@ -99,55 +110,68 @@ def raise_for_status(self):
99
110
def content (self ):
100
111
return self .data
101
112
113
+ @property
114
+ def text (self ):
115
+ return self .data .decode ()
116
+
102
117
if args [0 ].endswith ('m3u' ):
103
- data = open ('tests/data/custom_playlist.m3u' , 'r ' ).read ()
118
+ data = open ('tests/data/custom_playlist.m3u' , 'rb ' ).read ()
104
119
return MockResponse (data , 200 )
105
120
106
121
if args [0 ].endswith ('m3u.gz' ):
107
- from gzip import compress
108
122
data = open ('tests/data/custom_playlist.m3u' , 'rb' ).read ()
109
- return MockResponse (compress (data ), 200 )
123
+ try : # Python 3
124
+ from gzip import compress
125
+ return MockResponse (compress (data ), 200 )
126
+ except ImportError : # Python 2
127
+ from gzip import GzipFile
128
+ from StringIO import StringIO
129
+ buf = StringIO ()
130
+ with GzipFile (fileobj = buf , mode = 'wb' ) as f :
131
+ f .write (data )
132
+ return MockResponse (buf .getvalue (), 200 )
110
133
111
134
if args [0 ].endswith ('m3u.bz2' ):
112
135
from bz2 import compress
113
136
data = open ('tests/data/custom_playlist.m3u' , 'rb' ).read ()
114
137
return MockResponse (compress (data ), 200 )
115
138
116
139
if args [0 ].endswith ('xml' ):
117
- data = open ('tests/data/custom_epg.xml' , 'r ' ).read ()
140
+ data = open ('tests/data/custom_epg.xml' , 'rb ' ).read ()
118
141
return MockResponse (data , 200 )
119
142
120
143
return MockResponse (None , 404 )
121
144
122
- with patch ('requests.get' , side_effect = mocked_requests_get ):
123
- source = CustomSource (uuid = str (uuid4 ()),
124
- name = 'Test Source' ,
125
- enabled = True ,
126
- playlist_uri = 'https://example.com/playlist.m3u' ,
127
- playlist_type = CustomSource .TYPE_URL ,
128
- epg_uri = 'https://example.com/xmltv.xml' ,
129
- epg_type = CustomSource .TYPE_URL ,
130
- )
145
+ source = CustomSource (
146
+ uuid = str (uuid4 ()),
147
+ name = 'Test Source' ,
148
+ enabled = True ,
149
+ playlist_uri = 'https://example.com/playlist.m3u' ,
150
+ playlist_type = CustomSource .TYPE_URL ,
151
+ epg_uri = 'https://example.com/xmltv.xml' ,
152
+ epg_type = CustomSource .TYPE_URL ,
153
+ )
154
+ expected_channels = Source ._extract_m3u (open ('tests/data/custom_playlist.m3u' , 'r' ).read ())
155
+ expected_epg = Source ._extract_xmltv (open ('tests/data/custom_epg.xml' , 'r' ).read ())
131
156
157
+ with patch ('requests.get' , side_effect = mocked_requests_get ):
132
158
# Test channels
133
159
channels = source .get_channels ()
134
- self .assertEqual (channels , Source ._extract_m3u (open ('tests/data/custom_playlist.m3u' ).read ()))
135
-
136
- # Test EPG
137
- epg = source .get_epg ()
138
- self .assertEqual (epg , Source ._extract_xmltv (open ('tests/data/custom_epg.xml' ).read ()))
160
+ self .assertEqual (channels .replace ('\r \n ' , '\n ' ), expected_channels )
139
161
140
162
# Test channels (gzip)
141
163
source .playlist_uri = 'https://example.com/playlist.m3u.gz'
142
164
channels = source .get_channels ()
143
- self .assertEqual (channels , Source . _extract_m3u ( open ( 'tests/data/custom_playlist.m3u' ). read ()) )
165
+ self .assertEqual (channels . replace ( ' \r \n ' , ' \n ' ), expected_channels )
144
166
145
167
# Test channels (bzip2)
146
168
source .playlist_uri = 'https://example.com/playlist.m3u.bz2'
147
169
channels = source .get_channels ()
148
- self .assertEqual (channels , Source ._extract_m3u (open ('tests/data/custom_playlist.m3u' ).read ()))
149
-
170
+ self .assertEqual (channels .replace ('\r \n ' , '\n ' ), expected_channels )
150
171
172
+ # Test EPG
173
+ epg = source .get_epg ()
174
+ self .assertEqual (epg .replace ('\r \n ' , '\n ' ), expected_epg )
151
175
152
176
153
177
if __name__ == '__main__' :
0 commit comments