Skip to content

一个基于pyserial的全平台串口数据包收发库

License

Notifications You must be signed in to change notification settings

Dreams-Possible/py_uart

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

9 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

pyuartpkg - Python UART Packet Communication Library

中文

Overview

pyuartpkg is a cross-platform UART packet communication library built on top of pyserial. It provides a robust, high-level interface for reliable data packet transmission over serial ports with automatic packet framing, escaping, and buffering.

Features

  • Packet-based Communication: Transmits data as framed packets rather than raw byte streams
  • HDLC/SLIP-style Byte Escaping: Ensures packet integrity and unique restorability
  • Dynamic Buffer Management: Handles randomly fragmented packets for improved stability
  • Cross-platform: Works on Windows, Linux, and macOS
  • Thread-safe: Asynchronous sending with queue-based architecture
  • Flexible Data Formats: Supports any binary data format (struct-packed floats, integers, custom protocols)
  • Large Packet Support: Theoretical unlimited packet size (limited by system memory)

Installation

Prerequisites

  • Python 3.6 or higher
  • pyserial library

Install Dependencies

pip install pyserial

Using pyuartpkg

Since this is a local package, you can use it directly by placing the pyuart_v7.py file in your project directory, or install it locally:

# Option 1: Copy pyuart_v7.py to your project
cp pyuart_v7.py /your/project/directory/

# Option 2: Install as local package (if setup.py exists)
pip install -e .

Quick Start

Basic Usage Example

import pyuart_v7 as uart
import struct
import time

# List available UART ports
ports = uart.list_uart()
print(f"Available ports: {[p.device for p in ports.list]}")

# Initialize UART instance
uart_ins = uart.UartIns()
if not uart_ins.init(ports.list[0].device, 115200):
    raise RuntimeError("Failed to initialize UART")

# Initialize communication instance
comu_ins = uart.UartComuIns()
comu_ins.init(uart_ins)

time.sleep(0.1)  # Wait for UART stabilization

# Send data
data = struct.pack("<4f", 1.0, 2.0, 3.0, 4.0)  # 4 floats
comu_ins.send(data)

# Receive data
received = comu_ins.recv()
if received:
    for packet in received:
        print(f"Received packet: {packet}")

API Reference

Core Classes

UartIns - UART Instance

Manages the physical UART connection with asynchronous sending capabilities.

Methods:

  • init(port: str, baudrate: int = 115200) -> bool: Initialize UART connection
  • deinit(): Close UART connection and clean up resources
  • send(data: bytes): Asynchronously send data (queued)
  • send_block(data: bytes): Synchronously send data (blocking)
  • recv() -> bytes: Receive raw bytes from UART

UartComuIns - UART Communication Instance

Handles packet framing, escaping, and parsing for reliable communication.

Methods:

  • init(uart: UartIns): Bind to a UART instance
  • send(data: bytes): Send data as framed packet
  • recv(overtime: int = 0) -> List[bytes]: Receive and parse packets

Utility Functions

  • list_uart() -> UartListResult: List all available UART ports

Packet Format

The library uses a custom packet format with HDLC/SLIP-style escaping:

Packet Structure:
[HEADER][ESCAPED PAYLOAD][FOOTER]

Special Bytes:
- HEADER: 'A' (0x41)
- FOOTER: 'Z' (0x5A)
- ESCAPE: '_' (0x5F)

Escaping Rules:
- HEADER in payload → ESCAPE + HEADER
- FOOTER in payload → ESCAPE + FOOTER
- ESCAPE in payload → ESCAPE + ESCAPE

Examples

Example 1: Sender Device (test_dev1.py)

import struct
import time
import random
import pyuart_v7 as uart

# List and select first available port
ports = uart.list_uart()
if not ports.list:
    raise RuntimeError("No UART devices found")

# Initialize UART
uart_ins = uart.UartIns()
if not uart_ins.init(ports.list[0].device, 115200):
    raise RuntimeError("Failed to initialize UART")

# Initialize communication
comu_ins = uart.UartComuIns()
comu_ins.init(uart_ins)

time.sleep(0.1)

# Continuously send sensor data (4 floats)
while True:
    sensor_data = [random.uniform(0.0, 10.0) for _ in range(4)]
    data_bytes = struct.pack("<4f", *sensor_data)
    comu_ins.send(data_bytes)
    time.sleep(0.001)

Example 2: Receiver Device (test_dev2.py)

import struct
import time
import pyuart_v7 as uart

# List and select second available port
ports = uart.list_uart()
if len(ports.list) < 2:
    raise RuntimeError("Need at least 2 UART devices")

# Initialize UART
uart_ins = uart.UartIns()
if not uart_ins.init(ports.list[1].device, 115200):
    raise RuntimeError("Failed to initialize UART")

