Skip to content

IArchi/simple-socket-server

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Simple TCP socket server with select

License Downloads Latest Version Supported Python versions

Installation

Install it with pip:

$ pip install simple-socket-server

Or you can add it as dependency in requirements.txt file of your python application:

simple-socket-server~=1.0

Usage

Easy way to understand how it works is testing socket server via telnet terminal:

from socket import socket
from simple_socket_server import SimpleSocketServer

socket_server = SimpleSocketServer()


@socket_server.on_connect
def on_connect(sock: socket):
    print('New connection from %s:%s' % sock.getpeername())
    socket_server.send(sock, bytes('What is your name?\r\n', 'utf-8'))


@socket_server.on_disconnect
def on_disconnect(sock: socket):
    print('Connection from %s:%s is closed' % sock.getpeername())


@socket_server.on_message
def on_message(sock: socket, message: bytes):
    print('Incoming data from %s:%s' % sock.getpeername(), message)
    socket_server.send(sock, bytes('Hi, ', 'utf-8') + message)


socket_server.run(ip='0.0.0.0', port=5000)

Then you can connect to server:

telnet 127.0.0.1 5000

This socket server can be used with the Flask server:

import logging
from flask import Flask
from socket import socket
from simple_socket_server import FlaskSimpleSocketServer

app = Flask(__name__)
app.logger.setLevel(logging.DEBUG)
socket_server = FlaskSimpleSocketServer(app)


@app.route('/', methods=['GET'])
def get_index():
    """ Yor index page code """


@socket_server.on_connect
def on_connect(sock: socket):
    app.logger.info('New connection from %s:%s' % sock.getpeername())
    socket_server.send(sock, bytes('What is your name?\r\n', 'utf-8'))


@socket_server.on_disconnect
def on_disconnect(sock: socket):
    app.logger.info('Connection from %s:%s is closed' % sock.getpeername())


@socket_server.on_message
def on_message(sock: socket, message: bytes):
    app.logger.debug('Incoming data from %s:%s' % sock.getpeername())
    answer = your_message_handler(message)
    socket_server.send(sock, answer)


socket_server.run(ip='0.0.0.0', port=5000)

About

Simple Telnet Server

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Python 100.0%