I/O intensive serial port application: Porting from Threading, Queue based design to Asynchronous (ala Twisted)
So, I've been working on an application for a client that communicates with wireless devices via a Serial (RS-232) "Master". I've currently written the core of the app using threading (below). I've been noticing on #python that the consensus seems to be to NOT use threads and to use Twisted's asynchronous communication abilities.
I haven't been able to find any good examples of using twisted for serial port async I/O communication. However, I have found Dave Peticolas' 'Twisted Introduction' (thanks nosklo) that I'm currently working through, but, it uses sockets instead of serial communication (but the async concept is definitely very well explained).
How would I go about porting this app over to Twisted from using Threading, Queues? Are there any advantages/disadvantages (I have noticed that, on occasion, if a thread hangs it will BSOD the system)?
The Code (msg_poller.py)
from livedatafeed import LiveDataFeed
from msg_build import build_message_to_send
from utils import get_item_from_queue
from protocol_wrapper import ProtocolWrapper, ProtocolStatus
from crc16 import *
import time
import Queue
import threading
import serial
import gc
gc.enable()
PROTOCOL_HEADER = '\x01'
PROTOCOL_FOOTER = '\x0D\x0A'
PROTOCOL_DLE = '\x90'
INITIAL_MODBUS = 0xFFFF
class Poller:
"""
Connects to the serial port and polls nodes for data.
Reads response from node(s) and loads that data into queue.
Parses qdata and writes that data to database.
"""
def __init__(self,
port,
baudrate,
parity,
rtscts,
xonxoff,
echo=False):
try:
self.serial = serial.serial_for_url(port,
baudrate,
parity=parity,
rtscts=rtscts,
xonxoff=xonxoff,
timeout=.01)
except AttributeError:
self.serial = serial.Serial(port,
baudrate,
parity=parity,
rtscts=rtscts,
xonxoff=xonxoff,
timeout=.01)
self.com_data_q = None
self.com_error开发者_StackOverflow_q = None
self.livefeed = LiveDataFeed()
self.timer = time.time()
self.dtr_state = True
self.rts_state = True
self.break_state = False
def start(self):
self.data_q = Queue.Queue()
self.error_q = Queue.Queue()
com_error = get_item_from_queue(self.error_q)
if com_error is not None:
print 'Error %s' % (com_error)
self.timer = time.time()
self.alive = True
# start monitor thread
#
self.mon_thread = threading.Thread(target=self.reader)
self.mon_thread.setDaemon(1)
self.mon_thread.start()
# start sending thread
#
self.trans_thread = threading.Thread(target=self.writer)
self.trans_thread.setDaemon(1)
self.trans_thread.start()
def stop(self):
try:
self.alive = False
self.serial.close()
except (KeyboardInterrupt, SystemExit):
self.alive = False
def reader(self):
"""
Reads data from the serial port using self.mon_thread.
Displays that data on the screen.
"""
from rmsg_format import message_crc, message_format
while self.alive:
try:
while self.serial.inWaiting() != 0:
# Read node data from the serial port. Data should be 96B.
data = self.serial.read(96)
data += self.serial.read(self.serial.inWaiting())
if len(data) > 0:
# Put data in to the data_q object
self.data_q.put(data)
if len(data) == 96:
msg = self.data_q.get()
pw = ProtocolWrapper(
header=PROTOCOL_HEADER,
footer=PROTOCOL_FOOTER,
dle=PROTOCOL_DLE)
status = map(pw.input, msg)
if status[-1] == ProtocolStatus.IN_MSG:
# Feed all the bytes of 'msg' sequentially into pw.input
# Parse the received CRC into a 16-bit integer
rec_crc = message_crc.parse(msg[-4:]).crc
# Compute the CRC on the message
calc_crc = calcString(msg[:-4], INITIAL_MODBUS)
from datetime import datetime
ts = datetime.now().strftime('%Y/%m/%d %H:%M:%S')
if rec_crc != calc_crc:
print ts
print 'ERROR: CRC Mismatch'
print msg.encode('hex')
else:
#msg = message_format.parse(msg[1:])
#print msg.encode('hex') + "\r\n"
msg = message_format.parse(msg[1:])
print msg
#return msg
gc.collect()
time.sleep(.2)
except (KeyboardInterrupt, SystemExit, Exception, TypeError):
self.alive = False
self.serial.close()
raise
def writer(self):
"""
Builds the packet to poll each node for data.
Writes that data to the serial port using self.trans_thread
"""
import time
try:
while self.alive:
try:
dest_module_code = ['DRILLRIG',
'POWERPLANT',
'GENSET',
'MUDPUMP']
dest_ser_no = lambda x: x + 1
for code in dest_module_code:
if code != 'POWERPLANT':
msg = build_message_to_send(
data_len=0x10,
dest_module_code='%s' % (code),
dest_ser_no=dest_ser_no(0),
dest_customer_code='*****',
ret_ser_no=0x01,
ret_module_code='DOGHOUSE',
ret_customer_code='*****',
command='POLL_NODE',
data=[])
self.serial.write(msg)
time.sleep(.2)
gc.collect()
elif code == 'POWERPLANT':
msg = build_message_to_send(
data_len=0x10,
dest_module_code='POWERPLANT',
dest_ser_no=dest_ser_no(0),
dest_customer_code='*****',
ret_ser_no=0x01,
ret_module_code='DOGHOUSE',
ret_customer_code='*****',
command='POLL_NODE',
data=[])
self.serial.write(msg)
time.sleep(.2)
gc.collect()
msg = build_message_to_send(
data_len=0x10,
dest_module_code='POWERPLANT',
dest_ser_no=dest_ser_no(1),
dest_customer_code='*****',
ret_ser_no=0x01,
ret_module_code='DOGHOUSE',
ret_customer_code='*****',
command='POLL_NODE',
data=[])
self.serial.write(msg)
time.sleep(.2)
gc.collect()
except (KeyboardInterrupt, SystemExit):
self.alive = False
self.serial.close()
raise
except (KeyboardInterrupt, SystemExit):
self.alive = False
self.serial.close()
raise
def main():
poller = Poller(
port='COM4',
baudrate=115200,
parity=serial.PARITY_NONE,
rtscts=0,
xonxoff=0,
)
poller.start()
poller.reader()
poller.writer()
poller.stop()
if __name__ == '__main__':
main()
It is very difficult (if not impossible) to write a direct one-to-one mapping program between threading/queue approach and one that uses twisted.
I would suggest that, get a hang of twisted and its reactor way it's use of Protocol and the protocol specific methods. Think about it as as all the asynchronous things that you had been explicitly coding using threads and queues are given to you for free when you are using deferred using twisted.
twisted does seem to support SerialPort over it's reactor using SerialPort transport class and the basic structure seems to be somewhat like this.
from twisted.internet import reactor
from twisted.internet.serialport import SerialPort
SerialPort(YourProtocolClass(), Port, reactor, baudrate=baudrate))
reactor.run()
In YourProtocolClass() would you handle the various events that are specific to your Serial Port Communication requirements. The doc/core/examples directory contains examples such as gpsfix.py and mouse.py.
精彩评论