Skip to content

Commit 6d0b8a4

Browse files
committed
session: Create stream_recv() to read everything from a socket
Some session requests will return large binary data through the socket until it's complete, then the socket is closed. Provide a new method stream_recv() which will support this behavior so callers don't need to grovel around in our internal class methods and data. Create a test for this.
1 parent a0454a5 commit 6d0b8a4

File tree

2 files changed

+65
-2
lines changed

2 files changed

+65
-2
lines changed

pynuodb/session.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
from urlparse import urlparse # type: ignore
2727

2828
try:
29-
from typing import Dict, Iterable, Mapping # pylint: disable=unused-import
30-
from typing import Optional, Tuple, Union # pylint: disable=unused-import
29+
from typing import Dict, Generator, Iterable, Mapping # pylint: disable=unused-import
30+
from typing import Optional, Tuple, Union # pylint: disable=unused-import
3131
except ImportError:
3232
pass
3333

@@ -489,6 +489,31 @@ def __readFully(self, msgLength, timeout=None):
489489

490490
return bytes(msg)
491491

492+
def stream_recv(self, blocksz=4096, timeout=None):
493+
# type: (int, Optional[float]) -> Generator[bytes, None, None]
494+
"""Read data from the socket in blocksz increments.
495+
496+
Will yield bytes buffers of blocksz for as long as the sender is
497+
sending. After this function completes the socket has been closed.
498+
Note it's best if blocksz is a multiple of 32, to ensure that block
499+
ciphers will work. This code doesn't manage block sizes or padding.
500+
501+
If timeout is not None, raises a socket.timeout exception on timeout.
502+
The socket is still closed.
503+
"""
504+
sock = self._sock
505+
try:
506+
sock.settimeout(timeout)
507+
while True:
508+
msg = sock.recv(blocksz)
509+
if not msg:
510+
break
511+
if self.__cipherIn:
512+
msg = self.__cipherIn.transform(msg)
513+
yield msg
514+
finally:
515+
self.close()
516+
492517
def close(self, force=False):
493518
# type: (bool) -> None
494519
"""Close the current socket connection with the server."""

tests/nuodb_service_test.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
#!/usr/bin/env python
22

3+
import os
34
import unittest
5+
import tempfile
6+
import gzip
47
import xml.etree.ElementTree as ET
58

69
import pynuodb
@@ -64,6 +67,41 @@ def test_request_gc(self):
6467
self.assertEqual(len(info), 1)
6568
self.assertEqual(info[0].get('Action'), 'RequestGarbageCollection')
6669

70+
def test_stream_recv(self):
71+
"""Test the stream_recv() facility."""
72+
ap_conn = get_ap_conn()
73+
if ap_conn is None:
74+
self.skipTest("No AP available")
75+
76+
procs = ap_conn.get_processes(db_name=DATABASE_NAME)
77+
dbpasswd = ap_conn._get_db_password(DATABASE_NAME)
78+
79+
session = pynuodb.session.Session(
80+
procs[0].address, service='Admin',
81+
options={'verifyHostname': 'False'})
82+
session.authorize('Cloud', dbpasswd)
83+
84+
session.send('''<Request Service="Admin">
85+
<Request Type="GetSysDepends"/>
86+
</Request>''')
87+
resp = session.recv()
88+
xml = ET.fromstring(resp)
89+
self.assertIsNotNone(xml.find('Success'), "Failed: %s" % (resp))
90+
91+
deppath = os.path.join(tempfile.gettempdir(), 'deps.tar.gz')
92+
with open(deppath, 'wb') as of:
93+
for data in session.stream_recv():
94+
of.write(data)
95+
96+
# The socket should be closed now: this will raise
97+
self.assertRaises(pynuodb.session.SessionException,
98+
lambda: session._sock)
99+
100+
# Now make sure that what we read is uncompressable
101+
with gzip.GzipFile(deppath, 'rb') as gz:
102+
# We don't really care we just want to make sure it works
103+
self.assertIsNotNone(gz.read(), "Failed to unzip %s" % (deppath))
104+
67105

68106
if __name__ == '__main__':
69107
unittest.main()

0 commit comments

Comments
 (0)