开发者

Non-blocking console input?

I am trying to make a simple IRC client in Python (as kind of a project while I learn the language).

I have a loop that I use to receive and parse what the IRC server sends me, but if I use raw_input to input stuff, it stops the loop dead in its tracks until I input something (obviously).

How can I input something without the loop stopping?

(I don't think I need to post the code, I just want to input something without the while 1: loop stopping.)

I'm on Windows.


For Windows, console only, use the msvcrt module:

import msvcrt

# Count upwards until any key is pressed (Windows console only).
num = 0
done = False
while not done:
    print(num)
    num += 1

    # kbhit() only reports whether a keypress is waiting, so the loop never
    # blocks on the keyboard.
    if msvcrt.kbhit():
        # print() as a function works on Python 3 (the statement form is a
        # syntax error there); getwch() returns the key as text, not bytes.
        print("you pressed", msvcrt.getwch(), "so now i will quit")
        done = True

For Linux, this article describes the following solution, which requires the termios module:

import sys
import select
import tty
import termios

def isData():
    """Return True when stdin has input waiting, without blocking.

    select() is called with a zero timeout, so it only polls.
    """
    readable, _, _ = select.select([sys.stdin], [], [], 0)
    return readable == [sys.stdin]

# Remember the current terminal attributes so they can be restored on exit.
old_settings = termios.tcgetattr(sys.stdin)
try:
    # Put the terminal in cbreak mode for the duration of the loop.
    tty.setcbreak(sys.stdin.fileno())

    i = 0
    while True:
        print(i)
        i += 1

        # Only read when a character is already waiting, so the counter
        # keeps running between keypresses.
        if not isData():
            continue
        c = sys.stdin.read(1)
        if c == '\x1b':  # x1b is ESC
            break
finally:
    # Always restore the saved settings, even if the loop raised.
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

For cross platform, or in case you want a GUI as well, you can use Pygame:

import pygame
from pygame.locals import *

def display(str):
    """Render the text *str* centred on the window and refresh the display.

    Uses the module-level ``font`` and ``screen`` objects. The parameter
    name shadows the builtin ``str`` inside this function; it is kept so
    existing callers are unaffected.
    """
    surface = font.render(str, True, (255, 255, 255), (159, 182, 205))
    target = surface.get_rect()
    # Align the text rectangle's centre with the centre of the window.
    target.center = screen.get_rect().center

    screen.blit(surface, target)
    pygame.display.update()

# Window setup: 640x480, titled, filled with the same colour display() uses
# as the text background.
pygame.init()
screen = pygame.display.set_mode((640, 480))
pygame.display.set_caption('Python numbers')
screen.fill((159, 182, 205))

font = pygame.font.Font(None, 17)

# Show an increasing counter until ESC is held down.
num = 0
done = False
while not done:
    display(str(num))
    num += 1

    # pump() lets pygame process its event queue so get_pressed() is current.
    pygame.event.pump()
    done = bool(pygame.key.get_pressed()[K_ESCAPE])


This is the most awesome solution[1] I've ever seen. Pasted here in case the link goes down:

#!/usr/bin/env python
'''
A Python class implementing KBHIT, the standard keyboard-interrupt poller.
Works transparently on Windows and Posix (Linux, Mac OS X).  Doesn't work
with IDLE.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as 
published by the Free Software Foundation, either version 3 of the 
License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

'''

import os

# Windows
if os.name == 'nt':
    import msvcrt

# Posix (Linux, OS X)
else:
    import sys
    import termios
    import atexit
    from select import select


class KBHit:
    '''Keyboard poller with one interface for Windows (msvcrt) and POSIX
    (termios + select).

    Call kbhit() to test for a pending key, then getch() or getarrow() to
    read it.
    '''

    def __init__(self):
        '''Creates a KBHit object that you can call to do various keyboard things.

        On POSIX this switches stdin out of canonical mode and disables echo,
        and registers set_normal_term() to run at interpreter exit.
        '''

        if os.name == 'nt':
            # msvcrt needs no terminal setup.
            pass

        else:

            # Save the terminal settings
            self.fd = sys.stdin.fileno()
            self.new_term = termios.tcgetattr(self.fd)
            self.old_term = termios.tcgetattr(self.fd)

            # New terminal setting unbuffered (index 3 holds the local flags)
            self.new_term[3] = (self.new_term[3] & ~termios.ICANON & ~termios.ECHO)
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term)

            # Support normal-terminal reset at exit
            atexit.register(self.set_normal_term)


    def set_normal_term(self):
        ''' Resets to normal terminal.  On Windows this is a no-op.
        '''

        if os.name == 'nt':
            pass

        else:
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term)


    def getch(self):
        ''' Returns a keyboard character after kbhit() has been called.
            Should not be called in the same program as getarrow().
        '''

        if os.name == 'nt':
            return msvcrt.getch().decode('utf-8')

        else:
            return sys.stdin.read(1)


    def getarrow(self):
        ''' Returns an arrow-key code after kbhit() has been called. Codes are
        0 : up
        1 : right
        2 : down
        3 : left
        Should not be called in the same program as getch().

        Raises ValueError if the key read is not one of the four arrows.
        '''

        if os.name == 'nt':
            msvcrt.getch() # skip 0xE0
            c = msvcrt.getch()
            vals = [72, 77, 80, 75]

        else:
            # The third character of the 3-character sequence identifies the arrow.
            c = sys.stdin.read(3)[2]
            vals = [65, 67, 66, 68]

        # ord() accepts both the one-byte bytes object from msvcrt and the
        # one-character str from sys.stdin; calling .decode() on the str
        # raised AttributeError on POSIX.
        return vals.index(ord(c))


    def kbhit(self):
        ''' Returns True if keyboard character was hit, False otherwise.
        '''
        if os.name == 'nt':
            return msvcrt.kbhit()

        else:
            # Zero timeout: poll stdin for readability without waiting.
            dr,dw,de = select([sys.stdin], [], [], 0)
            return dr != []


