跳至主要內容

python异步编程

pptg大约 7 分钟

1. 关键字 async、await

python中,asyncawait两个关键字用于定义异步操作。

  • async: 定义异步函数,表明该函数是协程,可以在其中使用await来等待其他异步操作完成。异步函数的执行不会阻塞事件循环,而是会立即返回一个协程对昂。
  • await: 用于在异步函数内等待其他协程执行完成/。遇到await时,事件循环会挂起当前的协程,并执行其它任务,直到协程完成后再恢复。

但是直接使用这两个关键字还是没有办法做到并行执行,比如下面的代码,实际上还是顺序执行的,因为每一个await都在等待call执行结束

import asyncio

async def call(name: str):
    print(f"Hello {name}")
    await asyncio.sleep(1)
    print(f"Goodbye {name}")
    
async def main():
    await call("one")
    await call("two")
    await call("three")

if __name__ == "__main__":
    asyncio.run(main())
    
# Hello one
# Goodbye one
# Hello two
# Goodbye two
# Hello three
# Goodbye three

这里可以使用asyncio.gather来并发运行多个任务,并等待他们全部完成

import asyncio

async def call(name: str):
    print(f"Hello {name}")
    await asyncio.sleep(1)
    print(f"Goodbye {name}")
    
async def main():
    await asyncio.gather(call("one"), call("two"), call("three"))

if __name__ == "__main__":
    asyncio.run(main())
    
# Hello one
# Hello two
# Hello three
# Goodbye one
# Goodbye two
# Goodbye three

2. asyncio

上文提到了gatherrun两个函数,在 asyncio 中还有很多其它函数,下面详细介绍这些 asyncio 函数的作用和使用场景:

run

asyncio.run作为程序的启动入口,会自动管理事件循环的生命周期,只应被调用一次。

作用:

  • 运行异步程序的主入口点
  • 创建事件循环、执行协程、最后关闭事件循环
  • 替代旧版的 loop.run_until_complete()

事件循环

python的事件循环,简单理解执行步骤就是:

  1. 注册所有协程任务(coroutines);
  2. 当某个协程执行到 await,就挂起,返回控制权;
  3. 事件循环检测哪个任务的“IO 已就绪”(比如网络读写完成);
  4. 唤醒相应协程继续执行;
  5. 重复此过程直到所有任务完成。

注意

BUT! python的事件循环依然是在一个线程里的

gather

作用:

  • 并发执行多个异步任务
  • 等待所有任务完成并收集结果
  • 保持任务执行顺序和结果顺序一致

create_task

作用:

  • 将协程包装成 Task 对象
  • 立即调度执行(但不等待完成)
  • 返回 Task 对象用于后续控制
import asyncio

async def background_task():
    while True:
        print("Background task running")
        await asyncio.sleep(2)

async def main():
    # 创建后台任务(不等待)
    task = asyncio.create_task(background_task())
    await asyncio.sleep(5)
    task.cancel()

if __name__ == "__main__":
    asyncio.run(main())

to_thread

作用:

  • 在线程池中运行阻塞式函数
  • 避免阻塞事件循环
  • 将同步函数"异步化"
import asyncio

import time

import asyncio
import time

async def async_network_call(name):
    print(f"开始网络请求: {name}")
    await asyncio.sleep(2)  # 模拟网络延迟
    return f"{name}的响应"

def blocking_file_operation(filename):
    print(f"开始文件操作: {filename}")
    time.sleep(1)  # 模拟阻塞文件操作
    return f"{filename}的内容"

async def main():
    start_time = time.time()
    
    # 并发执行异步网络请求
    network_tasks = [
        asyncio.create_task(async_network_call(f"API_{i}"))
        for i in range(3)
    ]
    
    # 在线程中执行阻塞文件操作
    file_task = asyncio.to_thread(blocking_file_operation, "data.txt")
    
    # 等待所有任务完成
    network_results = await asyncio.gather(*network_tasks)
    file_result = await file_task
    
    print("网络结果:", network_results)
    print("文件结果:", file_result)
    print(f"总耗时: {time.time() - start_time:.2f}秒")
    # 总耗时约2秒,而不是 2*3 + 1 = 7秒

asyncio.run(main())

if __name__ == "__main__":
    asyncio.run(main())
    
# 开始网络请求: API_0
# 开始网络请求: API_1
# 开始网络请求: API_2
# 开始文件操作: data.txt
# 网络结果: ['API_0的响应', 'API_1的响应', 'API_2的响应']
# 文件结果: data.txt的内容
# 总耗时: 3.01秒

queue

作用:

  • 线程安全的异步队列
  • 用于生产者-消费者模式
  • 协调多个协程之间的数据传递
import asyncio
import random

async def producer(queue, name):
    for i in range(3):
        item = f"产品_{name}_{i}"
        await asyncio.sleep(random.uniform(0.1, 0.5))
        await queue.put(item)
        print(f"生产者 {name} 生产了: {item}")
    await queue.put(None)  # 结束信号

