开发者

Python Equivalent of setInterval()?

Does Python have a function similar to JavaScript's setInterval()?

I would like to have:

def set_interval(func, interval):
    ...

That will call func every interval t开发者_JS百科ime units.


This might be the correct snippet you were looking for:

import threading

def set_interval(func, sec):
    def func_wrapper():
        set_interval(func, sec)
        func()
    t = threading.Timer(sec, func_wrapper)
    t.start()
    return t


This is a version where you could start and stop. It is not blocking. There is also no glitch as execution time error is not added (important for long time execution with very short interval as audio for example)

import time, threading

StartTime=time.time()

def action() :
    print('action ! -> time : {:.1f}s'.format(time.time()-StartTime))


class setInterval :
    def __init__(self,interval,action) :
        self.interval=interval
        self.action=action
        self.stopEvent=threading.Event()
        thread=threading.Thread(target=self.__setInterval)
        thread.start()

    def __setInterval(self) :
        nextTime=time.time()+self.interval
        while not self.stopEvent.wait(nextTime-time.time()) :
            nextTime+=self.interval
            self.action()

    def cancel(self) :
        self.stopEvent.set()

# start action every 0.6s
inter=setInterval(0.6,action)
print('just after setInterval -> time : {:.1f}s'.format(time.time()-StartTime))

# will stop interval in 5s
t=threading.Timer(5,inter.cancel)
t.start()

Output is :

just after setInterval -> time : 0.0s
action ! -> time : 0.6s
action ! -> time : 1.2s
action ! -> time : 1.8s
action ! -> time : 2.4s
action ! -> time : 3.0s
action ! -> time : 3.6s
action ! -> time : 4.2s
action ! -> time : 4.8s


Just keep it nice and simple.

import threading

def setInterval(func,time):
    e = threading.Event()
    while not e.wait(time):
        func()

def foo():
    print "hello"

# using
setInterval(foo,5)

# output:
hello
hello
.
.
.

EDIT : This code is non-blocking

import threading

class ThreadJob(threading.Thread):
    def __init__(self,callback,event,interval):
        '''runs the callback function after interval seconds

        :param callback:  callback function to invoke
        :param event: external event for controlling the update operation
        :param interval: time in seconds after which are required to fire the callback
        :type callback: function
        :type interval: int
        '''
        self.callback = callback
        self.event = event
        self.interval = interval
        super(ThreadJob,self).__init__()

    def run(self):
        while not self.event.wait(self.interval):
            self.callback()



event = threading.Event()

def foo():
    print "hello"

k = ThreadJob(foo,event,2)
k.start()

print "It is non-blocking"


Change Nailxx's answer a bit and you got the answer!

from threading import Timer

def hello():
    print "hello, world"
    Timer(30.0, hello).start()

Timer(30.0, hello).start() # after 30 seconds, "hello, world" will be printed


The sched module provides these abilities for general Python code. However, as its documentation suggests, if your code is multithreaded it might make more sense to use the threading.Timer class instead.


I think this is what you're after:

#timertest.py
import sched, time
def dostuff():
  print "stuff is being done!"
  s.enter(3, 1, dostuff, ())

s = sched.scheduler(time.time, time.sleep)
s.enter(3, 1, dostuff, ())
s.run()

If you add another entry to the scheduler at the end of the repeating method, it'll just keep going.


I use sched to create setInterval function gist

import functools
import sched, time

s = sched.scheduler(time.time, time.sleep)

def setInterval(sec):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*argv, **kw):
            setInterval(sec)(func)
            func(*argv, **kw)
        s.enter(sec, 1, wrapper, ())
        return wrapper
    s.run()
    return decorator


@setInterval(sec=3)
def testInterval():
  print ("test Interval ")

testInterval()


Simple setInterval utils

from threading import Timer

def setInterval(timer, task):
    isStop = task()
    if not isStop:
        Timer(timer, setInterval, [timer, task]).start()

def hello():
    print "do something"
    return False # return True if you want to stop