# Initialize communication
comu_ins = uart.UartComuIns()
comu_ins.init(uart_ins)

time.sleep(0.1)

# Continuously receive and decode data
while True:
    received_packets = comu_ins.recv()
    
    if received_packets:
        for packet in received_packets:
            try:
                # Decode as 4 floats (adjust based on your protocol)
                decoded = struct.unpack("<4f", packet)
                print(f"Decoded sensor data: {decoded}")
            except struct.error:
                print(f"Error decoding packet: {packet.hex()}")

Example 3: Custom Data Structure

import struct
import pyuart_v7 as uart

# Define custom data structure
class SensorData:
    def __init__(self, temperature, humidity, pressure):
        self.temperature = temperature
        self.humidity = humidity
        self.pressure = pressure
    
    def pack(self):
        # Pack as 3 floats: temperature, humidity, pressure
        return struct.pack("<3f", self.temperature, self.humidity, self.pressure)
    
    @staticmethod
    def unpack(data):
        temp, hum, pres = struct.unpack("<3f", data)
        return SensorData(temp, hum, pres)

# Usage
sensor = SensorData(25.5, 60.0, 1013.25)
packed_data = sensor.pack()

# Send packed data
# comu_ins.send(packed_data)

# Receive and unpack
# received = comu_ins.recv()
# if received:
#     for packet in received:
#         sensor_data = SensorData.unpack(packet)

Advanced Usage

Handling Large Data

For large data transfers (e.g., images), split data into chunks:

def send_large_data(data: bytes, chunk_size: int = 1024):
    total_chunks = (len(data) + chunk_size - 1) // chunk_size
    
    # Send metadata (total chunks, chunk size)
    metadata = struct.pack("<II", total_chunks, chunk_size)
    comu_ins.send(metadata)
    
    # Send data chunks
    for i in range(total_chunks):
        chunk = data[i*chunk_size:(i+1)*chunk_size]
        comu_ins.send(chunk)
        # Optional: wait for acknowledgment

Error Handling

import pyuart_v7 as uart

try:
    ports = uart.list_uart()
    if not ports.list:
        print("Warning: No UART devices found")
        # Handle gracefully
    
    uart_ins = uart.UartIns()
    if not uart_ins.init(ports.list[0].device, 115200):
        print("Error: UART initialization failed")
        # Retry or exit
    
    comu_ins = uart.UartComuIns()
    comu_ins.init(uart_ins)
    
except Exception as e:
    print(f"UART setup error: {e}")
    # Cleanup or fallback

Troubleshooting

Common Issues

  1. No UART devices found

    • Check physical connections
    • Verify device permissions (Linux: user in dialout group)
    • Check device manager (Windows) or ls /dev/tty* (Linux/macOS)
  2. Connection initialization fails

    • Verify baudrate matches device settings
    • Check if port is already in use
    • Try different baudrates (9600, 115200, etc.)
  3. Data corruption or incomplete packets

    • Ensure both devices use same baudrate
    • Check wiring and connections
    • Add delays between packets if sending too fast
  4. Memory issues with large packets

    • Split large data into smaller chunks
    • Increase system memory if needed
    • Monitor memory usage during transmission

Debugging Tips

Enable debug prints in the library or add your own:

# Monitor raw received data
data = uart_ins.recv()
if data:
    print(f"Raw received: {data.hex()}")

Performance Considerations

  • Baudrate: Higher baudrates (115200, 921600) for faster transmission
  • Packet Size: Optimal around 64-1024 bytes for most applications
  • Buffer Size: Dynamic buffer handles fragmentation but monitor memory
  • Thread Safety: Asynchronous sending prevents blocking but adds queue overhead

Comparison with Alternatives

Feature pyuartpkg pyserial rosserial
Packet-based
Byte escaping
Cross-platform
ROS dependency
Thread-safe sending
Dynamic buffering

License

This project is licensed under the MIT License - see the LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Acknowledgments

  • Built on top of the excellent pyserial library
  • Inspired by HDLC and SLIP protocols for reliable serial communication
  • Thanks to all contributors and users

pyuartpkg - Python UART数据包通信库

概述

pyuartpkg 是一个基于 pyserial 构建的跨平台UART数据包通信库。它提供了一个可靠的高级接口,用于通过串口进行数据包传输,具有自动数据包帧化、转义和缓冲功能。

特性

  • 数据包通信:以帧化数据包而非原始字节流传输数据
  • HDLC/SLIP风格字节转义:确保数据包完整性和唯一可还原性
  • 动态缓冲区管理:处理随机分片的数据包,提高稳定性
  • 跨平台:支持Windows、Linux和macOS
  • 线程安全:基于队列的异步发送架构
  • 灵活数据格式:支持任何二进制数据格式(struct打包的浮点数、整数、自定义协议)
  • 大数据包支持:理论无限制数据包大小(受系统内存限制)