# Test    
if __name__ == "__main__":

    # Echo every key pressed until ESC arrives.
    kb = KBHit()

    print('Hit any key, or ESC to exit')

    while True:
        if not kb.kbhit():
            continue  # nothing pressed yet; keep polling

        c = kb.getch()
        if ord(c) == 27:  # ESC
            break
        print(c)

    kb.set_normal_term()

[1] Made by Simon D. Levy, part of a compilation of software he has written and released under the GNU Lesser General Public License.


Here is a solution that runs under Linux and Windows using a separate thread:

import queue
import sys
import threading
import time


def add_input(input_queue):
    """Read stdin one character at a time and put each on *input_queue*.

    Runs until stdin reaches end-of-file; read(1) then returns '' and the
    function returns instead of enqueueing empty strings forever.
    """
    while True:
        char = sys.stdin.read(1)
        if not char:
            return
        input_queue.put(char)


def foobar():
    """Print a dot every half second while echoing characters typed on stdin.

    The blocking stdin read happens on a daemon thread; the main loop only
    consumes what that thread queued, so it never stalls on the keyboard.
    """
    input_queue = queue.Queue()

    input_thread = threading.Thread(target=add_input, args=(input_queue,))
    input_thread.daemon = True
    input_thread.start()

    last_update = time.time()
    while True:

        if time.time() - last_update > 0.5:
            sys.stdout.write(".")
            sys.stdout.flush()  # the dot has no newline, so flush to show it now
            last_update = time.time()

        # Wait briefly on the queue instead of spinning on empty(), which
        # pinned a CPU core at 100%.
        try:
            char = input_queue.get(timeout=0.05)
        except queue.Empty:
            continue
        print("\ninput:", char)


foobar()


My favorite to get non-blocking input is using the python input() in a thread:

import threading

