开发者

使用Python实现一个优雅的异步定时器

目录
  • 需求背景
  • 代码
  • 1. 单例事件循环的实现
  • 2. 事件循环的运行与关闭
  • 3. 定时器核心逻辑
  • 4. 启动与停止
  • 5. 使用示例
  • 设计亮点
  • 适用场景
  • 总结

需求背景

定时器的核心功能是能够周期性地触发回调函数,同时需要支持启动、停止以及状态检查等操作。在多线程或异步编程场景中,希望定时器能够:

  1. 支持异步操作,避免阻塞主线程;
  2. 单例化事件循环,节省资源;
  3. 优雅地管理定时器的生命周期;
  4. 提供简单的接口,易于使用。

为此,设计了一个 Timer 类,结合 asyncio 和 threading,实现了一个高效的定时器。

代码

完整代码

import asyncio
import threading
import time
import sys

class Timer:
    _loop = None
    _thread = None
    _lock = threading.Lock()
    _running_timers = 0

    @classmethod
    def _ensure_loop(cls):
        with cls._lock:
            if cls._loop is None or not cls._thread or not cls._thread.is_alive():
                cls._loop = asyncio.new_event_loop()
                cls._thread = threading.Thread(
                    target=cls._run_loop,
                    args=(cls._loop,),
                    daemon=True
                )
                cls._thread.start()

    @classmethod
    def _run_loop(cls, loop):
        asyncio.set_event_loop(loop)
        try:
            loop.run_forever()
        except Exception as e:
            print(f"事件循环异常: {e}")
        finally:
            loop.close()

    @classmethod
    def _shutdown(cls):
        with cls._lock:
            if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
                cls._loop.call_soon_threadsafe(cls._loop.stop)
                # 不使用 join,因为守护线程会在主线程退出时自动结束

    def __init__(self):
        self.is_running = False
        self._stop_event = asyncio.Event()
        self._task = None

    async def _timer_loop(self, interval, callback):
        try:
            while not self._stop_event.is_set():
                await asyncio.sleep(interval)
                if not self._stop_event.is_set():
                    await asyncio.get_event_loop().run_in_executor(None, callback)
        except asyncio.CancelledError:
            pass  # 正常取消时忽略
        except Exception as e:
            print(f"定时器循环异常: {e}")
        finally:
            self.is_running = False
            Timer._running_timers -= 1
            Timer._shutdown()

    def start(self, interval, callback):
        if not self.is_running:
            Timer._ensure_loop()
            self.is_running = True
            self._stop_event.clear()
            self._task = asyncio.run_coroutine_threadsafe(
                self._timer_loop(interval, callback),
                Timer._loop
            )
            Timer._running_timers += 1
            # print(f"定时器已启动,每{interval}秒执行一次")

    def stop(self):
        if self.is_running:
            self._stop_event.set()
            if self._task:
                Timer._loop.call_soon_threadsafe(self._task.cancel)
            self.is_running = False
            # print("定时器已停止")

    def is_active(self):
        return self.is_running

# 使用示例
def callback1():
    print(f"回调1触发: {time.strftime('%H:%M:%S')}")

def callback2():
    print(f"回调2触发: {time.strftime('%H:%M:%S')}")

if __name__ == "__main__":
    timer1 = Timer()
    timer2 = Timer()

    timer1.start(2, callback1)
    timer2.start(3, callback2)

    try:
        time.sleep(100)
        timer1.stop()
        time.sleep(2)
        timer2.stop()
    except KeyboardInterrupt:
        timer1.stop()
        timer2.stop()
    finally:
        # 确保在程序退出时清理
        Timer._shutdown()

1. 单例事件循环的实现

为了避免每个定时器都创建一个独立的事件循环,在 Timer 类中使用了类变量和类方法来管理全局唯一的事件循环:

class Timer:
    _loop = None
    _thread = None
    _lock = threading.Lock()
    _running_timers = 0

    @classmethod
    def _ensure_loop(cls):
        with cls._lock:
            if cls._loop is None or not cls._thread or not cls._thread.is_alive():
                cls._loop = asyncio.new_event_loop()
                cls._thread = threading.Thread(
                    target=cls._run_loop,
                    args=(cls._loop,),
                    daemon=True
                )
                cls._thread.start()
  • _loop:存储全局的 asyncio 事件循环。
  • _thread:将事件循环运行在一个独立的守护线程中,避免阻塞主线程。
  • _lock:线程锁,确保在多线程环境中创建事件循环时的线程安全。
  • _ensure_loop:在需要时创建或重用事件循环,确保只有一个全局循环。