if __name__ == "__main__":
    setInterval(2.0, hello) # every 2 seconds, "do something" will be printed


The above method didn't quite do it for me as I needed to be able to cancel the interval. I turned the function into a class and came up with the following:

class setInterval():
    def __init__(self, func, sec):
        def func_wrapper():
            self.t = threading.Timer(sec, func_wrapper)
            self.t.start()
            func()
        self.t = threading.Timer(sec, func_wrapper)
        self.t.start()

    def cancel(self):
        self.t.cancel()


Most of the answers above do not shut down the Thread properly. While using Jupyter notebook I noticed that when an explicit interrupt was sent, the threads were still running and worse, they would keep multiplying starting at 1 thread running,2, 4 etc. My method below is based on the answer by @doom but cleanly handles interrupts by running an infinite loop in the Main thread to listen for SIGINT and SIGTERM events

  • No drift
  • Cancelable
  • Handles SIGINT and SIGTERM very well
  • Doesnt make a new thread for every run

Feel free to suggest improvements

import time
import threading
import signal

# Record the time for the purposes of demonstration 
start_time=time.time()

class ProgramKilled(Exception):
    """
    An instance of this custom exception class will be thrown everytime we get an SIGTERM or SIGINT
    """
    pass

# Raise the custom exception whenever SIGINT or SIGTERM is triggered
def signal_handler(signum, frame):
    raise ProgramKilled

# This function serves as the callback triggered on every run of our IntervalThread
def action() :
    print('action ! -> time : {:.1f}s'.format(time.time()-start_time))

# https://stackoverflow.com/questions/2697039/python-equivalent-of-setinterval
class IntervalThread(threading.Thread) :
    def __init__(self,interval,action, *args, **kwargs) :
        super(IntervalThread, self).__init__()
        self.interval=interval
        self.action=action
        self.stopEvent=threading.Event()
        self.start()

    def run(self) :
        nextTime=time.time()+self.interval
        while not self.stopEvent.wait(nextTime-time.time()) :
            nextTime+=self.interval
            self.action()

    def cancel(self) :
        self.stopEvent.set()

def main():

    # Handle SIGINT and SIFTERM with the help of the callback function
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    # start action every 1s
    inter=IntervalThread(1,action)
    print('just after setInterval -> time : {:.1f}s'.format(time.time()-start_time))

    # will stop interval in 500s
    t=threading.Timer(500,inter.cancel)
    t.start()

    # https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/
    while True:
        try:
            time.sleep(1)
        except ProgramKilled:
            print("Program killed: running cleanup code")
            inter.cancel()
            break

if __name__ == "__main__":
    main()


In the above solutions if a situation arises where program is shutdown, there is no guarantee that it will shutdown gracefully,Its always recommended to shut a program via a soft kill, neither did most of them have a function to stop I found a nice article on medium written by Sankalp which solves both of these issues (run periodic tasks in python) refer the attached link to get a deeper insight. In the below sample a library named signal is used to track the kill is soft kill or a hard kill

import threading, time, signal

from datetime import timedelta

WAIT_TIME_SECONDS = 1

class ProgramKilled(Exception):
    pass

def foo():
    print time.ctime()

def signal_handler(signum, frame):
    raise ProgramKilled

class Job(threading.Thread):
    def __init__(self, interval, execute, *args, **kwargs):
        threading.Thread.__init__(self)
        self.daemon = False
        self.stopped = threading.Event()
        self.interval = interval
        self.execute = execute
        self.args = args
        self.kwargs = kwargs

    def stop(self):
                self.stopped.set()
                self.join()
    def run(self):
            while not self.stopped.wait(self.interval.total_seconds()):
                self.execute(*self.args, **self.kwargs)

if __name__ == "__main__":
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
    job.start()

    while True:
          try:
              time.sleep(1)
          except ProgramKilled:
              print "Program killed: running cleanup code"
              job.stop()
              break
