pyuartpkg is a cross-platform UART packet communication library built on top of pyserial. It provides a robust, high-level interface for reliable data packet transmission over serial ports with automatic packet framing, escaping, and buffering.
- Packet-based Communication: Transmits data as framed packets rather than raw byte streams
- HDLC/SLIP-style Byte Escaping: Keeps frame delimiters unambiguous so each payload can be recovered exactly
- Dynamic Buffer Management: Handles randomly fragmented packets for improved stability
- Cross-platform: Works on Windows, Linux, and macOS
- Thread-safe: Asynchronous sending with queue-based architecture
- Flexible Data Formats: Supports any binary data format (struct-packed floats, integers, custom protocols)
- Large Packet Support: Theoretically unlimited packet size (bounded in practice by system memory)
Requirements:
- Python 3.6 or higher
- `pyserial` library

Installation:

```bash
pip install pyserial
```

Since this is a local package, you can use it directly by placing the pyuart_v7.py file in your project directory, or install it locally:

```bash
# Option 1: Copy pyuart_v7.py to your project
cp pyuart_v7.py /your/project/directory/
# Option 2: Install as local package (if setup.py exists)
pip install -e .
```

Quick start:

```python
import pyuart_v7 as uart
import struct
import time

# List available UART ports
ports = uart.list_uart()
print(f"Available ports: {[p.device for p in ports.list]}")
if not ports.list:
    raise RuntimeError("No UART devices found")

# Initialize UART instance
uart_ins = uart.UartIns()
if not uart_ins.init(ports.list[0].device, 115200):
    raise RuntimeError("Failed to initialize UART")

# Initialize communication instance
comu_ins = uart.UartComuIns()
comu_ins.init(uart_ins)
time.sleep(0.1)  # Wait for UART stabilization

# Send data
data = struct.pack("<4f", 1.0, 2.0, 3.0, 4.0)  # 4 floats
comu_ins.send(data)

# Receive data
received = comu_ins.recv()
if received:
    for packet in received:
        print(f"Received packet: {packet}")
```

`UartIns` manages the physical UART connection with asynchronous sending capabilities.
Methods:
- `init(port: str, baudrate: int = 115200) -> bool`: Initialize UART connection
- `deinit()`: Close UART connection and clean up resources
- `send(data: bytes)`: Asynchronously send data (queued)
- `send_block(data: bytes)`: Synchronously send data (blocking)
- `recv() -> bytes`: Receive raw bytes from UART
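Because `init()` and `deinit()` come as a pair, it can be convenient to wrap them so the port is always released, even when an exception interrupts the work in between. The following is only a sketch: `open_uart` and `FakeUart` are hypothetical names that are not part of the library, and `FakeUart` is a stand-in with the same `init()`/`deinit()` shape so the helper can be tried without hardware.

```python
from contextlib import contextmanager


@contextmanager
def open_uart(uart_ins, port, baudrate=115200):
    """Call uart_ins.init(), yield the instance, and always call deinit() afterwards."""
    if not uart_ins.init(port, baudrate):
        raise RuntimeError(f"Failed to initialize UART on {port}")
    try:
        yield uart_ins
    finally:
        # Runs on normal exit and when the body raises
        uart_ins.deinit()


class FakeUart:
    """Hypothetical stand-in mimicking the init()/deinit() methods of UartIns."""

    def __init__(self, ok=True):
        self.ok = ok
        self.is_open = False

    def init(self, port, baudrate=115200):
        self.is_open = self.ok
        return self.ok

    def deinit(self):
        self.is_open = False


fake = FakeUart()
with open_uart(fake, "TEST_PORT") as u:
    print("open inside the block:", u.is_open)
print("open after the block:", fake.is_open)
```

With the real library, the same helper would presumably be called as `open_uart(uart.UartIns(), ports.list[0].device)`.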
`UartComuIns` handles packet framing, escaping, and parsing for reliable communication.
Methods:
- `init(uart: UartIns)`: Bind to a UART instance
- `send(data: bytes)`: Send data as a framed packet
- `recv(overtime: int = 0) -> List[bytes]`: Receive and parse packets

`list_uart() -> UartListResult`: List all available UART ports
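The examples in this README call `recv()` with no arguments and check whether the result is empty, which suggests a polling style. Under that assumption (that `recv()` returns an empty or falsy value when no complete packet has arrived yet), a small helper can wait for packets up to a deadline. `wait_for_packets` is a hypothetical name, not a library function, and it deliberately avoids the `overtime` parameter, whose exact semantics are not described here.

```python
import time


def wait_for_packets(comu_ins, timeout_s=1.0, poll_interval_s=0.01):
    """Poll comu_ins.recv() until it yields packets or timeout_s elapses.

    Returns a list of packets, or an empty list on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        packets = comu_ins.recv()
        if packets:
            return list(packets)
        if time.monotonic() >= deadline:
            return []
        # Sleep briefly so the loop does not spin at full CPU
        time.sleep(poll_interval_s)
```

Using `time.monotonic()` rather than `time.time()` keeps the timeout unaffected by system clock adjustments.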
The library uses a custom packet format with HDLC/SLIP-style escaping:
Packet Structure:
[HEADER][ESCAPED PAYLOAD][FOOTER]
Special Bytes:
- HEADER: 'A' (0x41)
- FOOTER: 'Z' (0x5A)
- ESCAPE: '_' (0x5F)
Escaping Rules:
- HEADER in payload → ESCAPE + HEADER
- FOOTER in payload → ESCAPE + FOOTER
- ESCAPE in payload → ESCAPE + ESCAPE
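The framing and escaping rules above can be sketched in a few lines of Python. This is an illustration written from the packet description, not the library's actual code: `frame` and `FrameParser` are hypothetical names, and treating an unescaped HEADER inside a packet as a restart is an assumption of this sketch. The parser keeps its state between calls, so a packet that arrives split across several reads is still recovered.

```python
HEADER = 0x41  # 'A'
FOOTER = 0x5A  # 'Z'
ESCAPE = 0x5F  # '_'
_SPECIAL = {HEADER, FOOTER, ESCAPE}


def frame(payload: bytes) -> bytes:
    """Wrap payload as [HEADER][ESCAPED PAYLOAD][FOOTER]."""
    out = bytearray([HEADER])
    for b in payload:
        if b in _SPECIAL:
            out.append(ESCAPE)  # prefix special bytes with ESCAPE
        out.append(b)
    out.append(FOOTER)
    return bytes(out)


class FrameParser:
    """Incremental decoder that tolerates arbitrarily fragmented input."""

    def __init__(self):
        self._buf = bytearray()
        self._in_packet = False
        self._escaped = False

    def feed(self, chunk: bytes) -> list:
        """Consume raw bytes and return the payloads completed by this chunk."""
        packets = []
        for b in chunk:
            if not self._in_packet:
                if b == HEADER:  # bytes before a header are ignored
                    self._in_packet = True
                    self._escaped = False
                    self._buf.clear()
                continue
            if self._escaped:
                self._buf.append(b)  # escaped byte is taken literally
                self._escaped = False
            elif b == ESCAPE:
                self._escaped = True
            elif b == FOOTER:
                packets.append(bytes(self._buf))
                self._in_packet = False
            elif b == HEADER:
                self._buf.clear()  # assumption: unescaped header restarts the packet
            else:
                self._buf.append(b)
        return packets


wire = frame(b"A_Z!")
print(wire)  # b'A_A___Z!Z'

parser = FrameParser()
print(parser.feed(wire[:4]))  # [] (packet still incomplete)
print(parser.feed(wire[4:]))  # [b'A_Z!']
```

Because every special byte in the payload is preceded by ESCAPE, an unescaped `Z` can only be the end of a packet, which is what makes the decoding unambiguous.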
Sender (continuously sends sensor data on the first available port):

```python
import struct
import time
import random
import pyuart_v7 as uart

# List and select first available port
ports = uart.list_uart()
if not ports.list:
    raise RuntimeError("No UART devices found")

# Initialize UART
uart_ins = uart.UartIns()
if not uart_ins.init(ports.list[0].device, 115200):
    raise RuntimeError("Failed to initialize UART")

# Initialize communication
comu_ins = uart.UartComuIns()
comu_ins.init(uart_ins)
time.sleep(0.1)

# Continuously send sensor data (4 floats)
while True:
    sensor_data = [random.uniform(0.0, 10.0) for _ in range(4)]
    data_bytes = struct.pack("<4f", *sensor_data)
    comu_ins.send(data_bytes)
    time.sleep(0.001)
```

Receiver (continuously receives and decodes packets on the second available port):

```python
import struct
import time
import pyuart_v7 as uart

# List and select second available port
ports = uart.list_uart()
if len(ports.list) < 2:
    raise RuntimeError("Need at least 2 UART devices")

# Initialize UART
uart_ins = uart.UartIns()
if not uart_ins.init(ports.list[1].device, 115200):
    raise RuntimeError("Failed to initialize UART")

# Initialize communication
comu_ins = uart.UartComuIns()
comu_ins.init(uart_ins)
time.sleep(0.1)

# Continuously receive and decode data
while True:
    received_packets = comu_ins.recv()
    if received_packets:
        for packet in received_packets:
            try:
                # Decode as 4 floats (adjust based on your protocol)
                decoded = struct.unpack("<4f", packet)
                print(f"Decoded sensor data: {decoded}")
            except struct.error:
                print(f"Error decoding packet: {packet.hex()}")
```

Custom data structure:

```python
import struct
import pyuart_v7 as uart

# Define custom data structure
class SensorData:
    def __init__(self, temperature, humidity, pressure):
        self.temperature = temperature
        self.humidity = humidity
        self.pressure = pressure

    def pack(self):
        # Pack as 3 floats: temperature, humidity, pressure
        return struct.pack("<3f", self.temperature, self.humidity, self.pressure)

    @staticmethod
    def unpack(data):
        temp, hum, pres = struct.unpack("<3f", data)
        return SensorData(temp, hum, pres)

# Usage
sensor = SensorData(25.5, 60.0, 1013.25)
packed_data = sensor.pack()

# Send packed data
# comu_ins.send(packed_data)

# Receive and unpack
# received = comu_ins.recv()
# if received:
#     for packet in received:
#         sensor_data = SensorData.unpack(packet)
```

For large data transfers (e.g., images), split data into chunks:
```python
import struct

def send_large_data(comu_ins, data: bytes, chunk_size: int = 1024):
    # comu_ins is an initialized UartComuIns, passed in explicitly
    total_chunks = (len(data) + chunk_size - 1) // chunk_size

    # Send metadata (total chunks, chunk size)
    metadata = struct.pack("<II", total_chunks, chunk_size)
    comu_ins.send(metadata)

    # Send data chunks
    for i in range(total_chunks):
        chunk = data[i*chunk_size:(i+1)*chunk_size]
        comu_ins.send(chunk)
        # Optional: wait for acknowledgment
```

Error handling:

```python
import pyuart_v7 as uart

uart_ins = None
comu_ins = None
try:
    ports = uart.list_uart()
    if not ports.list:
        # Stop here instead of indexing an empty port list
        raise RuntimeError("No UART devices found")

    uart_ins = uart.UartIns()
    if not uart_ins.init(ports.list[0].device, 115200):
        # Retry or exit
        raise RuntimeError("UART initialization failed")

    comu_ins = uart.UartComuIns()
    comu_ins.init(uart_ins)
except Exception as e:
    print(f"UART setup error: {e}")
    # Cleanup or fallback
```

Troubleshooting:
- No UART devices found
  - Check physical connections
  - Verify device permissions (Linux: user in the `dialout` group)
  - Check Device Manager (Windows) or `ls /dev/tty*` (Linux/macOS)
- Connection initialization fails
  - Verify baudrate matches device settings
  - Check if port is already in use
  - Try different baudrates (9600, 115200, etc.)
- Data corruption or incomplete packets
  - Ensure both devices use the same baudrate
  - Check wiring and connections
  - Add delays between packets if sending too fast
- Memory issues with large packets
  - Split large data into smaller chunks
  - Increase system memory if needed
  - Monitor memory usage during transmission
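When large data is split into chunks as suggested above, the receiving side has to put the pieces back together. The sketch below assumes the layout used by `send_large_data`: one metadata packet packed as `"<II"` (total chunks, chunk size) followed by the chunks in order. `split_into_packets` and `reassemble` are hypothetical helpers, and the sketch assumes no packets are lost or reordered, since the protocol as described carries no sequence numbers or checksums.

```python
import struct


def split_into_packets(data: bytes, chunk_size: int = 1024) -> list:
    """Build the packet sequence: one metadata packet, then the data chunks."""
    total_chunks = (len(data) + chunk_size - 1) // chunk_size
    packets = [struct.pack("<II", total_chunks, chunk_size)]
    for i in range(total_chunks):
        packets.append(data[i * chunk_size:(i + 1) * chunk_size])
    return packets


def reassemble(packets) -> bytes:
    """Rebuild the original bytes from packets received in order."""
    it = iter(packets)
    metadata = next(it, None)
    if metadata is None or len(metadata) != struct.calcsize("<II"):
        raise ValueError("missing or malformed metadata packet")
    total_chunks, chunk_size = struct.unpack("<II", metadata)

    chunks = []
    for index in range(total_chunks):
        chunk = next(it, None)
        if chunk is None:
            raise ValueError(f"transfer incomplete: got {index} of {total_chunks} chunks")
        if len(chunk) > chunk_size:
            raise ValueError(f"chunk {index} is larger than the announced chunk size")
        chunks.append(chunk)
    return b"".join(chunks)


image = bytes(range(256)) * 10            # 2560 bytes of sample data
packets = split_into_packets(image, 1024)
print(len(packets))                       # 4 (1 metadata packet + 3 chunks)
print(reassemble(packets) == image)       # True
```

In a real receiver, the `packets` argument would be the packets collected from successive `comu_ins.recv()` calls.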
Enable debug prints in the library or add your own:

```python
# Monitor raw received data
data = uart_ins.recv()
if data:
    print(f"Raw received: {data.hex()}")
```

Performance notes:
- Baudrate: Higher baudrates (115200, 921600) give faster transmission
- Packet Size: Payloads of around 64-1024 bytes suit most applications
- Buffer Size: The dynamic buffer handles fragmentation, but monitor memory use
- Thread Safety: Asynchronous sending avoids blocking the caller but adds queue overhead
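To choose a baudrate and send interval, it helps to estimate how many packets the line can carry. The sketch below assumes 8N1 framing (one start bit, eight data bits, one stop bit, so 10 bits on the wire per byte), which is the pyserial default; whether this library changes those settings is not stated here. `packets_per_second` is a hypothetical helper.

```python
def packets_per_second(baudrate, payload_size, bits_per_byte=10, worst_case_escaping=False):
    """Upper bound on the packet rate a serial line can sustain.

    Each packet costs one HEADER byte, one FOOTER byte, and the payload.
    In the worst case every payload byte needs an ESCAPE prefix, doubling it.
    """
    payload_on_wire = payload_size * (2 if worst_case_escaping else 1)
    wire_bytes = payload_on_wire + 2
    bytes_per_second = baudrate / bits_per_byte
    return bytes_per_second / wire_bytes


# 16-byte payload (four packed floats) at 115200 baud
print(packets_per_second(115200, 16))  # 640.0
print(packets_per_second(921600, 16))  # 5120.0
```

Under these assumptions, a sender that queues a 16-byte packet every millisecond (up to roughly 1000 packets per second) would outpace a 115200 baud link, so the send queue would keep growing; a longer interval or a higher baudrate avoids that.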
| Feature | pyuartpkg | pyserial | rosserial |
|---|---|---|---|
| Packet-based | ✅ | ❌ | ✅ |
| Byte escaping | ✅ | ❌ | ✅ |
| Cross-platform | ✅ | ✅ | ✅ |
| ROS dependency | ❌ | ❌ | ✅ |
| Thread-safe sending | ✅ | ❌ | ✅ |
| Dynamic buffering | ✅ | ❌ | ❌ |
This project is licensed under the MIT License - see the LICENSE file for details.
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add some amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
- Built on top of the excellent `pyserial` library
- Inspired by HDLC and SLIP protocols for reliable serial communication
- Thanks to all contributors and users