How do I stop Tornado web server? [duplicate]
I've been playing around a bit with the Tornado web server and have come to a point where I want to stop the web server (for example during unit testing). The following simple example exists on the Tornado web page:
import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

application = tornado.web.Application([
    (r"/", MainHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
Once tornado.ioloop.IOLoop.instance().start() is called, it blocks the program (or current thread). The source code for the IOLoop object gives this example in the documentation for the stop function:
To use asynchronous methods from otherwise-synchronous code (such as
unit tests), you can start and stop the event loop like this:
    ioloop = IOLoop()
    async_method(ioloop=ioloop, callback=ioloop.stop)
    ioloop.start()
ioloop.start() will return after async_method has run its callback,
whether that callback was invoked before or after ioloop.start.
However, I have no idea how to integrate this into my program. I actually have a class that encapsulates the web server (with its own start and stop functions), but as soon as I call start, the program (or tests) will of course block anyway.
I've tried to start the web server in another process (using the multiprocessing package). This is the class that wraps the web server:
class Server:
    def __init__(self, port=8888):
        self.application = tornado.web.Application([ (r"/", Handler) ])

        def server_thread(application, port):
            http_server = tornado.httpserver.HTTPServer(application)
            http_server.listen(port)
            tornado.ioloop.IOLoop.instance().start()

        self.process = Process(target=server_thread,
                               args=(self.application, port,))

    def start(self):
        self.process.start()

    def stop(self):
        ioloop = tornado.ioloop.IOLoop.instance()
        ioloop.add_callback(ioloop.stop)
However, stop does not seem to entirely stop the web server since it is still running in the next test, even with this test setup:
def setup_method(self, _function):
    self.server = Server()
    self.server.start()
    time.sleep(0.5)  # Wait for web server to start

def teardown_method(self, _function):
    self.server.stop()
    time.sleep(0.5)
How can I start and stop a Tornado web server from within a Python program?
I just ran into this issue myself, and using info from this thread came up with the following. I simply took my working standalone Tornado code (copied from all the examples) and moved the actual starting code into a function. I then ran that function in a threading thread. My case was different in that the threading call was made from my existing code, where I just imported the startTornado and stopTornado routines.
The suggestion above seemed to work great, so I figured I would supply the missing example code. I tested this code under Linux on an FC16 system (and fixed my initial typo).
import tornado.ioloop, tornado.web

class Handler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

application = tornado.web.Application([ (r"/", Handler) ])

def startTornado():
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

def stopTornado():
    # Called from a different thread than the one running the loop, so the
    # stop is scheduled through add_callback, the thread-safe entry point.
    ioloop = tornado.ioloop.IOLoop.instance()
    ioloop.add_callback(ioloop.stop)

if __name__ == "__main__":
    import time, threading
    threading.Thread(target=startTornado).start()
    print("Your web server will self destruct in 2 minutes")
    time.sleep(120)
    stopTornado()
Hope this helps the next person.
Here is how to stop Tornado from another thread. Schildmeijer provided a good hint, but it took me a while to figure out a final example that actually works.
Please see below:
import threading
import tornado.ioloop
import tornado.web
import time

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world!\n")

def start_tornado(*args, **kwargs):
    application = tornado.web.Application([
        (r"/", MainHandler),
    ])
    application.listen(8888)
    print("Starting Tornado")
    tornado.ioloop.IOLoop.instance().start()
    print("Tornado finished")

def stop_tornado():
    ioloop = tornado.ioloop.IOLoop.instance()
    # add_callback is thread-safe; stop() then runs on the loop's own thread
    ioloop.add_callback(ioloop.stop)
    print("Asked Tornado to exit")

def main():
    t = threading.Thread(target=start_tornado)
    t.start()
    time.sleep(5)
    stop_tornado()
    t.join()

if __name__ == "__main__":
    main()
In case you do not want to bother with threads, you could catch a keyboard interrupt signal:
try:
    tornado.ioloop.IOLoop.instance().start()
# signal : CTRL + BREAK on windows or CTRL + C on linux
except KeyboardInterrupt:
    tornado.ioloop.IOLoop.instance().stop()
There is a problem with Zaar Hai's solution, namely that it leaves the socket open. The reason I was looking for a way to stop Tornado is that I'm running unit tests against my app server and I needed to start/stop the server between tests to have a clean state (empty session, etc.). Because the socket was left open, the second test always ran into an Address already in use error. So I came up with the following:
import logging as log
from time import sleep
from threading import Thread

import tornado.ioloop  # a bare "import tornado" does not load the ioloop submodule
from tornado.httpserver import HTTPServer

server = None
thread = None

def start_app():
    def start():
        global server
        server = HTTPServer(create_app())
        server.listen(TEST_PORT, TEST_HOST)
        tornado.ioloop.IOLoop.instance().start()

    global thread
    thread = Thread(target=start)
    thread.start()
    # wait for the server to fully initialize
    sleep(0.5)

def stop_app():
    server.stop()
    # silence StreamClosedError Tornado is throwing after it is stopped
    log.getLogger().setLevel(log.FATAL)
    ioloop = tornado.ioloop.IOLoop.instance()
    ioloop.add_callback(ioloop.stop)
    thread.join()
So the main idea here is to keep a reference to the HTTPServer instance and call its stop() method. And create_app() just returns an Application instance configured with handlers. Now you can use these methods in your unit tests like this:
class FoobarTest(unittest.TestCase):
    def setUp(self):
        start_app()

    def tearDown(self):
        stop_app()

    def test_foobar(self):
        # here the server is up and running, so you can make requests to it
        pass
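To see why an open listening socket produces the Address already in use error mentioned above, here is a small sketch using plain standard-library sockets rather than Tornado (an assumption made to keep it self-contained; listen_on and bind_twice_then_reuse are made-up names). A second bind on the same port fails while the first socket is open, and succeeds once it has been closed, which is roughly what HTTPServer.stop() achieves for the server's listening socket.

```python
import errno
import socket

def listen_on(port, host="127.0.0.1"):
    """Open a listening TCP socket on host:port (port 0 = any free port)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, port))
        sock.listen(1)
    except OSError:
        sock.close()
        raise
    return sock

def bind_twice_then_reuse():
    """Return the errno raised by a second bind while the first is open."""
    first = listen_on(0)
    port = first.getsockname()[1]
    clash = None
    try:
        # The port is still held by `first`, so this bind is expected to fail.
        listen_on(port).close()
    except OSError as exc:
        clash = exc.errno
    # Closing the first socket releases the port...
    first.close()
    # ...so binding the same port again now works.
    listen_on(port).close()
    return clash

if __name__ == "__main__":
    print(bind_twice_then_reuse() == errno.EADDRINUSE)
```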
To stop the entire ioloop, simply invoke the ioloop.stop method when you have finished the unit test. (Remember that the only documented thread-safe method is ioloop.add_callback, i.e. if the unit tests are executed by another thread, you could wrap the stop invocation in a callback.)
If it's enough to stop the HTTP server, invoke the httpserver.stop() method.
If you need this behavior for unit testing, take a look at tornado.testing.AsyncTestCase.
By default, a new IOLoop is constructed for each test and is available as self.io_loop. This IOLoop should be used in the construction of HTTP clients/servers, etc. If the code being tested requires a global IOLoop, subclasses should override get_new_ioloop to return it.
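As a rough illustration of the tornado.testing approach, here is a minimal sketch that uses AsyncHTTPTestCase, a subclass of AsyncTestCase from the same module that starts and stops a server around each test. It assumes Tornado 5 or newer on Python 3; HelloHandler, HelloTest and run_tests are illustrative names, not part of the original question.

```python
import unittest

import tornado.testing
import tornado.web

class HelloHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

class HelloTest(tornado.testing.AsyncHTTPTestCase):
    def get_app(self):
        # The test case serves this application on an unused local port
        # and shuts the server down again in tearDown().
        return tornado.web.Application([(r"/", HelloHandler)])

    def test_root(self):
        # fetch() runs the IOLoop until the response arrives, then stops it.
        response = self.fetch("/")
        self.assertEqual(response.code, 200)
        self.assertEqual(response.body, b"Hello, world")

def run_tests():
    """Run HelloTest and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(HelloTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)

if __name__ == "__main__":
    run_tests()
```

With this pattern there is no need for sleep() calls or a hand-written stop routine, since each test gets a fresh server and IOLoop.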
If you need to start and stop an IOLoop for some other purpose and can't call ioloop.stop() from a callback for some reason, a multi-threaded implementation is possible. To avoid race conditions, however, you need to synchronize access to the ioloop, which is actually impossible. Something like the following will result in deadlock:
Thread 1:
    with lock:
        ioloop.start()
Thread 2:
    with lock:
        ioloop.stop()
because thread 1 will never release the lock (start() is blocking) and thread 2 will wait till the lock is released to stop the ioloop.
The only way to do it is for thread 2 to call ioloop.add_callback(ioloop.stop), which will call stop() on thread 1 in the event loop's next iteration. Rest assured, ioloop.add_callback() is thread-safe.
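A minimal sketch of that add_callback pattern, assuming Tornado 5 or newer on Python 3. LoopThread is a made-up name, and the loop is created with make_current=False inside the worker thread instead of going through IOLoop.instance(), so treat it as one possible arrangement, not the only one.

```python
import threading

from tornado.ioloop import IOLoop

class LoopThread(threading.Thread):
    """Runs an IOLoop in its own thread and lets other threads stop it."""

    def __init__(self):
        super().__init__()
        self.loop = None
        self.started = threading.Event()

    def run(self):
        # Create the loop in the thread that is going to run it.
        self.loop = IOLoop(make_current=False)
        # This callback fires once the loop is actually running.
        self.loop.add_callback(self.started.set)
        self.loop.start()  # blocks until stop() is executed on this thread
        self.loop.close()

    def stop(self):
        # add_callback is safe to call from another thread; the stop()
        # call itself then happens on the loop's own thread.
        self.loop.add_callback(self.loop.stop)
        self.join()

if __name__ == "__main__":
    worker = LoopThread()
    worker.start()
    worker.started.wait(timeout=5)
    worker.stop()
    print("loop thread alive:", worker.is_alive())
```

No lock is needed here: thread 2 never touches the loop directly, it only queues a callback.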
Tornado's IOLoop.instance() has trouble stopping from an external signal when run under multiprocessing.Process.
The only solution I came up with that works consistently is to use Process.terminate():
import tornado.ioloop, tornado.web
import time
import multiprocessing

class Handler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

application = tornado.web.Application([ (r"/", Handler) ])

class TornadoStop(Exception):
    pass

def stop():
    raise TornadoStop

class worker(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        application.listen(8888)
        self.ioloop = tornado.ioloop.IOLoop.instance()

    def run(self):
        self.ioloop.start()

    def stop(self, timeout = 0):
        self.ioloop.stop()
        time.sleep(timeout)
        # terminate() is what actually ends the child process
        self.terminate()

if __name__ == "__main__":
    w = worker()
    print('starting server')
    w.start()
    t = 2
    print('waiting {} seconds before stopping'.format(t))
    for i in range(t):
        time.sleep(1)
        print(i)
    print('stopping')
    w.stop(1)
    print('stopped')
We want to use a multiprocessing.Process with a tornado.ioloop.IOLoop to work around the CPython GIL for performance and independence. To reach the IOLoop in the child process, we use a Queue to pass the shutdown signal through.
Here is a minimalistic example:
class Server(BokehServer):

    def start(self, signal=None):
        logger.info('Starting server on http://localhost:%d'
                    % (self.port))

        if signal is not None:
            # Poll the queue once a second and stop when a signal arrives
            def shutdown():
                if not signal.empty():
                    self.stop()
            tornado.ioloop.PeriodicCallback(shutdown, 1000).start()

        BokehServer.start(self)
        self.ioloop.start()

    def stop(self, *args, **kwargs):  # args important for signals
        logger.info('Stopping server...')
        BokehServer.stop(self)
        self.ioloop.stop()
The process:
import multiprocessing as mp
import signal

from server import Server  # noqa

class ServerProcess(mp.Process):

    def __init__(self, *args, **kwargs):
        self.server = Server(*args, **kwargs)
        self.shutdown_signal = mp.Queue(1)
        mp.Process.__init__(self)

        signal.signal(signal.SIGTERM, self.server.stop)
        signal.signal(signal.SIGINT, self.server.stop)

    def run(self):
        self.server.start(signal=self.shutdown_signal)

    def stop(self):
        self.shutdown_signal.put(True)

if __name__ == '__main__':
    p = ServerProcess()
    p.start()
Cheers!
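Just add this before the start():
IOLoop.instance().add_timeout(time.time() + 10, IOLoop.instance().stop)
It registers the stop function as a timeout in the loop and launches it 10 seconds after it is added. Note that add_timeout expects an absolute deadline (or a datetime.timedelta), so a 10-second delay is written as time.time() + 10 (with time imported).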
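The same delayed stop can also be written with IOLoop.call_later, which takes a relative delay in seconds. A minimal sketch, assuming Tornado 5 or newer on Python 3; run_for is a hypothetical helper name and the loop is a private one created with make_current=False rather than IOLoop.instance().

```python
import time

from tornado.ioloop import IOLoop

def run_for(seconds):
    """Run a private IOLoop for roughly `seconds`, then return the elapsed time."""
    loop = IOLoop(make_current=False)
    # Schedule stop() to run `seconds` from now (a relative delay).
    loop.call_later(seconds, loop.stop)
    began = time.monotonic()
    loop.start()  # blocks until the scheduled stop() fires
    loop.close()
    return time.monotonic() - began

if __name__ == "__main__":
    print("loop ran for %.2f seconds" % run_for(0.2))
```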