安装

前提条件

  • Python 3.6 或更高版本
  • pyserial

安装依赖

pip install pyserial

使用 pyuartpkg

由于这是本地包,您可以直接将 pyuart_v7.py 文件放在项目目录中使用,或本地安装:

# 选项1:复制 pyuart_v7.py 到项目目录
cp pyuart_v7.py /your/project/directory/

# 选项2:本地安装(如果存在 setup.py)
pip install -e .

快速开始

基本使用示例

import pyuart_v7 as uart
import struct
import time

# 列出可用UART端口
ports = uart.list_uart()
print(f"可用端口: {[p.device for p in ports.list]}")

# 初始化UART实例
uart_ins = uart.UartIns()
if not uart_ins.init(ports.list[0].device, 115200):
    raise RuntimeError("UART初始化失败")

# 初始化通信实例
comu_ins = uart.UartComuIns()
comu_ins.init(uart_ins)

time.sleep(0.1)  # 等待UART稳定

# 发送数据
data = struct.pack("<4f", 1.0, 2.0, 3.0, 4.0)  # 4个浮点数
comu_ins.send(data)

# 接收数据
received = comu_ins.recv()
if received:
    for packet in received:
        print(f"接收到的数据包: {packet}")

API参考

核心类

UartIns - UART实例

管理物理UART连接,具有异步发送能力。

方法:

  • init(port: str, baudrate: int = 115200) -> bool:初始化UART连接
  • deinit():关闭UART连接并清理资源
  • send(data: bytes):异步发送数据(队列)
  • send_block(data: bytes):同步发送数据(阻塞)
  • recv() -> bytes:从UART接收原始字节

UartComuIns - UART通信实例

处理数据包帧化、转义和解析,实现可靠通信。

方法:

  • init(uart: UartIns):绑定到UART实例
  • send(data: bytes):发送帧化数据包
  • recv(overtime: int = 0) -> List[bytes]:接收并解析数据包

工具函数

  • list_uart() -> UartListResult:列出所有可用UART端口

数据包格式

库使用自定义数据包格式,具有HDLC/SLIP风格转义:

数据包结构:
[包头][转义的有效载荷][包尾]

特殊字节:
- 包头: 'A' (0x41)
- 包尾: 'Z' (0x5A)
- 转义符: '_' (0x5F)

转义规则:
- 有效载荷中的包头 → 转义符 + 包头
- 有效载荷中的包尾 → 转义符 + 包尾
- 有效载荷中的转义符 → 转义符 + 转义符

示例

示例1:发送设备 (test_dev1.py)

import struct
import time
import random
import pyuart_v7 as uart

# 列出并选择第一个可用端口
ports = uart.list_uart()
if not ports.list:
    raise RuntimeError("未找到UART设备")

# 初始化UART
uart_ins = uart.UartIns()
if not uart_ins.init(ports.list[0].device, 115200):
    raise RuntimeError("UART初始化失败")

# 初始化通信
comu_ins = uart.UartComuIns()
comu_ins.init(uart_ins)

time.sleep(0.1)

# 持续发送传感器数据(4个浮点数)
while True:
    sensor_data = [random.uniform(0.0, 10.0) for _ in range(4)]
    data_bytes = struct.pack("<4f", *sensor_data)
    comu_ins.send(data_bytes)
    time.sleep(0.001)

示例2:接收设备 (test_dev2.py)

import struct
import time
import pyuart_v7 as uart

# 列出并选择第二个可用端口
ports = uart.list_uart()
if len(ports.list) < 2:
    raise RuntimeError("需要至少2个UART设备")

# 初始化UART
uart_ins = uart.UartIns()
if not uart_ins.init(ports.list[1].device, 115200):
    raise RuntimeError("UART初始化失败")

# 初始化通信
comu_ins = uart.UartComuIns()
comu_ins.init(uart_ins)

time.sleep(0.1)

# 持续接收和解码数据
while True:
    received_packets = comu_ins.recv()
    
    if received_packets:
        for packet in received_packets:
            try:
                # 解码为4个浮点数(根据您的协议调整)
                decoded = struct.unpack("<4f", packet)
                print(f"解码的传感器数据: {decoded}")
            except struct.error:
                print(f"数据包解码错误: {packet.hex()}")

故障排除

常见问题

  1. 未找到UART设备

    • 检查物理连接
    • 验证设备权限(Linux:用户需要在 dialout 组中)
    • 检查设备管理器(Windows)或 ls /dev/tty*(Linux/macOS)
  2. 连接初始化失败

    • 验证波特率与设备设置匹配
    • 检查端口是否已被占用
    • 尝试不同的波特率(9600、115200等)

About

一个基于pyserial的全平台串口数据包收发库

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published