
How to exit a multithreaded program?

I was just messing around with threading in Python and wrote this basic IM thingy [code at bottom]

I noticed that when I kill the program with C-c it doesn't exit, it just hangs forever.

I'm guessing it's waiting for each thread to finish what it is doing, but since each one runs an endless loop that will never happen.

So I guess I need to kill each thread manually, or end the loop when the kill signal comes in.

How would I do that?

#!/usr/bin/env python
import threading
import socket

class Listen(threading.Thread):

    def run(self):
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.bind(('', 2727))
        conn.listen(1)
        while True:
            channel, details = conn.accept()
            print str(details)+": "+channel.recv(250)
            channel.send("got it")
            channel.close()

class Shout(threading.Thread):

    def run(self):
        while True:
            try:    
                address = raw_input("who u talking to? ")
                conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                conn.connect((address, 2727))
                break
            except:
                print "can't connect to "+ str(address)
        while True:
            conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            conn.connect((address, 2727))
            conn.send(raw_input())
            conn.close()

listen = Listen().start()
shout = Shout().start()


I see several causes of the misbehavior in your code.

  1. Ctrl+C causes a "KeyboardInterrupt" exception in the main thread. So you should handle it there.
  2. Your socket is in blocking mode, so several socket functions block the calling thread until they return. While blocked, the thread cannot react to any termination event.
  3. As you already said: the endless loop in the thread's run() function really is endless, so the thread never finishes (at least not without an unexpected exception). You should use a synchronization object, such as a threading.Event, to tell a thread from outside that it should terminate.
  4. I would discourage calling raw_input() outside the main thread. Imagine what happens when you have more than one Shout thread.
  5. Why does your Shout class close and reconnect the socket after every message? Network connections should be re-established only in special cases because of the setup cost.
  6. Without a framing protocol you can never assume that recv() has returned all the data the other host sent.
  7. The start() method of a thread object does not return anything, so saving its return value (None) doesn't make much sense.
  8. You can never assume that send() transmits all the data passed to it. You must check its return value and handle the case where not all bytes were transmitted.
  9. To learn threading there are surely better problems to solve than network communication, since that topic is really complex in itself.
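
Points 1 to 3 can be illustrated with a small sketch. It is written for Python 3 (unlike the Python 2 code in this thread), and the names StoppableAcceptor and run_until_interrupted are made up for this illustration. It only shows the pattern: a socket timeout so accept() cannot block forever, a threading.Event as the stop flag, and KeyboardInterrupt handled in the main thread.

```python
import socket
import threading


class StoppableAcceptor(threading.Thread):
    """Hypothetical example class: accepts connections until asked to stop."""

    def __init__(self, port=0):
        super().__init__()
        self.stop_event = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Port 0 lets the OS pick a free port (an assumption made for the demo).
        self.sock.bind(("127.0.0.1", port))
        self.sock.listen(5)
        # The timeout turns the blocking accept() into a periodic poll.
        self.sock.settimeout(0.1)
        self.accepted = 0

    def run(self):
        try:
            while not self.stop_event.is_set():
                try:
                    client, _addr = self.sock.accept()
                except socket.timeout:
                    continue  # nothing arrived; re-check the stop flag
                self.accepted += 1
                client.close()
        finally:
            self.sock.close()

    def stop(self):
        # Signal the loop to end, then wait until the thread has finished.
        self.stop_event.set()
        self.join()


def run_until_interrupted(worker, poll_interval=0.1):
    """Keep the main thread responsive to Ctrl+C, then stop the worker."""
    worker.start()
    try:
        while worker.is_alive():
            worker.join(poll_interval)
    except KeyboardInterrupt:
        pass  # Ctrl+C arrives here, in the main thread
    finally:
        worker.stop()
```

Calling run_until_interrupted(StoppableAcceptor(2727)) from the main module would then run until Ctrl+C is pressed and exit cleanly, because the worker notices the stop flag within one timeout interval.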

Besides all these things, here is my attempt at a solution. There is still much that can be improved. You should consider the answer from Mark Tolonen too, since the SocketServer class is provided precisely to ease this kind of work. But you should keep studying the basics too.

#!/usr/bin/env python
import threading
import socket
import time
import errno

class StoppableThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.stop_event = threading.Event()        

    def stop(self):
        if self.isAlive() == True:
            # set event to signal thread to terminate
            self.stop_event.set()
            # block calling thread until thread really has terminated
            self.join()

class Accept(StoppableThread):
    def __init__(self, port):
        StoppableThread.__init__(self)
        self.port = port
        self.threads = []

    def run(self):     
        # handle connection acception
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.bind(('', self.port ))
        conn.listen(5)
        # set socket timeout to ~10ms
        conn.settimeout(0.01)
        while self.stop_event.is_set() == False:
            try:
                csock, caddr = conn.accept()
                # spawn a new thread to handle the client connection
                listen_thread = Listen(csock, caddr)
                self.threads.append(listen_thread)
                listen_thread.start()
            except socket.timeout:
                # socket operation timeout
                # drop all terminated threads from the thread list
                # (rebuild the list instead of removing items while iterating over it)
                self.threads = [t for t in self.threads if t.isAlive()]

        self.stop_threads()

    def stop_threads(self):
        # stop all running threads
        for listen_thread in self.threads:
            if listen_thread.isAlive() == True:
                listen_thread.stop()
        self.threads = [] 

class Listen(StoppableThread):
    def __init__(self, csock, caddr):
        StoppableThread.__init__(self)
        self.csock = csock
        self.caddr = caddr
        self.csock.setblocking(False)

    def run(self):                
        while self.stop_event.is_set() == False:            
            try:                
                recv_data = self.csock.recv(250)
                if len(recv_data) > 0:       
                    print str(self.caddr)+": " + recv_data
                    self.csock.send("got it")                    
                else:
                    # connection was closed by foreign host
                    self.stop_event.set()
            except socket.error as (sock_errno, sock_errstr):
                if (sock_errno == errno.EWOULDBLOCK):
                    # socket would block - sleep sometime
                    time.sleep(0.1)                    
                else:
                    # unexpected / unhandled error - terminate thread
                    self.stop_event.set()
        # close the client socket before the thread terminates
        self.csock.close()

class Shout(StoppableThread):
    def __init__(self, sport):
        StoppableThread.__init__(self)
        self.sport = sport

    def run(self):
        # stays None if the thread is stopped before a socket was created
        conn = None
        while self.stop_event.is_set() == False:
            try:
                address = raw_input("who u talking to? ")
                conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                conn.connect((address, self.sport))
                break
            except socket.error:
                # handle connection problems
                print "can't connect to "+ str(address)
            except:
                # exit thread in case of an unexpected error
                self.stop_event.set()

        while self.stop_event.is_set() == False:
            try:
                # chat loop: send messages to remote host
                print "what to send? :",
                msg = raw_input()
                # beware: sendall() may block indefinitely here; unlike send(),
                # it keeps sending until all bytes are out or an error occurs
                conn.sendall(msg)
            except:
                # exit thread in case of an unexpected error
                self.stop_event.set()
        # close socket (if one was created) before thread terminates
        if conn is not None:
            conn.close()

def main():
    do_exit = False
    server_port = 2727

    # start server socket thread
    accept = Accept(server_port)
    accept.start()

    # start transmitting client socket thread
    shout = Shout(server_port)
    shout.start()

    while do_exit == False:
        try:
            # sleep some time
            time.sleep(0.1)
        except KeyboardInterrupt:
            # Ctrl+C was hit - exit program
            do_exit = True

    # stop all running threads
    shout.stop()
    accept.stop()

    # exit main program after all threads were terminated gracefully    

if __name__ == "__main__":
    main()
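
The code above still leaves points 6 and 8 open. One common way to address both is a length prefix in front of every message. The following is a minimal sketch for Python 3, not part of the solution above; the helper names send_message, recv_exactly and recv_message and the 4-byte header are assumptions chosen for this illustration.

```python
import socket
import struct

# Assumed frame layout: 4-byte unsigned length in network byte order, then the payload.
HEADER = struct.Struct("!I")


def send_message(sock, payload):
    """Send one length-prefixed message (payload must be bytes).

    sendall() keeps calling send() internally until every byte is written.
    """
    sock.sendall(HEADER.pack(len(payload)) + payload)


def recv_exactly(sock, count):
    """Read exactly `count` bytes, looping because recv() may return fewer."""
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            # An empty result means the peer closed the connection.
            raise ConnectionError("peer closed the connection mid-message")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv_message(sock):
    """Receive one complete message framed by send_message()."""
    (length,) = HEADER.unpack(recv_exactly(sock, HEADER.size))
    return recv_exactly(sock, length)
```

With helpers like these, the receiver always gets whole messages no matter how TCP splits or merges the byte stream. Note that these helpers assume a blocking socket; on a non-blocking socket like the one in Listen they would need extra handling for EWOULDBLOCK.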


Look at the Python library source for SocketServer.py, in particular the implementation of serve_forever(), to see how a server implements a quit. It uses select() to poll the server socket for new connections and tests a quit flag. Here's a hack on your source that uses SocketServer, and I added a quit flag to Shout(). It will run the Shout and Listen threads for 5 seconds and then stop them.

import socket
import SocketServer
import threading
import time

class Handler(SocketServer.StreamRequestHandler):
    def handle(self):
        print str(self.client_address) + ": " + self.request.recv(250)
        self.request.send("got it\n")

class Listen(threading.Thread):
    def run(self):
        self.server = SocketServer.TCPServer(('',2727),Handler)
        self.server.serve_forever()
    def stop(self):
        self.server.shutdown()

class Shout(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.quit = False
    def run(self):
        while not self.quit:
            conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            conn.connect(('localhost', 2727))
            conn.send('sending\n')
            print conn.recv(100)
            conn.close()
    def stop(self):
        self.quit = True

listen = Listen()
listen.start()
shout = Shout()
shout.start()

time.sleep(5)

shout.stop()
listen.stop()
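
For readers who want to see the select()-plus-quit-flag idea in isolation, here is a simplified sketch for Python 3. It is not the SocketServer source; the class PollingServer and its method names are invented for this illustration, and the reply text is borrowed from the example above.

```python
import select
import socket
import threading


class PollingServer:
    """Hypothetical minimal server: polls with select() and checks a quit flag."""

    def __init__(self, host="127.0.0.1", port=0):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Port 0 lets the OS pick a free port (an assumption made for the demo).
        self.sock.bind((host, port))
        self.sock.listen(5)
        self.quit_requested = threading.Event()
        self.handled = 0

    def serve_until_quit(self, poll_interval=0.1):
        try:
            while not self.quit_requested.is_set():
                # Wait at most poll_interval seconds for a pending connection.
                readable, _, _ = select.select([self.sock], [], [], poll_interval)
                if readable:
                    client, _addr = self.sock.accept()
                    with client:
                        client.sendall(b"got it\n")
                    self.handled += 1
        finally:
            self.sock.close()

    def shutdown(self):
        # Safe to call from another thread; the loop notices within one poll.
        self.quit_requested.set()
```

The loop never blocks longer than poll_interval, so a shutdown() call from the main thread takes effect almost immediately.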