async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is None:
            queue.put(None)  # 传递给其他消费者
            break
        print(f"消费者 {name} 消费了: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.3))
        queue.task_done()  # 标记任务完成

async def main():
    queue = asyncio.Queue(maxsize=2)  # 限制队列大小
    
    # 创建生产者和消费者
    producers = [
        asyncio.create_task(producer(queue, "A")),
        asyncio.create_task(producer(queue, "B"))
    ]
    
    consumers = [
        asyncio.create_task(consumer(queue, "X")),
        asyncio.create_task(consumer(queue, "Y"))
    ]
    
    # 等待所有生产者完成
    await asyncio.gather(*producers)
    
    # 等待队列清空
    await queue.join()
    
    # 发送结束信号
    await queue.put(None)

asyncio.run(main())

# 生产者 A 生产了: 产品_A_0
# 消费者 X 消费了: 产品_A_0
# 生产者 B 生产了: 产品_B_0
# 消费者 Y 消费了: 产品_B_0
# 生产者 A 生产了: 产品_A_1
# 消费者 X 消费了: 产品_A_1
# 生产者 B 生产了: 产品_B_1
# 消费者 Y 消费了: 产品_B_1
# 生产者 A 生产了: 产品_A_2
# 消费者 X 消费了: 产品_A_2

wait、wait for

  • wait
    • 等待多个任务完成,提供更细粒度的控制
    • 可以指定完成条件(第一个完成、第一个异常等)
    • 返回已完成和未完成的任务集合
  • wait for
    • 为单个异步操作设置超时时间
    • 超时后抛出 asyncio.TimeoutError
    • 自动取消超时的任务

as_completed

作用:

  • 按照任务完成顺序处理结果
  • 返回一个异步迭代器
  • 适合需要尽快处理完成的任务的场景
import asyncio

import time

import asyncio
import random

async def fetch_data(url, delay):
    print(f"开始获取: {url}")
    await asyncio.sleep(delay)
    return f"{url} 的数据 (耗时 {delay} 秒)"

async def main():
    urls = [
        ("http://example.com/1", 2.0),
        ("http://example.com/2", 0.5),
        ("http://example.com/3", 1.5),
        ("http://example.com/4", 1.0),
        ("http://example.com/5", 3.0)
    ]
    
    # 创建所有任务
    tasks = [fetch_data(url, delay) for url, delay in urls]
    
    print("=== 按完成顺序处理 ===")
    # 按完成顺序处理结果
    for completed_coro in asyncio.as_completed(tasks):
        result = await completed_coro
        print(f"收到: {result}")
    
    print("\n=== 对比: 使用 gather (按提交顺序) ===")
    # 对比:gather 按提交顺序返回结果
    results = await asyncio.gather(*[fetch_data(url, delay) for url, delay in urls])
    for result in results:
        print(f"收到: {result}")

asyncio.run(main())

if __name__ == "__main__":
    asyncio.run(main())
    
# === 按完成顺序处理 ===
# 开始获取: http://example.com/1
# 开始获取: http://example.com/4
# 开始获取: http://example.com/2
# 开始获取: http://example.com/5
# 开始获取: http://example.com/3
# 收到: http://example.com/2 的数据 (耗时 0.5 秒)
# 收到: http://example.com/4 的数据 (耗时 1.0 秒)
# 收到: http://example.com/3 的数据 (耗时 1.5 秒)
# 收到: http://example.com/1 的数据 (耗时 2.0 秒)
# 收到: http://example.com/5 的数据 (耗时 3.0 秒)
# 
# === 对比: 使用 gather (按提交顺序) ===
# 开始获取: http://example.com/1
# 开始获取: http://example.com/2
# 开始获取: http://example.com/3
# 开始获取: http://example.com/4
# 开始获取: http://example.com/5
# 收到: http://example.com/1 的数据 (耗时 2.0 秒)
# 收到: http://example.com/2 的数据 (耗时 0.5 秒)
# 收到: http://example.com/3 的数据 (耗时 1.5 秒)
# 收到: http://example.com/4 的数据 (耗时 1.0 秒)
# 收到: http://example.com/5 的数据 (耗时 3.0 秒)
# === 按完成顺序处理 ===
# 开始获取: http://example.com/3
# 开始获取: http://example.com/4
# 开始获取: http://example.com/2
# 开始获取: http://example.com/1
# 开始获取: http://example.com/5
# 收到: http://example.com/2 的数据 (耗时 0.5 秒)
# 收到: http://example.com/4 的数据 (耗时 1.0 秒)
# 收到: http://example.com/3 的数据 (耗时 1.5 秒)
# 收到: http://example.com/1 的数据 (耗时 2.0 秒)
# 收到: http://example.com/5 的数据 (耗时 3.0 秒)
# 
# === 对比: 使用 gather (按提交顺序) ===
# 开始获取: http://example.com/1
# 开始获取: http://example.com/2
# 开始获取: http://example.com/3
# 开始获取: http://example.com/4
# 开始获取: http://example.com/5
# 收到: http://example.com/1 的数据 (耗时 2.0 秒)
# 收到: http://example.com/2 的数据 (耗时 0.5 秒)
# 收到: http://example.com/3 的数据 (耗时 1.5 秒)
# 收到: http://example.com/4 的数据 (耗时 1.0 秒)
# 收到: http://example.com/5 的数据 (耗时 3.0 秒)

3. Lock、RLock

在 Python 中,Lock 和 RLock 都是用于线程同步的锁机制,但它们在重入行为上有重要区别

Lock 互斥锁

  • 不可重入锁(Non-reentrant lock)
  • 同一个线程不能多次获取同一把锁
  • 简单、轻量级的同步原语

RLock 可重入锁

  • 可重入锁(Reentrant lock)
  • 同一个线程可以多次获取同一把锁
  • 内部维护计数器跟踪获取次数
特性LockRLock
重入性不可重入可重入
需不改变稍快较慢(维护计数器)
死锁风险高(递归调用)