-
Notifications
You must be signed in to change notification settings - Fork 32
Issue #517 - TCP Support #540
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
903c6f7
9d66550
2daadbc
9dfa2f2
b8eac95
f54bb72
a289a49
2cb87f5
1ccef5a
8291dab
cc38035
b04ecd1
3892d20
20cb217
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -117,10 +117,10 @@ def _run(self): | |
| raise (e) | ||
|
|
||
|
|
||
| class PortOutputClient(ZMQInputClient): | ||
| class UDPOutputClient(ZMQInputClient): | ||
| """ | ||
| This is the parent class for all outbound streams which publish | ||
| to a port. It opens a UDP port to publish to and publishes | ||
| to a UDP port. It opens a UDP port to publish to and publishes | ||
| outgoing message data to this port. | ||
| """ | ||
|
|
||
|
|
@@ -131,20 +131,74 @@ def __init__( | |
| zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, | ||
| **kwargs, | ||
| ): | ||
| super(PortOutputClient, self).__init__( | ||
| super(UDPOutputClient, self).__init__( | ||
| zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url | ||
| ) | ||
| self.out_port = kwargs["output"] | ||
| if "output" in kwargs: | ||
| output = kwargs["output"] | ||
| if type(output) is int: | ||
| self.addr_spec = ("localhost", output) | ||
| elif utils.is_valid_address_spec(output): | ||
| protocol, hostname, port = output.split(":") | ||
| if protocol.lower() != "udp": | ||
| raise ( | ||
| ValueError(f"UDPOutputClient: Invalid Specification {output}") | ||
| ) | ||
| self.addr_spec = (hostname, int(port)) | ||
| else: | ||
| raise (ValueError(f"UDPOutputClient: Invalid Specification {output}")) | ||
| else: | ||
| raise (ValueError("UDPOutputClient: Invalid Specification")) | ||
|
|
||
| self.context = zmq_context | ||
| # override pub to be udp socket | ||
| self.pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
|
|
||
| def publish(self, msg): | ||
| self.pub.sendto(msg, ("localhost", int(self.out_port))) | ||
| self.pub.sendto(msg, self.addr_spec) | ||
| log.debug("Published message from {}".format(self)) | ||
|
|
||
|
|
||
| class PortInputClient(ZMQClient, gs.DatagramServer): | ||
| class TCPOutputClient(ZMQInputClient): | ||
| """ | ||
| This is the parent class for all outbound streams which publish | ||
| to a TCP port. It opens a TCP connection to publish to and publishes | ||
| outgoing message data to this port. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| zmq_context, | ||
| zmq_proxy_xsub_url=ait.SERVER_DEFAULT_XSUB_URL, | ||
| zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, | ||
| **kwargs, | ||
| ): | ||
| super(TCPOutputClient, self).__init__( | ||
| zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url | ||
| ) | ||
| if "output" in kwargs: | ||
| output = kwargs["output"] | ||
| if utils.is_valid_address_spec(output): | ||
| protocol, hostname, port = output.split(":") | ||
| if protocol.lower() != "tcp": | ||
| raise ( | ||
| ValueError(f"TCPOutputClient: Invalid Specification {output}") | ||
| ) | ||
| self.addr_spec = (hostname, int(port)) | ||
| else: | ||
| raise (ValueError(f"TCPOutputClient: Invalid Specification {output}")) | ||
| else: | ||
| raise (ValueError("TCPOutputClient: Invalid Specification")) | ||
|
|
||
| self.context = zmq_context | ||
| self.pub = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
|
|
||
| def publish(self, msg): | ||
| self.pub.connect(self.addr_spec) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just checking if the socket.connect() should be called per-message? Or would it be better to call it once in the constructor? Also, should re-connection attempt be added in case of broken connection? |
||
| self.pub.sendall(msg) | ||
|
|
||
|
|
||
| class UDPInputServer(ZMQClient, gs.DatagramServer): | ||
| """ | ||
| This is the parent class for all inbound streams which receive messages | ||
| on a port. It opens a UDP port for receiving messages, listens for them, | ||
|
|
@@ -158,15 +212,31 @@ def __init__( | |
| zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, | ||
| **kwargs, | ||
| ): | ||
| if "input" in kwargs and type(kwargs["input"][0]) is int: | ||
| super(PortInputClient, self).__init__( | ||
| if "input" in kwargs: | ||
| input = kwargs["input"] | ||
| if type(input) is int: | ||
| host_spec = input | ||
| elif utils.is_valid_address_spec(input): | ||
| protocol, hostname, port = input.split(":") | ||
| if protocol.lower() != "udp": | ||
| raise (ValueError(f"UDPInputServer: Invalid Specification {input}")) | ||
| if hostname in ["127.0.0.1", "localhost"]: | ||
| host_spec = port | ||
| elif hostname in ["0.0.0.0", "server"]: | ||
| host_spec = f"0.0.0.0:{port}" | ||
| else: | ||
| raise (ValueError(f"UDPInputServer: Invalid Specification {input}")) | ||
|
|
||
| else: | ||
| raise (ValueError(f"UDPInputServer: Invalid Specification {input}")) | ||
| super(UDPInputServer, self).__init__( | ||
| zmq_context, | ||
| zmq_proxy_xsub_url, | ||
| zmq_proxy_xpub_url, | ||
| listener=int(kwargs["input"][0]), | ||
| listener=host_spec, | ||
| ) | ||
| else: | ||
| raise (ValueError("Input must be port in order to create PortInputClient")) | ||
| raise (ValueError("UDPInputServer: Invalid Specification")) | ||
|
|
||
| # open sub socket | ||
| self.sub = gevent.socket.socket(gevent.socket.AF_INET, gevent.socket.SOCK_DGRAM) | ||
|
|
@@ -175,3 +245,195 @@ def handle(self, packet, address): | |
| # This function provided for gs.DatagramServer class | ||
| log.debug("{} received message from port {}".format(self, address)) | ||
| self.process(packet) | ||
|
|
||
|
|
||
| class TCPInputServer(ZMQClient, gs.StreamServer): | ||
| """ | ||
| This class is similar to UDPInputServer except its TCP instead of UDP. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| zmq_context, | ||
| zmq_proxy_xsub_url=ait.SERVER_DEFAULT_XSUB_URL, | ||
| zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, | ||
| buffer=1024, | ||
| **kwargs, | ||
| ): | ||
| self.cur_socket = None | ||
| self.buffer = buffer | ||
| if "input" in kwargs: | ||
| input = kwargs["input"] | ||
| if not utils.is_valid_address_spec(input): | ||
| raise (ValueError(f"TCPInputServer: Invalid Specification {input}")) | ||
| protocol, hostname, port = input.split(":") | ||
| if protocol.lower() != "tcp" or hostname not in [ | ||
| "127.0.0.1", | ||
| "localhost", | ||
| "server", | ||
| "0.0.0.0", | ||
| ]: | ||
| raise (ValueError(f"TCPInputServer: Invalid Specification {input}")) | ||
|
|
||
| self.sub = gevent.socket.socket( | ||
| gevent.socket.AF_INET, gevent.socket.SOCK_STREAM | ||
| ) | ||
| hostname = ( | ||
| "127.0.0.1" if hostname in ["127.0.0.1", "localhost"] else "0.0.0.0" | ||
| ) | ||
| super(TCPInputServer, self).__init__( | ||
| zmq_context, | ||
| zmq_proxy_xsub_url, | ||
| zmq_proxy_xpub_url, | ||
| listener=(hostname, int(port)), | ||
| ) | ||
| else: | ||
| raise (ValueError("TCPInputServer: Invalid Specification")) | ||
|
|
||
| def handle(self, socket, address): | ||
| self.cur_socket = socket | ||
| with socket: | ||
| while True: | ||
| data = socket.recv(self.buffer) | ||
| if not data: | ||
| break | ||
| log.debug("{} received message from port {}".format(self, address)) | ||
| self.process(data) | ||
| gevent.sleep(0) # pass control back | ||
|
|
||
|
|
||
| class TCPInputClient(ZMQClient): | ||
| """ | ||
| This class creates a TCP input client. Unlike TCPInputServer and UDPInputServer, | ||
| this class will proactively initiate a connection with an input source and begin | ||
| receiving data from that source. This class does not inherit directly from gevent | ||
| servers and thus implements its own housekeeping functions. It also implements a | ||
| start function that spawns a process to stay consistent with the behavior of | ||
| TCPInputServer and UDPInputServer. | ||
|
|
||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| zmq_context, | ||
| zmq_proxy_xsub_url=ait.SERVER_DEFAULT_XSUB_URL, | ||
| zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, | ||
| connection_reattempts=5, | ||
| buffer=1024, | ||
| **kwargs, | ||
| ): | ||
| self.connection_reattempts = connection_reattempts | ||
| self.buffer = buffer | ||
| self.connection_status = -1 | ||
| self.proc = None | ||
| self.protocol = gevent.socket.SOCK_STREAM | ||
|
|
||
| if "buffer" in kwargs and type(kwargs["buffer"]) == int: | ||
| self.buffer = kwargs["buffer"] | ||
|
|
||
| if "input" in kwargs: | ||
| input = kwargs["input"] | ||
| if not utils.is_valid_address_spec(input): | ||
| raise (ValueError(f"TCPInputClient: Invalid Specification {input}")) | ||
| protocol, hostname, port = input.split(":") | ||
| if protocol.lower() != "tcp": | ||
| raise (ValueError(f"TCPInputClient: Invalid Specification {input}")) | ||
| super(TCPInputClient, self).__init__( | ||
| zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url | ||
| ) | ||
|
|
||
| self.sub = gevent.socket.socket(gevent.socket.AF_INET, self.protocol) | ||
|
|
||
| self.hostname = hostname | ||
| self.port = int(port) | ||
| self.address = (hostname, int(port)) | ||
|
|
||
| else: | ||
| raise (ValueError("TCPInputClient: Invalid Specification")) | ||
|
|
||
| def __exit__(self, exc_type, exc_val, exc_tb): | ||
| try: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please consolidate into a single method which is called by exit's and del, and consider Niling the fields after they are closed/killed |
||
| if self.sub: | ||
| self.sub.close() | ||
| if self.proc: | ||
| self.proc.kill() | ||
| except Exception as e: | ||
| log.error(e) | ||
|
|
||
| def __del__(self): | ||
| try: | ||
| if self.sub: | ||
| self.sub.close() | ||
| if self.proc: | ||
| self.proc.kill() | ||
| except Exception as e: | ||
| log.error(e) | ||
|
|
||
| def __repr__(self): | ||
| return "<%s at %s %s>" % ( | ||
| type(self).__name__, | ||
| hex(id(self)), | ||
| self._formatinfo(), | ||
| ) | ||
|
|
||
| def __str__(self): | ||
| return "<%s %s>" % (type(self).__name__, self._formatinfo()) | ||
|
|
||
| def start(self): | ||
| self.proc = gevent.spawn(self._client) | ||
|
|
||
| def _connect(self): | ||
| while self.connection_reattempts: | ||
| try: | ||
| res = self.sub.connect_ex((self.hostname, self.port)) | ||
| if res == 0: | ||
| self.connection_reattempts = 5 | ||
| return res | ||
| else: | ||
| self.connection_reattempts -= 1 | ||
| gevent.sleep(1) | ||
| except Exception as e: | ||
| log.error(e) | ||
| self.connection_reattempts -= 1 | ||
| gevent.sleep(1) | ||
|
|
||
| def _exit(self): | ||
| try: | ||
| if self.sub: | ||
| self.sub.close() | ||
| if self.proc: | ||
| self.proc.kill() | ||
| except Exception as e: | ||
| log.error(e) | ||
|
|
||
| def _client(self): | ||
| self.connection_status = self._connect() | ||
| if self.connection_status != 0: | ||
| log.error( | ||
| f"Unable to connect to client: {self.address[0]}:{self.address[1]}" | ||
| ) | ||
| self._exit() | ||
| while True: | ||
| packet = self.sub.recv(self.buffer) | ||
| if not packet: | ||
| gevent.sleep(1) | ||
| log.info( | ||
| f"Trying to reconnect to client: {self.address[0]}:{self.address[1]}" | ||
| ) | ||
| if self._connect() != 0: | ||
| log.error( | ||
| f"Unable to connect to client: {self.address[0]}:{self.address[1]}" | ||
| ) | ||
| self._exit() | ||
| self.process(packet) | ||
|
|
||
| def _formatinfo(self): | ||
| result = "" | ||
| try: | ||
| if isinstance(self.address, tuple) and len(self.address) == 2: | ||
| result += "address=%s:%s" % self.address | ||
| else: | ||
| result += "address=%s" % (self.address,) | ||
| except Exception as ex: | ||
| result += str(ex) or "<error>" | ||
| return result | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we have
is_valid_address_spec(), could you please add aparse_address_spec()which replaces string.split(":")