class KeyboardThread(threading.Thread):
    """Background thread that passes each line typed on stdin to a callback.

    The thread starts itself on construction. It is a daemon thread, so a
    reader blocked in input() does not keep the process alive once the main
    thread has finished.

    Args:
        input_cbk: callable invoked with each line read (without the
            newline). If None, lines are read and discarded.
        name: thread name.
    """

    def __init__(self, input_cbk = None, name='keyboard-input-thread'):
        self.input_cbk = input_cbk
        super(KeyboardThread, self).__init__(name=name, daemon=True)
        self.start()

    def run(self):
        """Read lines until stdin is closed, forwarding each to the callback."""
        while True:
            try:
                line = input() #waits to get input + Return
            except EOFError:
                # stdin was closed: end the thread quietly rather than with a traceback.
                return
            if self.input_cbk is not None:
                self.input_cbk(line)

showcounter = 0 # module-level counter, incremented by the main loop below

def my_callback(inp):
    """Print the line received from the keyboard thread and the current counter.

    Reads the module-level ``showcounter``; it does not modify it.
    """
    # evaluate the keyboard input
    print('You Entered:', inp, ' Counter is at:', showcounter)

# start the keyboard thread; my_callback is handed to it as the input callback
kthread = KeyboardThread(my_callback)

while True:
    # the normal program executes without blocking; here it just counts up
    showcounter += 1

OS independent, only internal libraries, supports multi-character input


On Linux, here's a refactoring of mizipzor's code that makes this a little easier, in case you have to use this code in multiple places.

import sys
import select
import tty
import termios

class NonBlockingConsole(object):
    """Context manager for polling stdin one character at a time.

    Entering saves the terminal attributes and switches stdin to cbreak
    mode; leaving restores the saved attributes.
    """

    def __enter__(self):
        """Save the terminal settings, enable cbreak mode and return self."""
        self.old_settings = termios.tcgetattr(sys.stdin)
        tty.setcbreak(sys.stdin.fileno())
        return self

    def __exit__(self, type, value, traceback):
        """Restore the terminal settings saved by __enter__."""
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)

    def get_data(self):
        """Return one pending character from stdin, or False if none is waiting."""
        # Zero timeout turns select() into a pure poll.
        readable, _, _ = select.select([sys.stdin], [], [], 0)
        if readable != [sys.stdin]:
            return False
        return sys.stdin.read(1)

Here's how to use this: This code will print a counter that keeps growing until you press ESC.

# Print a growing counter until ESC is pressed; the context manager restores
# the terminal settings when the block is left.
with NonBlockingConsole() as nbc:
    i = 0
    while True:
        print(i)  # function-call form; the `print i` statement is a syntax error on Python 3
        i += 1
        if nbc.get_data() == '\x1b':  # x1b is ESC
            break


I think the curses library can help.

import curses
import datetime

def main(stdscr):
    """Poll the keyboard without blocking until "q" is pressed.

    Args:
        stdscr: the curses window object supplied by curses.wrapper().
    """
    stdscr.nodelay(True)  # set getch() non-blocking

    stdscr.addstr(0, 0, "Press \"p\" to show count, \"q\" to exit...")
    height, _ = stdscr.getmaxyx()
    line = 1
    while True:
        c = stdscr.getch()  # returns -1 immediately when no key is waiting
        if c == ord('p'):
            # addstr() past the last row raises curses.error, so stop
            # adding rows once the window is full.
            if line < height:
                stdscr.addstr(line, 0, "Some text here")
                line += 1
        elif c == ord('q'):
            break

        # Do more things


# wrapper() initialises curses (including cbreak and noecho), passes the
# screen to main(), and restores the terminal even if main() raises.
curses.wrapper(main)


.... backing to the initial question ...

I am learning Python too. It cost me a lot of reading through documentation and examples, and a lot of head-scratching, but I think I reached an easy, simple, short and compatible solution, using just input, lists and threads:

'''
what i thought:
- input() in another thread
- that were filling a global strings list
- strings are being popped in the main thread
'''

import threading

consoleBuffer = []

def consoleInput(myBuffer):
  """Append every line read from stdin to *myBuffer* until stdin is closed.

  Intended to run in a background thread; input() blocks that thread only.
  Returns when input() signals end-of-file instead of letting EOFError
  escape as a traceback.
  """
  while True:
    try:
      myBuffer.append(input())
    except EOFError:
      return
 