#output
#Tue Oct 16 17:47:51 2018
#Tue Oct 16 17:47:52 2018
#Tue Oct 16 17:47:53 2018
#^CProgram killed: running cleanup code


setInterval should be run on multiple thread, and not freeze the task when it running loop.

Here is my RUNTIME package that support multithread feature:

  • setTimeout(F,ms) : timming to fire function in independence thread.
  • delayF(F,ms) : similar setTimeout(F,ms).
  • setInterval(F,ms) : asynchronous loop .pause, .resume : pause and resume the interval
  • clearInterval(interval) : clear the interval

It's short and simple. Note that python need lambda if you input direct the function, but lambda is not support command block, so you should define the function content before put it in the setInterval.

### DEMO PYTHON MULTITHREAD ASYNCHRONOUS LOOP ###

import time;
import threading;
import random;

def delay(ms):time.sleep(ms/1000); # Controil while speed
def setTimeout(R,delayMS):
    t=threading.Timer(delayMS/1000,R)
    t.start();
    return t;
def delayF(R,delayMS):
    t=threading.Timer(delayMS/1000,R)
    t.start();
    return t;

class THREAD:
    def __init__(this):
        this.R_onRun=None;
        this.thread=None;
    def run(this):
        this.thread=threading.Thread(target=this.R_onRun);
        this.thread.start();
    def isRun(this): return this.thread.isAlive();
    
class setInterval :
    def __init__(this,R_onRun,msInterval) :
        this.ms=msInterval;
        this.R_onRun=R_onRun;
        this.kStop=False;
        this.thread=THREAD();
        this.thread.R_onRun=this.Clock;
        this.thread.run();
    def Clock(this) :
        while not this.kStop :
            this.R_onRun();
            delay(this.ms);
    def pause(this) :
        this.kStop=True;
    def stop(this) :
        this.kStop=True;
    def resume(this) :
        if (this.kStop) :
            this.kStop=False;
            this.thread.run();
    
def clearInterval(Timer): Timer.stop();

# EXAMPLE
def p():print(random.random());
tm=setInterval(p,20);
tm2=setInterval(lambda:print("AAAAA"),20);
delayF(tm.pause,1000);
delayF(tm.resume,2000);
delayF(lambda:clearInterval(tm),3000);

Save to file .py and run it. You will see it print both random number and string "AAAAA". The print number thread will pause printing after 1 second and resume print again for 1 second then stop, while the print string keep printing text not corrupt.

In case you use OpenCV for graphic animation with those setInterval for boost animate speed, you must have 1 main thread to apply waitKey, otherwise the window will freeze no matter how slow delay or you applied waitKey in sub thread:

def p:... # Your drawing task
setInterval(p,1); # Subthread1 running draw
setInterval(p,1); # Subthread2 running draw
setInterval(p,1); # Subthread3 running draw
while True: cv2.waitKey(10); # Main thread which waitKey have effect


You can also try out this method:

import time

while True:
    time.sleep(5)
    print("5 seconds has passed")

So it will print "5 seconds has passed" every 5 seconds.

The function sleep() suspends execution for the given number of seconds. The argument may be a floating point number to indicate a more precise sleep time.


Recently, I have the same issue as you. And I find these soluation:

1. you can use the library: threading.Time(this have introduction above)

2. you can use the library: sched(this have introduction above too)

3. you can use the library: Advanced Python Scheduler(Recommend)


Some answers above that uses func_wrapper and threading.Timer indeed work, except that it spawns a new thread every time an interval is called, which is causing memory problems.

