基于Python+PyQt5实现串口数据采集和显示
目录
- 一、环境搭建
- 1.1 python 3.X安装
- 1.2 安装Pycharm
- 1.3 PyQt5安装
- 1.4 环境配置
- 1.4.1 配置Qt Designer
- 1.4.2 配置PyUIC
- 1.4.3 配置Pyqcc
- 1.5 测试
- 1.5.1 ui_serial_port.ui
- 1.5.2 ui_serial_port.py
- 1.5.3 serial_port.py
- 1.5.4 main.py
- 二、程序设计
- 2.1 需求
- 2.1.1 导航解算报文
- 2.1.2 原始信息报文
- 2.2 界面设计
- 三、程序实现
- 3.1 初始化工作
- 3.1.1 初始化串口设置区域
- 3.1.2 初始化串口接收区域
- 3.1.3 初始化串口数据接收区域
- 3.1.4 初始化串口数据发送区域
- 3.1.5 初始化报文解析区域
- 3.2 打开串口
- 3.2.1 __validate_setting__
- 3.2.2 SerialThread
- 3.2.3 handle_data_received
- 3.2.4 handler_serial_error
- 3.3 接收数据
- 3.4 报文解析
- 3.4.1 get_data_format
- 3.4.2 get_nav_sol
- 3.4.3 get_raw
- 3.5 发送数据
- 3.6 串口线程
- 3.6.1 初始化
- 3.6.2 线程运行
- 3.6.3 接受数据
- 3.6.4 发送数据
本节我们将会通过PyQt5
实现串口数据采集和实时通信,涉及到的技术栈包括:Python
、PyQt5
。
一、环境搭建
1.1 Python 3.X安装
直接从官网下载安装包:Index of /ftp/python/;
这里我下载的包为https://www.python.org/ftp/python/3.9.6/python-3.9.6-amd64.exe
,安装版本:python 3.9.6
。
双击开始安装的时候,一定要把下面的 Add Path
勾上 (表示添加到环境变量,这样cmd
也能使用了),其他一路Next
安装完成。默认会安装一键式工具pip
。
pip
工具镜像源配置。配置方法如下:
- 在
cmd
窗口下执行echo %HOMEPATH%
获取用户HOME
目录,并在该目录下创建pip
目录; - 在
pip
目录下创建pip.ini
文件。记住,后缀必须是.ini
格式。并在该文件中写入如下内容;
内容如下:
[global] index-url = https://pypi.tuna.tsinghua.edu.cn/simple [install] trusted-host = pypi.tuna.tsinghua.edu.cn
1.2 安装Pycharm
官方网站:http://www.jetbrains.com/pycharm/,提供以下安装版本:
Professional
:专业版(收费,网上一大堆破解方法)Community
:社区版(免费,我用的这个),下载版本为pycharm-community-2023.3.4.exe
;
1.3 PyQt5安装
使用pip
工具安装PyQt5
工具,执行:
pip install pyqt5
如果慢,用国内源:
pip install pyqt5 -i https://pypi.tuna.tsinghua.edu.cn/simple
使用pip
工具安装PyQt5-tools
工具,执行:
pip install pyqt5-tools
如果慢,用国内源:
pip install pyqt5-tools -i https://pypi.tuna.tsinghua.edu.cn/simple
工具安装完成后的路径在E:\Program Files\Python\Lib\site-packages
。
PyQt5
主要有三个部分:
QtCore
: 包含了核心的非GUI
的功能。主要和时间、文件与文件夹、各种数据、模型、流、URLs
、mime
类文件、进程与线程一起使用;QtGui
: 包含了窗口系统、事件处理、2D图像、基本绘画、字体和文字类;QtWidgets
: 包含了一些创建桌面的UI
元素和控件;
1.4 环境配置
PyCharm
是开发Python
程序主流常用的IDE。为方便调用Qt Designer
实现界面开发和编译相应完成,可以在PyCharm
配置Qt Designer
和PyUIC
、Pyrcc
。
其中Qt Designer
是Qt
设计师,PyUics
是把UI
界面转换成py
文件,Pyrcc
是资源系统转换。
打开PyCharm
, 新建一个项目,项目名称为serial_port
。
这里Python path
选择我们之前安装的python 3.9.6
的路径:E:\Program Files\Python\python.exe
。
1.4.1 配置Qt Designer
菜单File
->Settings
-> Tools
-> External Tools
-> +
号,进行添加。 参数配置说明:
- Name:填入Qt Designer,实际可以任意取值;
- Program:designer.exe程序绝对路径。根据实际安装路径填写,这里我配置的是E:\Program Files\Python\Lib\site-packages\qt5_applications\Qt\bin\designer.exe;
- Working directory: 填入$FileDir$,固定取值;
具体如下:
1.4.2 配置PyUIC
该工具是用于将Qt Designer
工具开发完成的.ui
文件转化为.py
文件。配置打开路径同Qt Designer
,参数配置说明:
Name
:填入PyUIC
,实际可以任意取值。Program
:python.exe
程序绝对路径,根据实际安装路径填写,这里我配置的是E:\Program Files\Python\python.exe
;Arguments
:-m PyQt5.uic.pyuic $FileName$ -o $FileNameWithoutExtension$.py
;Working directory
: 填入$FileDir$
,固定取值;
具体如下:
1.4.3 配置Pyqcc
配置打开路径同Qt Designer
。参数配置说明:
Name
:填入Pyqcc
,实际可以任意取值。Program
:这里我配置的是E:\Program Files\Python\Scripts\pyrcc5.exe
;Arguments
:$FileName$ -o $FileNameWithoutExtension$_rc.py
;Working directory
: 填入$FileDir$
,固定取值;
具体http://www.devze.com如下:
1.5 测试
测试Qt Designer
和PyUIC
、Pyqcc
配置是否成功。打开路径:菜单栏Tools
->External Tools
->Qt Designer
/PyUIC
/Pyqcc
;
1.5.1 ui_serial_port.ui
点击Qt Designer
,打开Designer
程序主主界面,会弹出一个窗口,这里一般是选择Main Window
或者Widget
,其中Main Window
继承自Widget
,添加了一些内容,本质二者差不多。这里选择的是Main Window
;
将左侧Widget Box
中Push button
空间拖到主界面,Ctrl + S
保存名称ui_serial_port.ui
,默认后缀就是.ui
。
1.5.2 ui_serial_port.py
选中ui_serial_port.ui
文件,同理点击PyUIC
,自动完成ui_serial_port.ui
文件的转换,生成文件名为ui_serial_port.py
。
1.5.3 serial_port.py
除ui
界面代码,还需要有一个逻辑代码,而逻辑代码个人感觉使用类的形式来组织更加方便,也更优雅。
还记得创建ui
时选择的类吗?是Widget
还是Main Window
,逻辑代码类最好是继承这个这个类,即QWidget
或QMainWindow
。一般的代码结构如下所示:
from PyQt5.QtWidgets import QMainWindow # 导入设计的ui界面转换成的py文件 from ui_serial_port import Ui_MainWindow class SerialPort(QMainWindow): """ 串口行为 """ def __init__(self): # QMainWindow构造函数初始化 super().__init__() self.ui = Ui_MainWindow() # 这个函数本身需要传递一个MainWindow类,而该类本身就继承了这个,所以可以直接传入self self.ui.setupUi(self)
1.5.4 main.py
在当前项目下,新建main.py
文件;
import sys from serial_port import SerialPort from PyQt5.QtWidgets import QApplication, QMainWindow if __name__ == '__main__': # 先建立一个app app = QApplication(sys.argv) # 初始化一个对象,调用init函数,已加载设计的ui文件 ui = SerialPort() # 显示这个ui ui.show() # 运行界面,响应按钮等操作 sys.exit(app.exec_())
运行程序:
二、程序设计
2.1 需求
客户这里有一款惯性设备,在惯性装置的AXS31
接口,里面有两路数据,一路称为导航解算,一路称为原始信息。我们通过串口读取该设备的数据并在界面显示处理,同时还需要将读取到的数据保存到文本中。
实际在测试时候发现只能接收到导航解算报文,因此猜测原始信息已经被传感器内部转换为导航解算报文了。
2.1.1 导航解算报文
波特率:230400,数据位8,停止位1,无校验;
字节 | 意义 | 类型 | 所占字节 | 备注 |
---|---|---|---|---|
1-2 | 报文头 | 2字节 | 5a 5a | |
3 | 工作状态 | 1字节 | 0xFF等待对准0x00码头对准0x01海上对准0x02牵引对准0x03 是无阻尼0x04是惯导阻尼0x05 点校0x06 综合校正0x07 位置组合 | |
4 | 参数状态 | 1字节 | B8 =0手动 B8=1自动 | |
5-8 | 运行时间 | 4字节 | 单位0.05s | |
9-11 | 纬度 | 3字节 | 量纲93206.75556 | |
12-14 | 经度 | 3字节 | 量纲46603.37778 | |
15-16 | 升沉 | 2字节 | 最小量纲100m | |
17-18 | 东速 | 2字节 | 最小量纲100kn | |
19-20 | 北速 | 2字节 | 最小量纲100kn | |
21-22 | 垂速 | 2字节 | 最小量纲100m/s | |
23-25 | 姿态角1 | 3字节 | 最小量纲0.25*93206.75556 | |
26-28 | 姿态角2 | 3字节 | 最小量纲93206.75556 | |
29-31 | 姿态角3 | 3字节 | 最小量纲93206.75556 | |
32-34 | 姿态角速率1(纵摇角速率) | 3字节 | 93206.75556 度每秒 | |
35-37 | 姿态角速率2 (横摇角速率) | 3字节 | 93206.75556 度每秒 | |
38-40 | 姿态角速率3(航向角速率) | 3字节 | 93206.75556度每秒 | |
41 | 故障码 | 1字节 | B0=1 IMU接收错B1=1 测角采样错B2=1 接收缓存错B3=1 测角控制板错B4=1 驱动错B5=1 测角错B6=1 激磁错B7=1 转台保护错 | |
42-43 | IMUtime | 2字节 | ||
44-47 | 备用 | 4字节 | ||
48 | 应答标志 | 1字节 | 可忽略 | |
49 | 校验和 | 1字节 | 3-48字节累加和 |
2.1.2 原始信息报文
波特率:230400,数据位8,停止位1,无校验;
字节 | 意义 | 类型 | 所占字节 | 备注 |
---|---|---|---|---|
1-2 | 报文头 | Int | 2字节 | 5a 5a |
3-6 | IMUtime | Int | 4字节 | 整型时戳 |
7-10 | GYROX | float | 4字节 | 直接浮点数 |
11-14 | GYROY | float | 4字节 | 直接浮点数 |
15-18 | GYROZ | float | 4字节 | 直接浮点数 |
19-22 | ACCEX | float | 4字节 | 直接浮点数 |
23-26 | ACCEY | float | 4字节 | 直接浮点数 |
27-30 | ACCEZ | float | 4字节 | 直接浮点数 |
31-34 | 备用 | Int | 4字节 | |
35-37 | 转台角1 | Int | 3字节 | 量纲2.330168888888889*e4° |
38-40 | 转台角2 | Int | 3字节 | 量纲2.330168888888889*e4° |
41-43 | GPS经度 | Int | 3字节 | 量纲93206.75556 |
44-46 | GPS纬度 | Int | 3字节 | 量纲46603.37778 |
47-48 | Para4 | Int | 2字节 | |
49-50 | Para[2]/para[5] | Int | 2字节 | |
51-52 | Para[3]/para[6] | Int | 2字节 | |
53-54 | Para[7] | Int | 2字节 | |
55-56 | Para[8] | Int | 2字节 | |
57 | comdatavalid | Int | 1字节 | |
58 | 校验和 | Int | 1字节 | 3-57和校验 |
2.2 界面设计
首先,我们设计一个简单的用户界面,包括:
- 串口配置区域:【串口】、【波特率】、【数据位】、【停止位】的设置,以及一个按钮用于开始打开和关闭串口;
- 接收设置区域:用于设置接收和发送的数据格式,支持
16
进制以及ASCII
两种格式;这里为了简单起见,程序中发送/接收采用一样的数据格式;16
进制:例如:5a 5a 02 03 5a
;ASCII
格式:例如:DDR V1.12 52218f4949 cym 23/07/0
;
- 数据发送区域:由一个文本框和一个发送按钮组成;
- 数据接收区域:由一个文本域组成;
- 如果接收到的数据时导航解算或者原始信息报文,则将解析后的数据显示在导航结算和原始信息区域;
界面效果如下(参考网上串口助手工具);
三、程序实现
我们将界面原型划分成了五个区域;
- 串口设置区域;
- 接收设置区域;
- 数据发送区域;
- 数据接收区域;
- 导航解算和原始信息区域;
我们针对这五个区域编写相关实现代码,其具体流程如下;
- 使用
Qt Designer
工具按照界面原型设置窗口,主要使用到一些基础控件,比如按钮、文本框、文本域、单选框、复选框等;‘ - 针对窗体各个区域中的控件进行初始化工作;
我们将界面相关的代码均放置在serial_port.pywww.devze.com
文件中,该文件主要包含了如下功能;
- 界面初始化工作;
- 打开串口;
- 接收数据;
- 发送数据。
3.1 初始化工作
3.1.1 初始化串口设置区域
串口设置区域主要由【串口】、【波特率】、【校验位】、【数据位】、【停止位】下拉列表以及【打开串口】按钮组成。
首先需要初始化【串口】、【波特率】、【校验位】、【数据位】、【停止位】下拉列表;
- 串口:下拉列表加载系统当前可用的串口;
- 波特率:下拉列表设置常见的波特率,比如
1200
、2400
、9600
、4800
,9600
,19200
,384000
,57600
,115200
,460800
,921600
,230400
,1500000
等; - 校验位:下拉列表设置校验位为
None
、Odd
、Even
、Mark
、Space
; - 数据位:下拉列表设置为
5
、6
、7
、8
; - 停止位:下拉列表设置为
1
、2
;
设置【打开串口】按钮点击事件对应的槽函数为self.open_serial_connection
,当点击【打开串口】按钮时将会执行该函数;
代码位于serial_port.py
中__init_serial_setting__
,具体如下;
def __init_serial_setting__(self): """ 初始化串口设置相关控件默认参数 设置下拉列表自动补全 https://blog.csdn.net/xuleisdjn/article/details/51434118 :return: """ # 加载可用串口 self.ui.cbx_com.setEditable(False) self.ui.cbx_com.setMaxVisibleItems(10) # 设置最大显示下列项 超过要使用滚动条拖拉 self.ui.cbx_com.setInsertPolicy(QComboBox.InsertAfterCurrent) # 设置插入方式\ port_list = list(serial.tools.list_ports.comports()) # 获取当前的所有串口,得到一个列表 for port in port_list: self.ui.cbx_com.addItem(port.device) # 初始化波特率列表 self.ui.cbx_baud_rate.setEditable(False) self.ui.cbx_baud_rate.setMaxVisibleItems(10) # 设置最大显示下列项 超过要使用滚动条拖拉 self.ui.cbx_baud_rate.setInsertPolicy(QComboBox.InsertAfterCurrent) # 设置插入方式 for baud_rate in [1200, 2400, 4800, 9600, 19200, 384000, 57600, 115200, 460800, 921600, 230400, 1500000]: self.ui.cbx_baud_rate.addItem(str(baud_rate), baud_rate) # 设置默认值 self.ui.cbx_baud_rate.setCurrentIndex(7) # 初始化校验位列表 self.ui.cbx_parity_bit.setEditable(False) self.ui.cbx_parity_bit.setMaxVisibleItems(10) # 设置最大显示下列项 超过要使用滚动条拖拉 self.ui.cbx_parity_bit.setInsertPolicy(QComboBox.InsertAfterCurrent) # 设置插入方式 for (key, value) in {'None': serial.PARITY_NONE, 'Odd': serial.PARITY_ODD, 'Even': serial.PARITY_EVEN, 'Mark': serial.PARITY_MARK, 'Space': serial.PARITY_SPACE}.items(): self.ui.cbx_parity_bit.addItem(key, value) # 设置默认值 self.ui.cbx_parity_bit.setCurrentIndex(0) # 初始化数据位列表 self.ui.cbx_data_bit.setEditable(False) self.ui.cbx_data_bit.setMaxVisibleItems(10) # 设置最大显示下列项 超过要使用滚动条拖拉 self.ui.cbx_data_bit.setInsertPolicy(QComboBox.InsertAfterCurrent) # 设置插入方式 for data_bit in [serial.FIVEBITS, serial.SIXBITS, serial.SEVENBITS, serial.EIGHTBITS]: self.ui.cbx_data_bit.addItem(str(data_bit), data_bit) # 设置默认值 self.ui.cbx_data_bit.setCurrentIndex(3) # 初始化停止位列表 self.ui.cbx_stop_bit.setEditable(False) self.ui.cbx_stop_bit.setMaxVisibleItems(10) # 设置最大显示下列项 超过要使用滚动条拖拉 self.ui.cbx_stop_bit.setInsertPolicy(QComboBox.InsertAfterCurrent) # 设置插入方式 for data_bit in [serial.STOPBITS_ONE, serial.STOPBITS_TWO]: self.ui.cbx_stop_bit.addItem(str(data_bit), data_bit) # 设置默认值 self.ui.cbx_stop_bit.setCurrentIndex(0) # 设置点击打开串口按钮对应的槽函数 self.ui.btx_start.clicked.connect(self.open_serial_connection)
3.1.2 初始化串口接收区域
串口接收区域主要由【Hex
】、【ASCII
】单选框以及【显示时间】复选框组成;
设置发送/接收的数据格式,支持Hex
和ASCII
,Hex
和ASCII
是互斥的,默认选中Hex
,这里我们设置:
- 【
Hex
】单选框点击事件对应的槽函数为self.rbn_data_format_hex_clicked
,当点击【Hex
】单选框时将会执行该函数,在该函数内会记录当前选中的是hex
; - 【
ASCII
】单选框点击事件对应的槽函数为self.rbn_data_format_ascii_clicked
,当点击【ASCII
】单选框时将会执行该函数,在该函数内出记录当前选中的是ascii
;
【显示时间】复选框用于设置串口接收到数据时,是否在接收区域输出当前时间;
代码位于serial_port.py
中__init_recv_setting__
,具体如下;
def __init_recv_setting__(self): """ 接收设置初始化 :return: """ self.ui.rbn_data_format_hex.clicked.connect(self.rbn_data_format_hex_clicked) self.ui.rbn_data_format_ascii.clicked.connect(self.rbn_data_format_ascii_clicked)
其中rbn_data_format_hex_clicked
函数;
def rbn_data_format_hex_clicked(self): """ 接收数据格式发生变化 :return: """ if self.ui.rbn_data_format_hex.isChecked(): self.ui.rbn_data_format_ascii.setChecked(False) if self.serial_thread: self.serial_thread.date_format = 'hex'
其中rbn_data_format_ascii_clicked
函数;
def rbn_data_format_ascii_clicked(self): """ 接收数据格式发生变化 :return: """ if self.ui.rbn_data_format_ascii.isChecked(): self.ui.rbn_data_format_hex.setChecked(False) if self.serial_thread: self.serial_thread.date_format = 'ascii'
3.1.3 初始化串口数据接收区域
串口数据接收区域由一个【文本域控件】组成,用于存放串口接收到的数据,数据长度默认最长为5000字符,超过5000字符自动清空;
这里初始化【文本域控件】为只读,并且串口接收到数据时,自动将滚动条移动至【文本域控件】的最低端;
代码位于serial_port.py
中__init_recv_setting__
,具体如下;
def __init_recv_data_viewer__(self): """ 初始化串口数据接收区域 :return: """ self.ui.txt_recv_data_viewer.setReadOnly(True) self.ui.txt_recv_data_viewer.textChanged.connect( lambda: self.ui.txt_recv_data_viewer.moveCursor(QTextCursor.End))
3.1.4 初始化串口数据发送区域
串口数据接收区域由一个【文本输入框】和一个【发送】按钮组成,文本输入框用于输入要发送的内容,点击发送按钮,会将文本输入框的内容通过当前打开的串口发送出去;
代码位于serial_port.py
中__init_send_data_viewer__
,具体如下;
def __init_send_data_viewer__(self): """ 初始化串口数据发送区域 :return: """ self.ui.btn_send.clicked.connect(self.send_serial_data)
设置【发送】按钮点击事件对应的槽函数为self.send_serial_data
,当点击【发送】按钮时将会执行该函数进行串口数据的发送。
3.1.5 初始化报文解析区域
报文解析区域主要由两部分组成;
- 导航结算区域:该部分主要由【报文头】、【工作状态】、【参数状态】、【运行时间】等文本输入框组成,其与导航解算报文字段一一对应;
- 原始信息区域:该部分主要由【报文头】、【
IMUTime
】、【GYROX
】、【GYROZ
】等文本输入框组成,其与原始信息报文字段一一对应;
代码位于serial_port.py
中__init_package_setting__
,具体如下;
def __init_package_setting__(self): self.ui.let_nav_header.setReadOnly(True) self.ui.let_nav_work_state.setReadOnly(True) self.ui.let_nav_param_state.setReadOnly(True) self.ui.let_nav_run_time.setReadOnly(True) self.ui.let_nav_latitude.setReadOnly(True) self.ui.let_nav_longitude.setReadOnly(True) self.ui.let_nav_heave.setReadOnly(True) self.ui.let_nav_east_speed.setReadOnly(True) self.ui.let_nav_north_speed.setReadOnly(True) self.ui.let_nav_vertical_speed.setReadOnly(True) self.ui.let_nav_attitude_angle1.setReadOnly(True) self.ui.let_nav_attitude_angle2.setReadOnly(True) self.ui.let_nav_attitude_angle3.setReadOnly(True) self.ui.let_nav_attitude_velocity1.setReadOnly(True) self.ui.let_nav_attitude_velocity2.setReadOnly(True) self.ui.let_nav_attitude_velocity3.setReadOnly(True) self.ui.let_nav_fault_code.setReadOnly(True) self.ui.let_nav_imu_time.setReadOnly(True) self.ui.let_nav_reserved.setReadOnly(True) self.ui.let_nav_response_flag.setReadOnly(True) self.ui.let_nav_check_sum.setReadOnly(True) self.ui.let_raw_header.setReadOnly(True) self.ui.let_raw__imu_time.setReadOnly(True) self.ui.let_raw_gyrox.setReadOnly(True) self.ui.let_raw_gyroy.setReadOnly(True) self.ui.let_raw_gyroz.setReadOnly(True) self.ui.let_raw_accex.setReadOnly(True) self.ui.let_raw_accey.setReadOnly(True) self.ui.let_raw_accez.setReadOnly(True) self.ui.let_raw_reserved.setReadOnly(True) self.ui.let_raw_turntable_angle1.setReadOnly(True) self.ui.let_raw_turntable_angle2.setReadOnly(True) self.ui.let_raw_longitude.setReadOnly(True) self.ui.let_raw_latitude.setReadOnly(True) self.ui.let_raw_para4.setReadOnly(True) self.ui.let_raw_para2_5.setReadOnly(True) self.ui.let_raw_para3_6.setReadOnly(True) self.ui.let_raw_para7.setReadOnly(True) self.ui.let_raw_para8.setReadOnly(True) self.ui.let_raw_comdata_valid.setReadOnly(True) self.ui.let_raw_check_sum.setReadOnly(True)
这里我们仅仅是将上述这些控件设置为只读状态。
3.2 打开串口
根据硬件设备参数在串口设置区域进行配置【串口】、【波特率】、【校验位】、【数据位】、【停止位】,配置完成后,当点击【打开串口】按钮,将会执行open_serial_connection
函数;
在open_serial_connection
函数内主要会进行如下工作;
- 获取串口设置区域配置的参数,并对参数进行校验,具体是由
__validate_setting__
函数完成; - 调用
SerialThread
创建串口线程,并将串口设置区域配置参数传递给线程; - 设置串口线程接收到数据时对应的槽函数为
handle_data_received
,即串口接收到数据时会执行handle_data_received
函数; - 设置串口线程发生异常时对应的槽函数为
handler_serial_error
,即串口线程发生异常时,会将错误信息发送给主线程,主线程接收到错误信息,将会在界面显示;
核心代码如下:
def open_serial_connection(self): """ 打开串口 :return: """ if not self.serial_thread or not self.serial_thread.isRunning(): # 参数校验 if not self.__validate_setting__(): return # 建立一个串口 self.serial_thread = SerialThread(self.ui.cbx_com.currentText(), self.ui.cbx_baud_rate.currentData(), self.ui.cbx_data_bit.currentData(), self.ui.cbx_parity_bit.currentData(), self.ui.cbx_stop_bit.currentData(), 'hex' if self.ui.rbn_data_format_hex.isChecked() else 'ascii') self.serial_thread.data_received.connect(self.handle_data_received) self.serial_thread.serial_error.connect(self.handler_serial_error) self.serial_thread.start() self.ui.btx_start.setText("关闭串口") else: self.serial_thread.stop() # 打开串口操作 self.ui.btx_start.setText("打开串口")
3.2.1 __validate_setting__
__validate_setting__
函数主要就是校验【串口】、【波特率】、【校验位】、【数据位】、【停止位】下拉列表是否已经配置,如果未配置将会弹出警告提示信息;
def __validate_setting__(self): """ 校验串口设置参数 :return: """ # 参数校验 if self.ui.cbx_com.currentIndex() == -1: QMessageBox.warning(self, "Warning", "请选择串口!") return False if self.ui.cbx_baud_rate.currentIndex() == -1: QMessageBox.warning(self, "Warning", "请选择波特率!") return False if self.ui.cbx_parity_bit.currentIndex() == -1: QMessageBox.warning(self, "Warning", "请选择校验位!") return False if self.ui.cbx_data_bit.currentIndex() == -1: QMessageBox.warning(self, "Warning", "请选择数据位!") return False if self.ui.cbx_data_bit.currentIndex() == -1: QMessageBox.warning(self, "Warning", "请选择停止位!") return False return True
3.2.2 SerialThread
SerialThread
类的实现位于serial_thread.py
文件中,这块我们后面单独接收。
3.2.3 handle_data_received
当串口接收到数据时会执行handle_data_received
函数,这块我们后面单独接收。
3.2.4 handler_serial_error
串口线程发生异常时,会将错误信息发送给主线程,主线程接收到错误信息,将会在界面显示;具体实现函数为handler_serial_error
;
def handler_serial_error(self, error): """ 串口接收线程异常 :param error: :return: """ QMessageBox.critical(self, '错误', error)
3.3 接收数据
上面我们提到,当串口线程接收到数据时,会将数据通过信号与槽机制传递给主线程,将会由主线程handle_data_received
函数进行处理;
在handle_data_received
函数内主要会进行如下工作;
- 获取当前时间;
- 串口数据接收区域【文本域控件】内容超过
5000
个字符将会清空; - 如果配置了显示当前时间,将会在接收到的数据前面 插入当前时间,并将数据追加到串口数据接收区域【文本域控件】中;
- 如果串口接收区域配置的数据格式为
hex
,将会对接收到的报文信息进行解析;- 解析由
PackageParse
类完成,解析类会根据报文数据长度、报文头、以及校验和等参数判定当前报文格式为导航结算还是原始信息; - 如果为导航结算报文,保存报文到
nav_data.csv
,同时将报文解析到的字段一一填入界面导航结算区域对应的控件中; - 如果为原始信息报文,保存报文到
raw_data.csv
,同时将报文解析到的字段一一填入界面原始信息区域对应的控件中;
- 解析由
- 移除已经解析的报文,截取未解析的报文,重复报文解析的流程,直至报文内容不满足导航结算、原始信息报文格式;
handle_data_received
代码如下:
def handle_data_received(self, data): """ 接收数据 :param data:接收到的数据 :return: """ # 获取时间 current_time = QDateTime.currentDateTime().toString("yyyy-MM-dd hh:mm:ss") # 超过5000字符清空 if len(self.ui.txt_recv_data_viewer.toPlainText()) > 5000: self.ui.txt_recv_data_viewer.clear() # 更新显示区域中的数据 if self.ui.cbx_show_time.isChecked(): self.ui.txt_recv_data_viewer.insertPlainText(f"[{current_time}] {data}") else: self.ui.txt_recv_data_viewer.insertPlainText(f"[{data}") # 必须是hex格式 if self.ui.rbn_data_format_ascii.isChecked(): return # 如果报文最后两位是换行符,则移除 data = data.replace('0d 0a', '') data = bytes.fromhex(data.replace(' ', '')) # 解析data while data is not None: package_parse = PackageParse(data) data_format = package_parse.get_data_format() if data_format == PackageParse.Type.NAl_SOL: # 解析数据 dict_msg = package_parse.get_nav_sol() # 写入当前时间,并保存到文件 dict_msg["current_time"] = current_time save_csv('nav_data.csv', dict_msg) # 界面显示 self.ui.let_nav_header.setText(dict_msg["header"]) self.ui.let_nav_work_state.setText(dict_msg["work_state"]) self.ui.let_nav_param_state.setText(dict_msg["param_state"]) self.ui.let_nav_run_time.setText(str(dict_msg["run_time"])) self.ui.let_nav_latitude.setText(str(dict_msg["latitude"])) self.ui.let_nav_longitude.setText(str(dict_msg["longitude"])) self.ui.let_nav_heave.setText(str(dict_msg["heave"])) self.ui.let_nav_east_speed.setText(str(dict_msg["east_speed"])) self.ui.let_nav_north_speed.setText(str(dict_msg["north_speed"])) self.ui.let_nav_vertical_speed.setText(str(dict_msg["vertical_speed"])) self.ui.let_nav_attitude_angle1.setText(str(dict_msg["attitude_angle1"])) self.ui.let_nav_attitude_angle2.setText(str(dict_msg["attitude_angle2"])) self.ui.let_nav_attitude_angle3.setText(str(dict_msg["attitude_angle3"])) self.ui.let_nav_attitude_velocity1.setText(www.devze.comstr(dict_msg["attitude_velocity1"])) self.ui.let_nav_attitude_velocity2.setText(str(dict_msg["attitude_velocity2"])) self.ui.let_nav_attitude_velocity3.setText(str(dict_msg["attitude_velocity3"])) self.ui.let_nav_fault_code.setText(dict_msg["fault_code"]) self.ui.let_nav_imu_time.setText(dict_msg["imu_time"]) self.ui.let_nav_reserved.setText(dict_msg["reserved"]) self.ui.let_nav_response_flag.setText(dict_msg["response_flag"]) self.ui.let_nav_check_sum.setText(dict_msg["check_sum"]) if data_format == PackageParse.Type.RAW: # 解析数据 data = package_parse.get_raw() # 写入当前时间,并保存到文件 data["current_time"] = current_time save_csv('raw_data.csv', data) # 界面显示 self.ui.let_raw_header.setText(dict_msg["header"]) self.ui.let_raw__imu_time.setText(dict_msg["imu_time"]) self.ui.let_raw_gyrox.setText(dict_msg["gyrox"]) self.ui.let_raw_gyroy.setText(dict_msg["gyroy"]) self.ui.let_raw_gyroz.setText(dict_msg["gyroz"]) self.ui.let_raw_accex.setText(dict_msg["accex"]) self.ui.let_raw_accey.setText(dict_msg["accey"]) self.ui.let_raw_accez.setText(dict_msg["accez"]) self.ui.let_raw_reserved.setText(dict_msg["reserved"]) self.ui.let_raw_turntable_angle1.setText(dict_msg["turntable_angle1"]) self.ui.let_raw_turntable_angle2.setText(dict_msg["turntable_angle2"]) self.ui.let_raw_longitude.setText(dict_msg["longitude"]) self.ui.let_raw_latitude.setText(dict_msg["latitude"]) self.ui.let_raw_para4.setText(dict_msg["para4"]) self.ui.let_raw_para2_5.setText(dict_msg["para2_5"]) self.ui.let_raw_para3_6.setText(dict_msg["para3_6"]) self.ui.let_raw_para7.setText(dict_msg["para7"]) self.ui.let_raw_para8.setText(dict_msg["para8"]) self.ui.let_raw_comdata_valid.setText(dict_msg["comdata_valid"]) self.ui.let_raw_check_sum.setText(dict_msg["check_sum"]) # 一次收到若干个数据包 if data_format != PackageParse.Type.UNkNOWN and len(data) > data_format.value: data = data[data_format.value:] else: data = None
文件保存函数save_csv
:
def save_csv(file_name, dict: {}): """ 保存到csv文件 :param file_name:文件名 :param dict: 数据,字典格式 :return: """ file_exists = os.path.exists(file_name) # Save data to CSV file with open(file_name, 'a', newline='') as csvfile: writer = csv.writer(csvfile) # 如果文件不存在,则写入列名 if not file_exists: writer.writerow(dict.keys()) # 写入数据 writer.writerow(dict.values())
3.4 报文解析
报文解析是由PackageParse
类实现的,代码位于package_parse.py
,在该类内部主要实现了一下功能;
判断当前报文格式是导航结算还是原始信息,由get_data_format
函数实现;实现导航结算报文的解析,由get_nav_sol
函数实现;实现原始信息报文的解析,由get_raw
函数实现;
3.4.1 get_data_format
这里判断报文格式的方法很简单;
- 首先就是长度判定,如果是导航结算报文,长度至少为49个字符;如果是原始信息报文,长度至少是58个字符;
- 接着判断报文前两个字节是否为
0x5a 0X5a
; - 然后判定第58个字节是否为
3~57
个字节的累加和,如果满足条件就是原始信息报文; - 最后判定第49个字节是否为
3~48
个字节的累加和,如果满足条件就是导航结算报文;
具体代码如下:
def validate_nal_sol(self): """ 校验和 导航结算:第49个字节为3-48字节累加和 """ # 取出第3到第48字节 selected_bytes = self.__byte_array[2:48] # 计算累加和并只保留低两位数字 checksum = sum(selected_bytes) & 0xFF return checksum == self.__byte_array[48] def validate_raw(self): """ 校验和 原始信息:第58个字节为3-57字节累加和 """ # 取出第3到第48字节 selected_bytes = self.__byte_array[2:57] # 计算累加和并只保留低两位数字 checksum = sum(selected_bytes) & 0xFF return checksum == self.__byte_array[57] def get_data_format(self): """ 返回数据类型:在惯性装置的AXS31接口,里面有两路数据,一路称为导航解算,一路称为原始数 :return: 0: 导航解算 1:原始信息 2:非法数据 """ # 原始信息报文 if (len(self.__byte_array) >= 58 and self.__byte_array[0] == 0x5a and self.__byte_array[1] == 0x5a and self.validate_raw()): return PackageParse.Type.RAW # 导航解算的报文内容 if (len(self.__byte_array) >= 49 and self.__byte_array[0] == 0x5a and self.__byte_array[1] == 0x5a and self.validate_nal_sol()): return PackageParse.Type.NAl_SOL return PackageParse.Type.UNkNOWN
3.4.2 get_nav_sol
如果报文格式是导航结算,那么调用该方法实现导航结算报文的解析,在函数内部根据字段单位、量纲对原始字节进行了转换;
def translate(hex_array: [], unit=None): """ 将字节数组转换为16进制字符串 [0x0a,0x04]-> 0x0a 0x04 :param hex_array: 字节数组 :param unit 量纲 :return: """ if unit is not None: # 将字节数组按照小端格式转换为数字 num = int.from_bytes(hex_array, byteorder='little') return num / unit return ' '.join(f'0x{val:02X}' for val in hex_array) def get_nav_sol(self): """ 获取导航解算数据 :return: """ if self.get_data_format() != PackageParse.Type.NAl_SOL: print("数据格式错误,非导航解算报文!") return None message_dict = {} # 解析报文头 message_dict['header'] = translate(self.__byte_array[0:2]) # 解析工作状态 message_dict['work_state'] = translate(self.__byte_array[2:3]) # 解析参数状态 message_dict['param_state'] = translate(self.__byte_array[3:4]) # 解析运行时间 message_dict['run_time'] = translate(self.__byte_array[4:8], 20) # 解析纬度 message_dict['latitude'] = translate(self.__byte_array[8:11], 93206.75556) # 解析经度 message_dict['longitude'] = translate(self.__byte_array[11:14], 46603.37778) # 解析升沉 message_dict['heave'] = translate(self.__byte_array[14:16], 100) # 解析东速 message_dict['east_speed'] = translate(self.__byte_array[16:18], 100) # 解析北速 message_dict['north_speed'] = translate(self.__byte_array[18:20], 100) # 解析垂速 message_dict['vertical_speed'] = translate(self.__byte_array[20:22], 100) # 解析姿态角1 message_dict['attitude_angle1'] = translate(self.__byte_array[22:25], 0.25 * 93206.75556) # 解析姿态角2 message_dict['attitude_angle2'] = translate(self.__byte_array[25:28], 93206.75556) # 解析姿态角3 message_dict['attitude_angle3'] = translate(self.__byte_array[28:31], 93206.75556) # 姿态角速率1 message_dict['attitude_velocity1'] = translate(self.__byte_array[31:34], 93206.75556) # 姿态角速率2 message_dict['attitude_velocity2'] = translate(self.__byte_array[34:37], 93206.75556) # 姿态角速率3 message_dict['attitude_velocity3'] = translate(self.__byte_array[37:40], 93206.75556) # 故障码 message_dict['fault_code'] = translate(self.__byte_array[40:41]) # IMU时间 message_dict['imu_time'] = translate(self.__byte_array[41:43]) # 备用 message_dict['reserved'] = translate(self.__byte_array[43:47]) # 应答标志 message_dict['response_fpythonlag'] = translate(self.__byte_array[47:48]) # 校验和 message_dict['check_sum'] = translate(self.__byte_array[48:49]) return message_dict
3.4.3 get_raw
如果报文格式是原始信息,那么调用该方法实现原始信息报文的解析;
def get_raw(self): """ 获取原始信息报文 :return: """ if self.get_data_format() != PackageParse.Type.RAW: print("数据格式错误,非原始信息报文!") return None message_dict = {} # 解析报文头 message_dict['header'] = translate(self.__byte_array[0:2]) # IMUtime message_dict['imu_time'] = translate(self.__byte_array[2:6]) # GYROX message_dict['gyrox'] = translate(self.__byte_array[6:10]) # GYROY message_dict['gyroy'] = translate(self.__byte_array[10:14]) # GYROZ message_dict['gyroz'] = translate(self.__byte_array[14:18]) # ACCEX message_dict['accex'] = translate(self.__byte_array[18:22]) # ACCEY message_dict['accey'] = translate(self.__byte_array[22:26]) # ACCEZ message_dict['accez'] = translate(self.__byte_array[26:30]) # 备用 message_dict['reserved'] = translate(self.__byte_array[30:34]) # 转台角1 message_dict['turntable_angle1'] = translate(self.__byte_array[34:37]) # 转台角2 message_dict['turntable_angle2'] = translate(self.__byte_array[37:40]) # GPS经度 message_dict['longitude'] = translate(self.__byte_array[40:43]) # GPS纬度 message_dict['latitude'] = translate(self.__byte_array[43:46]) # Para[4](电磁/牵引时为航向) message_dict['para4'] = translate(self.__byte_array[46:48]) # Para[2]/para[5] message_dict['para2_5'] = translate(self.__byte_array[48:50]) # Para[3]/para[6] message_dict['para3_6'] = translate(self.__byte_array[50:52]) # Para[7] message_dict['para7'] = translate(self.__byte_array[52:54]) # Para[8] message_dict['para8'] = translate(self.__byte_array[54:56]) # comdatavalid message_dict['comdata_valid'] = translate(self.__byte_array[56:57]) # 校验和 message_dict['check_sum'] = translate(self.__byte_array[57:58]) return message_dict
3.5 发送数据
当用户在串口数据接收区域的【文本输入框】录入内容,点击【发送】按钮时,会调用send_serial_data
方法:
def send_serial_data(self, data: str): """ 发送数据 :return: """ if not self.serial_thread or not self.serial_thread.isRunning(): QMessageBox.warning(self, "Warning", "请先打开串口!") return data = self.ui.let_send_data_viewer.text() if data != "": self.serial_thread.send_data(data) self.ui.let_send_data_viewer.clear()
函数内部首先校验串口是否已经打开,如果串口已经打开并且【文本输入框】输入了内容,将调用串口线程的send_data
方法来进行数据发送。
3.6 串口线程
PyQt
已经为我们提供了串口控件,控件名称为QtSerialPort
,使用方法比较简单,主要是两个模块:QSerialPort
, QSerialPortInfo
,但是这个控件提供的能力有限。
这里我们使用另一个串口包pyserial
来实现:
pip install pyserial -i https://pypi.tuna.tsinghua.edu.cn/simple
为了实现串口数据的读取和发送,我们创建了一个继承自QThread
的SerialThread
类;
from PyQt5.QtCore import QThread,pyqtSignal import serial class SerialThread(QThread): """ 创建一个继承自QThread的SerialThread类,实现串口数据的读取/发送 """ # 用于发送串口数据接收信号 data_received = pyqtSignal(str) # 串口打开/接收异常 serial_error = pyqtSignal(str) .......
3.6.1 初始化
在首次点击【打开串口】按钮时会创建SerialThread
实例,即调用串口线程初始化方法;
def __init__(self, port, baud_rate, data_bits, parity_bits, stop_bits, data_format): """ 初始化 :param port: 串口号 :param baud_rate: 波特率 :param data_bits: 数据位 :param stop_bits: 停止位 :param parity_bits: 奇偶校验位 """ super().__init__() self.port = port self.baud_rate = baud_rate self.data_bits = data_bits self.stop_bits = stop_bits self.parity_bits = parity_bits # 串口已经运行标志位 self.running = False # 串口 self.serial = None # 数据格式 self.__date_format = data_format # 发送开启追加换行 self.__auto_line = True @property def date_format(self): """ #把一个getter方法变成属性 :return: """ return self.__date_format @date_format.setter def date_format(self, value): """ # 负责把一个setter方法变成属性赋值 :param value: :return: """ if not isinstance(value, str): raise ValueError('date_format must be an str') if value not in ['hex', 'ascii']: raise ValueError('date_format must in [hex,ascii]') self.__date_format = value
在这段代码中,主要就是保存构造函数传递过来的参数,比如串口号、波特率等。
3.6.2 线程运行
串口线程初始化完成后,调用start
方法,将会执行run
函数;
def run(self): """ 打开串口 :return: """ # 串口已经运行 if self.running: return try: # 建立一个串口 with serial.Serial(port=self.port, baudrate=self.baud_rate, parity=self.parity_bits, bytesize=self.data_bits, stopbits=self.stop_bits, timeout=2) as self.serial: self.running = True while self.running: data = self.__read_data__() if data: self.data_received.emit(data) except Exception as e: print("打开串口时失败:", e) self.serial_error.emit("打开串口时失败!")
在函数内部我们实际上就是调用了pyserial
提供的Serial
类去实现串口的打开,并在while
循环中,调用__read_data__
获取串口发送过来的数据,如果接受到数据将串口数据通过信号与槽机制推送给主线程。
3.6.3 接受数据
串口数据的读取是由__read_data__
函数完成的,在函数内部我们调用self.serial.readline()
依次读取一行的数据,这里读取到的是字节数组,我们根据我们串口接收区域设置的数据格式来进行解析;
def __read_data__(self): """ 按行接收数据 :return: """ if self.serial is None or not self.serial.isOpen(): return try: # 读取串口数据 例如:b'DDR V1.12 52218f4949 cym 23/07/0' byte_array = self.serial.readline() if len(byte_array) == 0: return None # ascii显示 if self.__date_format == 'ascii': # 串口接收到的字符串为b'ABC',要转化成unicode字符串才能输出到窗口中去 data_str = byte_array.decode('utf-8') else: # 串口接收到的字符串为b'ZZ\x02\x03Z',要转换成16进制字符串显示 data_str = ' '.join(format(x, '02x') for x in byte_array) if self.__auto_line: data_str += '\r\n' return data_str except Exception as e: print("接收数据异常:", e) self.serial_error.emit("接收数据异常!")
3.6.4 发送数据
当用户在串口数据接收区域的【文本输入框】录入内容,点击【发送】按钮时,最终调用的就是串口线程的send_data
方法;我们根据我们串口接收区域设置的数据格式处理将要发送的数据,然后调用self.serial.write
实现串口数据的发送;
def send_data(self, data: str): """ 发送数据 :return: """ if not self.running: self.serial_error.emit("请先打开串口!") return # hex发送 比如:5a 5a 02 03 5a -> b'ZZ\x02\x03Z' if self.__date_format == 'hex': data_str = data.strip() send_list = [] while data_str != '': try: num = int(data_str[0:2], 16) except ValueError: self.serial_error.emit('请输入十六进制数据,以空格分开!') return data_str = data_str[2:].strip() send_list.append(num) if self.__auto_line: send_list.append(0x0d) send_list.append(0x0a) byte_array = bytes(send_list) else: if self.__auto_line: data += '\r\n' # ascii发送 比如:'ABC' -> b'ABC' byte_array = data.encode('utf-8') try: self.serial.write(byte_array) except Exception as e: print("发送失败", e) self.seria编程客栈l_error.emit('发送失败!')
以上就是基于Python+PyQt5实现串口数据采集和显示的详细内容,更多关于Python PyQt5数据采集和显示的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论