# start the reader thread; daemon=True so it does not keep the process alive
reader = threading.Thread(target=consoleInput, args=(consoleBuffer,), daemon=True)
reader.start()

import time # just to demonstrate non blocking parallel processing

while True:
  time.sleep(2) # avoid 100% cpu
  print(time.time()) # just to demonstrate non blocking parallel processing
  # drain everything the reader thread collected since the last pass, oldest first
  while consoleBuffer:
    entry = consoleBuffer.pop(0)
    print(repr(entry))

So far, this is the simplest and most compatible way I have found. Be aware that by default stdin, stdout and stderr share the same terminal, so the "local echo" of your input may look inconsistent if something is printed on the console while you are typing; however, after you press Enter the typed string is received correctly. If you don't like this behavior, find a way to separate the input and output areas (for example with redirections), or try another solution such as curses, tkinter, pygame, etc.

BONUS: the ctrl-c keystroke can be easily handled with

try:
  pass  # do whatever (a try block needs at least one statement)
except KeyboardInterrupt:
  # Ctrl-C reaches the program as KeyboardInterrupt: report it, then exit
  # normally. SystemExit is raised directly rather than through exit(),
  # which is a helper added by the site module.
  print('cancelled by user')
  raise SystemExit


I'd do what Mickey Chan said, but I'd use unicurses instead of normal curses. Unicurses is universal (works on all or at least almost all operating systems)


With Python 3.3 and above you can use the asyncio module, as mentioned in this answer. You will have to refactor your code to work with asyncio, though. See: Prompt for user input using python asyncio.create_server instance


Since I found one of the answers above helpful, here's an example of a similar approach. This code creates a metronome effect while taking input.

The difference is this code uses a closure instead of a class, which feels a little more straight-forward to me. This example also incorporates a flag to kill the thread via my_thread.stop = True, but without using a global variable. I do this by (ab)using the fact that python functions are objects and thus can be monkey-patched, even from inside themselves.

Note: Stopping threads should be done with caution. If your thread has data that needs some kind of clean up process or if the thread spawned its own threads, this approach will unceremoniously kill those processes.

# Begin metronome sound while accepting input.
# After pressing enter, turn off the metronome sound.
# Press enter again to restart the process.

import threading
import time
import winsound  # Only on Windows

beat_length = 1  # Metronome speed


def beat_thread():
    """Start a background thread that beeps once every ``beat_length`` seconds.

    The worker runs until ``beat_thread.stop`` is set to True. Calling this
    function again first stops and joins the previous worker; otherwise a
    quick restart could reset the shared flag before the old worker noticed
    it, leaving two metronomes running.
    """
    previous = getattr(beat_thread, "worker", None)
    if previous is not None and previous.is_alive():
        beat_thread.stop = True
        previous.join()  # returns once the old worker finishes its current sleep

    beat_thread.stop = False  # Monkey-patched flag
    frequency, duration = 2500, 10

    def run():  # Closure
        while not beat_thread.stop:  # Run until flag is True
            winsound.Beep(frequency, duration)
            # duration is in milliseconds; clamp so a beat_length shorter
            # than the beep never passes a negative value to sleep().
            time.sleep(max(0, beat_length - duration/1000))

    beat_thread.worker = threading.Thread(target=run)
    beat_thread.worker.start()


# Alternate forever between a "metronome running" phase and a "paused" phase;
# each input() call blocks until the user presses Enter.
while True:
    beat_thread()  # start the metronome
    input("Input with metronome. Enter to finish.\n")
    beat_thread.stop = True  # Flip monkey-patched flag so the worker loop ends
    input("Metronome paused. Enter to continue.\n\n")


If you just want a single "escape" from a loop, you can intercept the Ctrl-C signal.

This is cross-platform and very simple!

import signal
import sys