The basic example below roughly implemented a similar mechanism by putting interval on a separate thread. It sleeps at the given interval. Before jumping into code, here are some of the limitations that you need to be aware of:

  1. JavaScript is single threaded, so when the function inside setInterval is fired, nothing else will be working at the same time (excluding worker thread, but let's talk general use case of setInterval. Therefore, threading is safe. But here in this implementation, you may encounter race conditions unless using a threading.rLock.

  2. The implementation below uses time.sleep to simulate intervals, but adding the execution time of func, the total time for this interval may be greater than what you expect. So depending on use cases, you may want to "sleep less" (minus time taken for calling func)

  3. I only roughly tested this, and you should definitely not use global variables the way I did, feel free to tweak it so that it fits in your system.


Enough talking, here is the code:

# Python 2.7
import threading
import time


class Interval(object):
    def __init__(self):
        self.daemon_alive = True
        self.thread = None # keep a reference to the thread so that we can "join"

    def ticktock(self, interval, func):
        while self.daemon_alive:
            time.sleep(interval)
            func()

num = 0
def print_num():
    global num
    num += 1
    print 'num + 1 = ', num

def print_negative_num():
    global num
    print '-num = ', num * -1

intervals = {} # keep track of intervals
g_id_counter = 0 # roughly generate ids for intervals

def set_interval(interval, func):
    global g_id_counter

    interval_obj = Interval()
    # Put this interval on a new thread
    t = threading.Thread(target=interval_obj.ticktock, args=(interval, func))
    t.setDaemon(True)
    interval_obj.thread = t
    t.start()

    # Register this interval so that we can clear it later
    # using roughly generated id
    interval_id = g_id_counter
    g_id_counter += 1
    intervals[interval_id] = interval_obj

    # return interval id like it does in JavaScript
    return interval_id

def clear_interval(interval_id):
    # terminate this interval's while loop
    intervals[interval_id].daemon_alive = False
    # kill the thread
    intervals[interval_id].thread.join()
    # pop out the interval from registry for reusing
    intervals.pop(interval_id)

if __name__ == '__main__':
    num_interval = set_interval(1, print_num)
    neg_interval = set_interval(3, print_negative_num)

    time.sleep(10) # Sleep 10 seconds on main thread to let interval run
    clear_interval(num_interval)
    clear_interval(neg_interval)
    print "- Are intervals all cleared?"
    time.sleep(3) # check if both intervals are stopped (not printing)
    print "- Yup, time to get beers"

Expected output:

num + 1 =  1
num + 1 =  2
-num =  -2
 num + 1 =  3
num + 1 =  4
num + 1 =  5
-num =  -5
num + 1 =  6
num + 1 =  7
num + 1 =  8
-num =  -8
num + 1 =  9
num + 1 =  10
-num =  -10
Are intervals all cleared?
Yup, time to get beers


My Python 3 module jsinterval.py will be helpful! Here it is:

"""
Threaded intervals and timeouts from JavaScript
"""

import threading, sys

__all__ =  ['TIMEOUTS', 'INTERVALS', 'setInterval', 'clearInterval', 'setTimeout', 'clearTimeout']

TIMEOUTS  = {}
INTERVALS = {}

last_timeout_id  = 0
last_interval_id = 0

class Timeout:
    """Class for all timeouts."""
    def __init__(self, func, timeout):
        global last_timeout_id
        last_timeout_id += 1
        self.timeout_id = last_timeout_id
        TIMEOUTS[str(self.timeout_id)] = self
        self.func = func
        self.timeout = timeout
        self.threadname = 'Timeout #%s' %self.timeout_id

    def run(self):
        func = self.func
        delx = self.__del__
        def func_wrapper():
            func()
            delx()
        self.t = threading.Timer(self.timeout/1000, func_wrapper)
        self.t.name = self.threadname
        self.t.start()

    def __repr__(self):
        return '<JS Timeout set for %s seconds, launching function %s on timeout reached>' %(self.timeout, repr(self.func))

    def __del__(self):
        self.t.cancel()

class Interval:
    """Class for all intervals."""
    def __init__(self, func, interval):
        global last_interval_id
        self.interval_id = last_interval_id
        INTERVALS[str(self.interval_id)] = self
        last_interval_id += 1
        self.func = func
        self.interval = interval
        self.threadname = 'Interval #%s' %self.interval_id

    def run(self):
        func = self.func
        interval = self.interval
        def func_wrapper():
            timeout = Timeout(func_wrapper, interval)
            self.timeout = timeout
            timeout.run()
            func()
        self.t = threading.Timer(self.interval/1000, func_wrapper)
        self.t.name = self.threadname
        self.t.run()

    def __repr__(self):
        return '<JS Interval, repeating function %s with interval %s>' %(repr(self.func), self.interval)

    def __del__(self):
        self.timeout.__del__()

def setInterval(func, interval):
    """
    Create a JS Interval: func is the function to repeat, interval is the interval (in ms)
    of executing the function.
    """
    temp = Interval(func, interval)
    temp.run()
    idx = int(temp.interval_id)
    del temp
    return idx


def clearInterval(interval_id):
    try:
        INTERVALS[str(interval_id)].__del__()
        del INTERVALS[str(interval_id)]
    except KeyError:
        sys.stderr.write('No such interval "Interval #%s"\n' %interval_id)

def setTimeout(func, timeout):
    """
    Create a JS Timeout: func is the function to timeout, timeout is the timeout (in ms)
    of executing the function.
    """
    temp = Timeout(func, timeout)
    temp.run()
    idx = int(temp.timeout_id)
    del temp
    return idx


def clearTimeout(timeout_id):
    try:
        TIMEOUTS[str(timeout_id)].__del__()
        del TIMEOUTS[str(timeout_id)]
    except KeyError:
        sys.stderr.write('No such timeout "Timeout #%s"\n' %timeout_id)

CODE EDIT: Fixed the memory leak (spotted by @benjaminz). Now ALL threads are cleaned up upon end. Why does this leak happen? It happens because of the implicit (or even explicit) references. In my case, TIMEOUTS and INTERVALS. Timeouts self-clean automatically (after this patch) because they use function wrapper which calls the function and then self-kills. But how does this happen? Objects can't be deleted from memory unless all references are deleted too or gc module is used. Explaining: there's no way to create (in my code) unwanted references to timeouts/intervals. They have only ONE referrer: the TIMEOUTS/INTERVALS dicts. And, when interrupted or finished (only timeouts can finish uninterrupted) they delete the only existing reference to themselves: their corresponding dict element. Classes are perfectly encapsulated using __all__, so no space for memory leaks.


Here is a low time drift solution that uses a thread to periodically signal an Event object. The thread's run() does almost nothing while waiting for a timeout; hence the low time drift.

# Example of low drift (time) periodic execution of a function.
import threading
import time

# Thread that sets 'flag' after 'timeout'
class timerThread (threading.Thread):

    def __init__(self , timeout , flag):
        threading.Thread.__init__(self)
        self.timeout = timeout
        self.stopFlag = False
        self.event = threading.Event()
        self.flag = flag

    # Low drift run(); there is only the 'if'
    # and 'set' methods between waits.
    def run(self):
        while not self.event.wait(self.timeout):
            if self.stopFlag:
                break
            self.flag.set()

    def stop(self):
        stopFlag = True
        self.event.set()

# Data.
printCnt = 0

# Flag to print.
printFlag = threading.Event()

# Create and start the timer thread.
printThread = timerThread(3 , printFlag)
printThread.start()

# Loop to wait for flag and print time.
while True:

    global printCnt

    # Wait for flag.
    printFlag.wait()
    # Flag must be manually cleared.
    printFlag.clear()
    print(time.time())
    printCnt += 1
    if printCnt == 3:
        break;

# Stop the thread and exit.
printThread.stop()
printThread.join()
print('Done')


fall asleep until the next interval of seconds length starts: (not concurrent)

def sleep_until_next_interval(self, seconds):
    now = time.time()
    fall_asleep = seconds - now % seconds
    time.sleep(fall_asleep)

while True:
    sleep_until_next_interval(10) # 10 seconds - worktime
    # work here

simple and no drift.


I have written my code to make a very very flexible setInterval in python. Here you are:

import threading


class AlreadyRunning(Exception):
    pass


class IntervalNotValid(Exception):
    pass


class setInterval():
    def __init__(this, func=None, sec=None, args=[]):
        this.running = False
        this.func = func  # the function to be run
        this.sec = sec            # interval in second
        this.Return = None  # The returned data
        this.args = args
        this.runOnce = None  # asociated with run_once() method
        this.runOnceArgs = None   # asociated with run_once() method

        if (func is not None and sec is not None):
            this.running = True

            if (not callable(func)):
                raise TypeError("non-callable object is given")

            if (not isinstance(sec, int) and not isinstance(sec, float)):
                raise TypeError("A non-numeric object is given")

            this.TIMER = threading.Timer(this.sec, this.loop)
            this.TIMER.start()

    def start(this):
        if (not this.running):
            if (not this.isValid()):
                raise IntervalNotValid("The function and/or the " +
                                       "interval hasn't provided or invalid.")
            this.running = True
            this.TIMER = threading.Timer(this.sec, this.loop)
            this.TIMER.start()
        else:
            raise AlreadyRunning("Tried to run an already run interval")

    def stop(this):
        this.running = False

    def isValid(this):
        if (not callable(this.func)):
            return False

        cond1 = not isinstance(this.sec, int)
        cond2 = not isinstance(this.sec, float)
        if (cond1 and cond2):
            return False
        return True

    def loop(this):

        if (this.running):
            this.TIMER = threading.Timer(this.sec, this.loop)
            this.TIMER.start()
            function_, Args_ = this.func, this.args

            if (this.runOnce is not None):  # someone has provide the run_once
                runOnce, this.runOnce = this.runOnce, None
                result = runOnce(*(this.runOnceArgs))
                this.runOnceArgs = None

                # if and only if the result is False. not accept "None"
                # nor zero.
                if (result is False):
                    return  # cancel the interval right now

            this.Return = function_(*Args_)

    def change_interval(this, sec):

        cond1 = not isinstance(sec, int)
        cond2 = not isinstance(sec, float)
        if (cond1 and cond2):
            raise TypeError("A non-numeric object is given")

        # prevent error when providing interval to a blueprint
        if (this.running):
            this.TIMER.cancel()

        this.sec = sec

        # prevent error when providing interval to a blueprint
        # if the function hasn't provided yet
        if (this.running):
            this.TIMER = threading.Timer(this.sec, this.loop)
            this.TIMER.start()

    def change_next_interval(this, sec):

        if (not isinstance(sec, int) and not isinstance(sec, float)):
            raise TypeError("A non-numeric object is given")

        this.sec = sec

    def change_func(this, func, args=[]):

        if (not callable(func)):
            raise TypeError("non-callable object is given")

        this.func = func
        this.args = args

    def run_once(this, func, args=[]):
        this.runOnce = func
        this.runOnceArgs = args

    def get_return(this):
        return this.Return

You can get many features and flexibility. Running this code won't freeze your code, you can change the interval at run time, you can change the function at run time, you can pass arguments, you can get the returned object from your function, and many more. You can make your tricks too!

here's a very simple and basic example to use it:

import time

def interval(name="world"):
  print(f"Hello {name}!")

# function named interval will be called every two seconds
# output: "Hello world!"
interval1 = setInterval(interval, 2) 

# function named interval will be called every 1.5 seconds
# output: "Hello Jane!"
interval2 = setInterval(interval, 1.5, ["Jane"]) 

time.sleep(5) #stop all intervals after 5 seconds
interval1.stop()
interval2.stop()

Check out my Github project to see more examples and follow next updates :D https://github.com/Hzzkygcs/setInterval-python


Things work differently in Python: you need to either sleep() (if you want to block the current thread) or start a new thread. See http://docs.python.org/library/threading.html


Here's something easy peazy:

import time

delay = 10 # Seconds

def setInterval():
    print('I print in intervals!')
    time.sleep(delay)
    setInterval()


From Python Documentation:

from threading import Timer

def hello():
    print "hello, world"

t = Timer(30.0, hello)
t.start() # after 30 seconds, "hello, world" will be printed
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