python异步编程
大约 7 分钟
1. 关键字 async、await
python中,async和await两个关键字用于定义异步操作。
async: 定义异步函数,表明该函数是协程,可以在其中使用await来等待其他异步操作完成。异步函数的执行不会阻塞事件循环,而是会立即返回一个协程对昂。await: 用于在异步函数内等待其他协程执行完成/。遇到await时,事件循环会挂起当前的协程,并执行其它任务,直到协程完成后再恢复。
但是直接使用这两个关键字还是没有办法做到并行执行,比如下面的代码,实际上还是顺序执行的,因为每一个await都在等待call执行结束
import asyncio
async def call(name: str):
print(f"Hello {name}")
await asyncio.sleep(1)
print(f"Goodbye {name}")
async def main():
await call("one")
await call("two")
await call("three")
if __name__ == "__main__":
asyncio.run(main())
# Hello one
# Goodbye one
# Hello two
# Goodbye two
# Hello three
# Goodbye three
这里可以使用asyncio.gather来并发运行多个任务,并等待他们全部完成
import asyncio
async def call(name: str):
print(f"Hello {name}")
await asyncio.sleep(1)
print(f"Goodbye {name}")
async def main():
await asyncio.gather(call("one"), call("two"), call("three"))
if __name__ == "__main__":
asyncio.run(main())
# Hello one
# Hello two
# Hello three
# Goodbye one
# Goodbye two
# Goodbye three
2. asyncio
上文提到了gather和run两个函数,在 asyncio 中还有很多其它函数,下面详细介绍这些 asyncio 函数的作用和使用场景:
run
asyncio.run作为程序的启动入口,会自动管理事件循环的生命周期,只应被调用一次。
作用:
- 运行异步程序的主入口点
- 创建事件循环、执行协程、最后关闭事件循环
- 替代旧版的
loop.run_until_complete()
事件循环
python的事件循环,简单理解执行步骤就是:
- 注册所有协程任务(coroutines);
- 当某个协程执行到 await,就挂起,返回控制权;
- 事件循环检测哪个任务的“IO 已就绪”(比如网络读写完成);
- 唤醒相应协程继续执行;
- 重复此过程直到所有任务完成。
注意
BUT! python的事件循环依然是在一个线程里的
gather
作用:
- 并发执行多个异步任务
- 等待所有任务完成并收集结果
- 保持任务执行顺序和结果顺序一致
create_task
作用:
- 将协程包装成 Task 对象
- 立即调度执行(但不等待完成)
- 返回 Task 对象用于后续控制
import asyncio
async def background_task():
while True:
print("Background task running")
await asyncio.sleep(2)
async def main():
# 创建后台任务(不等待)
task = asyncio.create_task(background_task())
await asyncio.sleep(5)
task.cancel()
if __name__ == "__main__":
asyncio.run(main())
to_thread
作用:
- 在线程池中运行阻塞式函数
- 避免阻塞事件循环
- 将同步函数"异步化"
import asyncio
import time
import asyncio
import time
async def async_network_call(name):
print(f"开始网络请求: {name}")
await asyncio.sleep(2) # 模拟网络延迟
return f"{name}的响应"
def blocking_file_operation(filename):
print(f"开始文件操作: {filename}")
time.sleep(1) # 模拟阻塞文件操作
return f"{filename}的内容"
async def main():
start_time = time.time()
# 并发执行异步网络请求
network_tasks = [
asyncio.create_task(async_network_call(f"API_{i}"))
for i in range(3)
]
# 在线程中执行阻塞文件操作
file_task = asyncio.to_thread(blocking_file_operation, "data.txt")
# 等待所有任务完成
network_results = await asyncio.gather(*network_tasks)
file_result = await file_task
print("网络结果:", network_results)
print("文件结果:", file_result)
print(f"总耗时: {time.time() - start_time:.2f}秒")
# 总耗时约2秒,而不是 2*3 + 1 = 7秒
asyncio.run(main())
if __name__ == "__main__":
asyncio.run(main())
# 开始网络请求: API_0
# 开始网络请求: API_1
# 开始网络请求: API_2
# 开始文件操作: data.txt
# 网络结果: ['API_0的响应', 'API_1的响应', 'API_2的响应']
# 文件结果: data.txt的内容
# 总耗时: 3.01秒
queue
作用:
- 线程安全的异步队列
- 用于生产者-消费者模式
- 协调多个协程之间的数据传递
import asyncio
import random
async def producer(queue, name):
for i in range(3):
item = f"产品_{name}_{i}"
await asyncio.sleep(random.uniform(0.1, 0.5))
await queue.put(item)
print(f"生产者 {name} 生产了: {item}")
await queue.put(None) # 结束信号
async def consumer(queue, name):
while True:
item = await queue.get()
if item is None:
queue.put(None) # 传递给其他消费者
break
print(f"消费者 {name} 消费了: {item}")
await asyncio.sleep(random.uniform(0.1, 0.3))
queue.task_done() # 标记任务完成
async def main():
queue = asyncio.Queue(maxsize=2) # 限制队列大小
# 创建生产者和消费者
producers = [
asyncio.create_task(producer(queue, "A")),
asyncio.create_task(producer(queue, "B"))
]
consumers = [
asyncio.create_task(consumer(queue, "X")),
asyncio.create_task(consumer(queue, "Y"))
]
# 等待所有生产者完成
await asyncio.gather(*producers)
# 等待队列清空
await queue.join()
# 发送结束信号
await queue.put(None)
asyncio.run(main())
# 生产者 A 生产了: 产品_A_0
# 消费者 X 消费了: 产品_A_0
# 生产者 B 生产了: 产品_B_0
# 消费者 Y 消费了: 产品_B_0
# 生产者 A 生产了: 产品_A_1
# 消费者 X 消费了: 产品_A_1
# 生产者 B 生产了: 产品_B_1
# 消费者 Y 消费了: 产品_B_1
# 生产者 A 生产了: 产品_A_2
# 消费者 X 消费了: 产品_A_2
wait、wait for
- wait
- 等待多个任务完成,提供更细粒度的控制
- 可以指定完成条件(第一个完成、第一个异常等)
- 返回已完成和未完成的任务集合
- wait for
- 为单个异步操作设置超时时间
- 超时后抛出 asyncio.TimeoutError
- 自动取消超时的任务
as_completed
作用:
- 按照任务完成顺序处理结果
- 返回一个异步迭代器
- 适合需要尽快处理完成的任务的场景
import asyncio
import time
import asyncio
import random
async def fetch_data(url, delay):
print(f"开始获取: {url}")
await asyncio.sleep(delay)
return f"{url} 的数据 (耗时 {delay} 秒)"
async def main():
urls = [
("http://example.com/1", 2.0),
("http://example.com/2", 0.5),
("http://example.com/3", 1.5),
("http://example.com/4", 1.0),
("http://example.com/5", 3.0)
]
# 创建所有任务
tasks = [fetch_data(url, delay) for url, delay in urls]
print("=== 按完成顺序处理 ===")
# 按完成顺序处理结果
for completed_coro in asyncio.as_completed(tasks):
result = await completed_coro
print(f"收到: {result}")
print("\n=== 对比: 使用 gather (按提交顺序) ===")
# 对比:gather 按提交顺序返回结果
results = await asyncio.gather(*[fetch_data(url, delay) for url, delay in urls])
for result in results:
print(f"收到: {result}")
asyncio.run(main())
if __name__ == "__main__":
asyncio.run(main())
# === 按完成顺序处理 ===
# 开始获取: http://example.com/1
# 开始获取: http://example.com/4
# 开始获取: http://example.com/2
# 开始获取: http://example.com/5
# 开始获取: http://example.com/3
# 收到: http://example.com/2 的数据 (耗时 0.5 秒)
# 收到: http://example.com/4 的数据 (耗时 1.0 秒)
# 收到: http://example.com/3 的数据 (耗时 1.5 秒)
# 收到: http://example.com/1 的数据 (耗时 2.0 秒)
# 收到: http://example.com/5 的数据 (耗时 3.0 秒)
#
# === 对比: 使用 gather (按提交顺序) ===
# 开始获取: http://example.com/1
# 开始获取: http://example.com/2
# 开始获取: http://example.com/3
# 开始获取: http://example.com/4
# 开始获取: http://example.com/5
# 收到: http://example.com/1 的数据 (耗时 2.0 秒)
# 收到: http://example.com/2 的数据 (耗时 0.5 秒)
# 收到: http://example.com/3 的数据 (耗时 1.5 秒)
# 收到: http://example.com/4 的数据 (耗时 1.0 秒)
# 收到: http://example.com/5 的数据 (耗时 3.0 秒)
# === 按完成顺序处理 ===
# 开始获取: http://example.com/3
# 开始获取: http://example.com/4
# 开始获取: http://example.com/2
# 开始获取: http://example.com/1
# 开始获取: http://example.com/5
# 收到: http://example.com/2 的数据 (耗时 0.5 秒)
# 收到: http://example.com/4 的数据 (耗时 1.0 秒)
# 收到: http://example.com/3 的数据 (耗时 1.5 秒)
# 收到: http://example.com/1 的数据 (耗时 2.0 秒)
# 收到: http://example.com/5 的数据 (耗时 3.0 秒)
#
# === 对比: 使用 gather (按提交顺序) ===
# 开始获取: http://example.com/1
# 开始获取: http://example.com/2
# 开始获取: http://example.com/3
# 开始获取: http://example.com/4
# 开始获取: http://example.com/5
# 收到: http://example.com/1 的数据 (耗时 2.0 秒)
# 收到: http://example.com/2 的数据 (耗时 0.5 秒)
# 收到: http://example.com/3 的数据 (耗时 1.5 秒)
# 收到: http://example.com/4 的数据 (耗时 1.0 秒)
# 收到: http://example.com/5 的数据 (耗时 3.0 秒)
3. Lock、RLock
在 Python 中,Lock 和 RLock 都是用于线程同步的锁机制,但它们在重入行为上有重要区别
Lock 互斥锁
- 不可重入锁(Non-reentrant lock)
- 同一个线程不能多次获取同一把锁
- 简单、轻量级的同步原语
RLock 可重入锁
- 可重入锁(Reentrant lock)
- 同一个线程可以多次获取同一把锁
- 内部维护计数器跟踪获取次数
| 特性 | Lock | RLock |
|---|---|---|
| 重入性 | 不可重入 | 可重入 |
| 需不改变 | 稍快 | 较慢(维护计数器) |
| 死锁风险 | 高(递归调用) | 低 |