def signal_handler(sig, frame):
    """Signal handler: announce the interrupt and exit with status 0.

    Args:
        sig: signal number passed by the signal module (unused).
        frame: current stack frame passed by the signal module (unused).
    """
    print('You pressed Ctrl+C!')
    raise SystemExit(0)

# Route Ctrl-C (SIGINT) to signal_handler instead of the default KeyboardInterrupt.
signal.signal(signal.SIGINT, signal_handler)
while True:
    # do your work here
    pass  # placeholder: a loop body made only of a comment is a syntax error


The following is a class wrapper around one of the above solutions:

#!/usr/bin/env python3

import threading

import queue

class NonBlockingInput:
    """Collect lines typed on stdin in a background thread.

    A daemon thread calls input() repeatedly and queues every line; the
    owner polls with input_queued() and input_get() and never blocks.
    """

    def __init__(self, exit_condition):
        """Start the reader thread.

        Args:
            exit_condition: the reader stops after queueing a line whose
                stripped text equals this string.
        """
        self.exit_condition = exit_condition
        self.input_queue = queue.Queue()
        self.input_thread = threading.Thread(target=self.read_kbd_input, args=(), daemon=True)
        self.input_thread.start()

    def read_kbd_input(self):
        """Queue lines from stdin until the exit condition has been queued."""
        while True:
            console_input = input()
            self.input_queue.put(console_input)
            if console_input.strip() == self.exit_condition:
                break

    def input_queued(self):
        """Return True when at least one line is waiting in the queue."""
        return self.input_queue.qsize() > 0

    def input_get(self):
        """Return the oldest queued line, or "" when the queue is empty."""
        if self.input_queue.qsize() <= 0:
            return ""
        return self.input_queue.get()

if __name__ == '__main__':

    # Echo each line the user enters until they type "quit".
    NON_BLOCK_INPUT = NonBlockingInput(exit_condition='quit')

    DONE_PROCESSING = False
    INPUT_STR = ""
    while not DONE_PROCESSING:
        if not NON_BLOCK_INPUT.input_queued():
            continue  # nothing typed yet; keep polling

        INPUT_STR = NON_BLOCK_INPUT.input_get()
        DONE_PROCESSING = INPUT_STR.strip() == "quit"
        if not DONE_PROCESSING:
            print("{}".format(INPUT_STR))


I was writing a program on Linux that has a larger main loop which requires regular updates but also needs to read characters in a non-blocking way. However, resetting the display also loses the input buffer. This is the solution that I came up with: every time after the screen is updated, it sets the terminal to non-blocking, waits for the main loop to pass and then interprets stdin. After that, the terminal is reset to its original settings.

#!/usr/bin/python3
import sys, select, os, tty, termios, time

i = 0
l = True  # loop flag; cleared when the user presses 'q'
oldtty = termios.tcgetattr(sys.stdin)  # settings to restore after each pass
stdin_no = sys.stdin.fileno()

while l:
    os.system('clear')
    print("I'm doing stuff. Press a 'q' to stop me!")
    print(i)
    # Switch to cbreak mode only while waiting, so keys typed during the
    # sleep can be read one at a time afterwards.
    tty.setcbreak(stdin_no)
    try:
        time.sleep(0.5)
        # Zero timeout: just check whether a character is already waiting.
        if sys.stdin in select.select([sys.stdin], [], [], 0.0)[0]:
            line = sys.stdin.read(1)
            print (line, len(line))

            if "q" in line:
                l = False
    finally:
        # Restore the original settings even if the pass is interrupted
        # (e.g. by Ctrl-C), so the shell is not left in cbreak mode.
        termios.tcsetattr(stdin_no, termios.TCSADRAIN, oldtty)
    i += 1



The Solution by marco is the right idea, but I decided to simplify it to the minimal possible code without any classes. Also it actually shows you how to get the user input with the queue library instead of just printing it:

import time, threading, queue


def collect(que):
    """Read one line from stdin and put it on *que*."""
    que.put(input())

# Run collect() on its own thread so that only that thread blocks on input().
que = queue.Queue()
thread = threading.Thread(target=collect, args=(que,))
thread.start()

