forked from circuits/circuits
-
Notifications
You must be signed in to change notification settings - Fork 0
/
telnet.py
executable file
·172 lines (128 loc) · 4.24 KB
/
telnet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Telnet Example
A basic telnet-like clone that connects to remote hosts
via tcp and allows the user to send data to the remote
server.
This example demonstrates:
* Basic Component creation.
* Basic Event handling.
* Basic Networking
* Basic Request/Response Networking
This example makes use of:
* Component
* Event
* net.sockets.TCPClient
"""
from __future__ import print_function
import os
from optparse import OptionParser
import circuits
from circuits import Component, handler
from circuits.io import stdin
from circuits.net.events import connect, write
from circuits.net.sockets import TCPClient, UDPClient, UNIXClient
from circuits.tools import graph
# Usage and version strings handed to ``OptionParser`` in ``parse_options``;
# optparse expands ``%prog`` to the name of the running program.
USAGE = "%prog [options] host [port]"
VERSION = "%prog v" + circuits.__version__
def parse_options(argv=None):
    """Parse the command line into options and positional arguments.

    :param argv: list of argument strings to parse. ``None`` (the default)
        lets ``optparse`` fall back to ``sys.argv[1:]``.
    :returns: the ``(opts, args)`` pair produced by
        ``OptionParser.parse_args``; ``opts`` carries the ``secure``,
        ``udp`` and ``verbose`` flags.
    :raises SystemExit: with status 1, after printing the help text, unless
        exactly one or two positional arguments were supplied.
    """
    parser = OptionParser(usage=USAGE, version=VERSION)

    parser.add_option(
        "-s", "--secure",
        action="store_true", default=False, dest="secure",
        help="Enable secure mode",
    )

    parser.add_option(
        "-u", "--udp",
        action="store_true", default=False, dest="udp",
        help="Use UDP transport",
    )

    parser.add_option(
        "-v", "--verbose",
        action="store_true", default=False, dest="verbose",
        help="Enable verbose debugging",
    )

    opts, args = parser.parse_args(argv)

    # ``Telnet`` takes either one positional (a socket path) or a
    # ``host port`` pair; any other count would only fail later while
    # unpacking, so reject it here with the help text instead.
    if not 1 <= len(args) <= 2:
        parser.print_help()
        raise SystemExit(1)

    return opts, args
class Telnet(Component):
    """Telnet-like client component.

    Registers a socket client (``UNIXClient``, ``TCPClient`` or
    ``UDPClient``) as a child on the ``telnet`` channel, prints whatever
    arrives in ``read`` events and forwards ``read`` events from the
    ``stdin`` channel to the remote end as ``write`` events.
    """

    # Define a separate channel for this component so we don't clash with
    # the ``read`` event of the ``stdin`` component.
    channel = "telnet"

    def __init__(self, *args, **opts):
        """Pick a transport from the arguments and fire the first event.

        :param args: a single existing filesystem path (handled with
            ``UNIXClient``), or a ``host, port`` pair.
        :param opts: option flags. ``udp`` selects ``UDPClient`` over
            ``TCPClient``; ``secure`` is passed on to ``connect``. Both
            are treated as ``False`` when omitted.
        :raises OSError: if the single argument is not an existing path.
        :raises ValueError: if ``port`` cannot be converted to ``int`` or
            the arguments are not one or two items.
        """
        super(Telnet, self).__init__()

        self.args = args
        self.opts = opts

        # Missing flags count as "off" so the component can also be built
        # without going through the option parser.
        udp = opts.get("udp", False)
        secure = opts.get("secure", False)

        if len(args) == 1:
            path = args[0]
            if not os.path.exists(path):
                raise OSError("Path %s not found" % path)

            UNIXClient(channel=self.channel).register(self)
            # For a UNIX socket the path doubles as both host and port.
            host = port = path
            dest = (path,)
        else:
            if udp:
                UDPClient(0, channel=self.channel).register(self)
            else:
                TCPClient(channel=self.channel).register(self)

            host, port = args
            port = int(port)
            dest = host, port

        self.host = host
        self.port = port

        print("Trying %s ..." % host)

        if udp:
            # No ``connect`` event in UDP mode: a single NUL byte is written
            # to the peer instead.
            self.fire(write((host, port), b"\x00"))
        else:
            self.fire(connect(*dest, secure=secure))

    def ready(self, *args):
        """ready Event Handler

        Passes the root of the component tree to ``circuits.tools.graph``.
        """
        graph(self.root)

    def connected(self, host, port=None):
        """connected Event Handler

        This event is fired by the TCPClient Component to indicate a
        successful connection.
        """
        print("connected to {0}".format(host))

    def error(self, *args, **kwargs):
        """error Event Handler

        If any exception/error occurs in the system this event is triggered.
        With three positional arguments the second one is reported,
        otherwise the first.
        """
        if not args:
            # Nothing positional to report; avoid an IndexError inside the
            # error handler itself.
            print("ERROR: unknown error")
        elif len(args) == 3:
            print("ERROR: {0}".format(args[1]))
        else:
            print("ERROR: {0}".format(args[0]))

    def read(self, *args):
        """read Event Handler

        This event is fired by the underlying TCPClient Component when there
        is data to be read from the connection. The payload is the last
        positional argument: ``(data,)`` or ``(peer, data)``.
        """
        data = args[-1]

        # Remote peers are free to send bytes that are not valid UTF-8
        # (telnet option negotiation starts with 0xFF, for instance), so
        # substitute undecodable bytes rather than raising.
        text = data.strip().decode("utf-8", "replace")

        print(text)

    # Set up an Event Handler for "read" events on the "stdin" channel.
    @handler("read", channel="stdin")
    def _on_stdin_read(self, data):
        """read Event Handler for stdin

        This event is triggered by the connected ``stdin`` component when
        there is new data to be read in from standard input. The data is
        forwarded as a ``write`` event, addressed to ``(host, port)`` in
        UDP mode.
        """
        if self.opts.get("udp", False):
            self.fire(write((self.host, self.port), data))
        else:
            self.fire(write(data))
def main():
    """Build the Telnet app from command-line arguments and run it."""
    options, positional = parse_options()

    # Configure and "run" the System.
    client = Telnet(*positional, **vars(options))

    # The Debugger is only imported and attached when --verbose was given.
    if options.verbose:
        from circuits import Debugger

        Debugger().register(client)

    # Attach standard input so typed lines reach the client's stdin handler.
    stdin.register(client)

    client.run()
# Run the client only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()