守护线程(daemon=True)的设计使得程序退出时无需显式关闭线程,简化了资源清理。

2. 事件循环的运行与关闭

javascript

事件循环的运行逻辑封装在 _run_loop 中:

@classmethod
def _run_loop(cls, loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    except Exception as e:
        print(f"事件循环异常: {e}")
    finally:
        loop.close()
  • run_forever:让事件循环持续运行,直到被外部停止。
  • 异常处理:捕获可能的错误并打印,便于调试。
  • finally:确保循环关闭时资源被正确释放。

关闭逻辑则由 _shutdown 方法控制:

@classmethod
def _shutdown(cls):
    with cls._lock:
        if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
            cls._loop.call_soon_threadsafe(cls._loop.stop)

当所有定时器都停止时(_running_timers == 0),事件循环会被安全停止。

3. 定时器核心逻辑

每个 Timer 实例负责管理一个独立的定时任务:

def __init__(self):
    self.is_running = False
    self._stop_event = ajavascriptsyncio.Event()
    self._task = None

async def _timer_loop(self, interval, callback):
    try:
        while not self._stop_event.is_set():
            await asyncio.sleep(interval)
            if not self._stop_event.is_set():
                await asyncio.get_event_loop().run_in_executor(None, callback)
    except asyncio.CancelledError:
        pass  # 正常取消时忽略
    finally:
        self.is_running = False
        Timer._running_timers -= 1
        Timer._shutdown()
  • _stop_event:一个 asyncio.Event 对象,用于控制定时器的停止。
  • _timer_loop:异步协程,每隔 interval 秒执行一次回调函数 callback
  • run_in_executor:将回调函数运行在默认的线程池中,避免阻塞事件循环。

4. 启动与停止

启动和停止方法是用户的主要接口:

def start(self, interval, callback):
    if not self.is_running:
        Timer._ensure_loop()
        self.is_running = True
        self._stop_event.clear()
        self._task = asyncio.run_coroutine_threadsafe(
            self._timer_loop(interval, callback),
            Timer._loop
        )
        Timer._running_timers += 1

def stop(self):
    if self.is_running:
        self._stop_event.set()
        if self._task:
            Timer._loop.callphp_soon_threadsafe(self._task.cancel)
        self.is_running = False
  • start:启动定时器,确保事件循环可用,并记录运行中的定时器数量。
  • stop:通过设置 _stop_event 并取消任务来停止定时器。

5. 使用示例

以下是一个简单的使用示例:

def callback1():
    print(f"回调1触发: {time.strftime('%H:%M:%S')}")

def callback2():
    print(f"回调2触发: {time.strftime('%H:%M:%S')}")

timer1 = Timer()
timer2 = Timer()

timer1.start(2, callback1)  # 每2秒触发一次
timer2.start(3, callback2)  # 每3秒触发一次

time.sleep(10)  # 运行10秒
timer1.stop()
timer2.stop()

输出可能如下:

回调1触发: 14:30:02
回调2触发: 14:30:03
回调1触发: 14:30:04
回调1触发: 14:30:06
回调2触发: 14:30:06
...

设计亮点

  1. 异步与多线程结合:通过 asyncio 和 threading,实现了非阻塞的定时器,适合高并发场景。
  2. 资源高android效利用:全局唯一的事件循环避免了重复创建的开销。
  3. 优雅的生命周期管理:守护线程和自动关闭机制简化了资源清理。
  4. 线程安全:使用锁机制确保多python线程环境下的稳定性。

适用场景

  • 周期性任务调度,如数据刷新、状态检查。
  • 后台服务中的定时监控。
  • 游戏或实时应用中的计时器需求。

总结

这个异步定时器实现结合了 python 的异步编程和多线程特性,提供了一个轻量、灵活的解决方案。无论是简单的脚本还是复杂的后台服务,它都能胜任。如果你需要一个可靠的定时器,不妨试试这个实现,或者根据需求进一步优化它!

以上就是使用Python实现一个优雅的异步定时器的详细内容,更多关于Python异步定时器的资料请关注编程客栈(www.devze.com)其它相关文章!

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新开发

开发排行榜