# The thread ends once a line has been entered; until then keep working.
while thread.is_alive():
    time.sleep(1)
    print("The main thread continues while we wait for you...")

# collect() has put the line on the queue by the time the thread has finished.
msg = que.get()
print('You typed:', msg)

In this example, the main thread continues indefinitely (processing data or whatever), while periodically checking to see if the user has input any data in the spawned thread. When that happens it returns the user input.

I've successfully used this idea in my own script to create a debugger, where I can type "print variable name" at any point during the main loop and it gives me the values in real time without stopping.


My example below does allow for non-blocking reads from stdin under both Windows (only tested under Windows 10) and Linux without requiring external dependencies or using threading. It works for copypasted text, it disables ECHO, so it could be used for e.g. some sort of custom UI and uses a loop, so it would be easy to process anything that was input into it.

With the above in mind, the example is meant for an interactive TTY, not piped input.

#!/usr/bin/env python3
import sys

# Platform setup: save the current stdin mode in oldStdinMode, then switch
# stdin to an unechoed, unbuffered mode. The saved value has a different type
# on each platform (a DWORD on Windows, a termios attribute list on POSIX).
if(sys.platform == "win32"):
    import msvcrt
    import ctypes
    from ctypes import wintypes
    kernel32 = ctypes.windll.kernel32
    # Receives the current console mode from GetConsoleMode below
    oldStdinMode = ctypes.wintypes.DWORD()
    # Windows standard handle -10 refers to stdin
    kernel32.GetConsoleMode(kernel32.GetStdHandle(-10), ctypes.byref(oldStdinMode))
    # Disable ECHO and line-mode (a mode of 0 clears every input flag)
    # https://learn.microsoft.com/en-us/windows/console/setconsolemode
    kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), 0)
else:
    # POSIX uses termios
    import select, termios, tty
    oldStdinMode = termios.tcgetattr(sys.stdin)
    # Second copy of the attributes; this one is modified and applied
    _ = termios.tcgetattr(sys.stdin)
    # Disable ECHO and line-mode (index 3 holds the local-mode flags)
    _[3] = _[3] & ~(termios.ECHO | termios.ICANON)
    # Don't block on stdin.read() (index 6 holds the control characters)
    _[6][termios.VMIN] = 0
    _[6][termios.VTIME] = 0
    termios.tcsetattr(sys.stdin, termios.TCSAFLUSH, _)

def readStdin():
    """Return one character read from stdin.

    On Windows, returns "" when msvcrt.kbhit() reports no pending key. On
    other platforms, returns whatever sys.stdin.read(1) yields.
    """
    if sys.platform != "win32":
        return sys.stdin.read(1)
    if msvcrt.kbhit():
        return msvcrt.getwch()
    return ""

def flushStdin():
    """Discard any input that has been received but not yet read."""
    if sys.platform == "win32":
        kernel32.FlushConsoleInputBuffer(kernel32.GetStdHandle(-10))
        return
    termios.tcflush(sys.stdin, termios.TCIFLUSH)

try:
    userInput = ""
    print("Type something: ", end = "", flush = True)
    # Drop anything typed before the prompt appeared.
    flushStdin()
    while True:
        peek = readStdin()
        if not peek:
            continue  # nothing available yet; poll again
        # Stop input on NUL, Ctrl+C, ESC, carriage return, newline, backspace, EOF, EOT
        if peek in ("\0", "\3", "\x1b", "\r", "\n", "\b", "\x1a", "\4"):
            break
        userInput += peek
        # This is just to show the user what they typed.
        # Can be skipped, if one doesn't need this.
        sys.stdout.write(peek)
        sys.stdout.flush()
    flushStdin()
    print(f"\nuserInput length: {len(userInput)}, contents: \"{userInput}\"")
finally:
    # Put stdin back into the mode that was saved during setup.
    if sys.platform != "win32":
        termios.tcsetattr(sys.stdin, termios.TCSAFLUSH, oldStdinMode)
    else:
        kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), oldStdinMode)
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