核心问题
在异步代码中调用同步方法,就像在高速公路上突然停车——整个系统都会被你堵住。本文将带你从一次真实的RAG评估Bug出发,深入理解异步编程中的同步阻塞陷阱,以及如何正确使用asyncio.to_thread()来避免这类问题。
问题初现:页面卡死,请求堆积
2026年1月16日晚上7点左右,我正在进行前后端联调工作。当时我正在测试RAG(检索增强生成)功能的评估模块,这个模块使用RAGAS框架来评估回答的质量。
一切看似正常,我点击了”开始评估”按钮。然而,接下来发生的事情让我困惑不已:
- 前端页面卡住不动 -
任何页面切换、数据加载都无响应
- 后端日志停止更新 - 没有新的请求日志输出
- 网络恢复后突然爆发 -
等了大约十几秒后,后端日志突然输出一大堆之前积压的请求
更诡异的是,当我查看后端日志时,发现了大量这样的警告:
1 2 3 4 5
| 2026-01-16 19:49:12,830 - huggingface_hub.utils._http - WARNING - '(ReadTimeoutError("HTTPSConnectionPool(host='huggingface.co', port=443): Read timed out. (read timeout=10)")' thrown while requesting HEAD https://huggingface.co/BAAI/bge-small-zh-v1.5/resolve/main/./sentence_bert_config.json Retrying in 1s [Retry 1/5].
|
从日志可以看出,HuggingFace模型下载时网络超时,系统正在进行重试。但问题是:为什么一次网络超时会导致整个后端服务”假死”?
第一次分析:初步猜测与错误方向
发现问题后,我的第一反应是:“这肯定是异步代码哪里写错了。”
我首先检查了conversation_service.py中调用评估的代码:
1 2 3 4 5 6 7
| async def save_response(self, ...): ... if self.config.evaluation_enabled: asyncio.create_task( evaluation_service.schedule_evaluation(...) )
|
这里看起来没问题,使用asyncio.create_task将评估任务放到后台执行,不会阻塞当前请求。
然后我检查了schedule_evaluation方法:
1 2 3 4 5 6 7 8 9 10 11 12
| async def schedule_evaluation(self, ...): ... evaluation = await evaluation_repository.create(...) if self.config.async_mode: asyncio.create_task( self._run_evaluation(...) ) else: await self._run_evaluation(...)
|
这里也看起来没问题,如果async_mode为True,评估会在后台运行。
但是问题依然存在。
无论我怎么检查代码,看起来逻辑都是正确的。服务就是会卡住。
第二次分析:深入日志追踪
既然从代码表面看不出问题,我决定从日志入手,追踪完整的请求流程。
我仔细分析了评估服务的核心方法evaluate:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| async def evaluate(self, query: str, context: str, response: str) -> Dict[str, float]: self._init_ragas() try: from datasets import Dataset from ragas import evaluate dataset = Dataset.from_dict({ "question": [query], "answer": [response], "contexts": [contexts], }) result = await asyncio.to_thread( evaluate, dataset, metrics=self._metrics, ) ... except Exception as e: logger.error("Evaluation failed: %s", e, exc_info=True) raise
|
等等!我发现了一个可疑的地方:await self._init_ragas()。
这个_init_ragas()方法是在evaluate方法开头被调用的,而且是同步调用(没有用await)。虽然后面用了asyncio.to_thread来执行评估,但初始化阶段呢?
真相大白:罪魁祸首——同步的_init_ragas()
带着这个疑问,我深入查看了_init_ragas()方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| def _init_ragas(self): """Lazily initialize RAGAS components.""" if self._ragas_initialized: return try: from ragas.metrics import ( faithfulness, answer_relevancy, ) from ragas.llms import LangchainLLMWrapper from ragas.embeddings import LangchainEmbeddingsWrapper provider = LLMProvider(settings.llm) base_llm = provider.create_llm(self.config.llm_type, temperature=0.0) filtered_llm = FilteredChatOpenAI(base_llm, callbacks=self._callbacks) self._llm = LangchainLLMWrapper(filtered_llm) from app.rag.embeddings.embedding_factory import get_embedding_model base_embeddings = get_embedding_model(settings.rag) self._embeddings = LangchainEmbeddingsWrapper(base_embeddings) self._metrics_map = { "faithfulness": faithfulness, "answer_relevancy": answer_relevancy, } self._metrics = [...] self._ragas_initialized = True logger.info("RAGAS initialized with metrics: %s", ...) except Exception as e: logger.error("Failed to initialize RAGAS: %s", e) raise
|
这就是问题所在!
_init_ragas()是一个同步方法,它执行了以下阻塞操作:
- 导入RAGAS模块 - 需要加载依赖库
- 创建LLM实例 - 可能涉及API调用
- 创建Embeddings模型 -
这里调用了
get_embedding_model(settings.rag)
- 从HuggingFace下载模型 -
当本地没有缓存时,会从HuggingFace下载模型文件
关键问题就在第4步!当get_embedding_model需要下载HuggingFace模型时,如果网络不稳定,就会触发多次重试:
1 2 3 4
| Retrying in 1s [Retry 1/5] Retrying in 1s [Retry 2/5] Retrying in 1s [Retry 3/5] ...
|
而_init_ragas()是同步执行的,它会直接阻塞整个事件循环!
这意味着在重试期间,Python的主线程完全被这个同步方法占用,无法处理其他任何协程或请求。
异步编程基础知识回顾
在理解为什么这会导致问题之前,让我们回顾一下Python
asyncio的核心概念。这些知识点是解决这个问题的关键。
事件循环(Event Loop)
事件循环是异步编程的核心。它负责:
- 执行协程 -
调度和运行使用
async def定义的异步函数
- 处理I/O事件 - 当I/O操作完成时唤醒等待的协程
- 定时任务 -
执行
asyncio.sleep()等定时操作
- 回调处理 - 执行注册在Future上的回调函数
关键点:事件循环是单线程的。在任何时刻,只有一个协程在执行。当一个协程执行await时,事件循环会切换到其他等待的协程。但如果一个协程执行了同步阻塞操作,事件循环就会被卡住,无法切换。
协程(Coroutines)
协程是使用async def定义的函数,可以在执行过程中暂停和恢复:
1 2 3 4
| async def my_coroutine(): print("开始") await asyncio.sleep(1) print("结束")
|
阻塞操作与非阻塞操作
阻塞操作会暂停当前线程的执行,直到操作完成:
time.sleep(5) - 睡眠5秒
requests.get(url) - 等待HTTP响应
open(filename).read() - 等待文件读取
- HuggingFace模型下载 - 等待网络下载
非阻塞操作允许在等待期间执行其他任务: -
await asyncio.sleep(5) - 非阻塞睡眠 -
await aiohttp.get(url) - 异步HTTP请求 -
await asyncio.to_thread(blocking_io) -
在线程池中执行阻塞I/O
asyncio.to_thread()的正确用法
asyncio.to_thread()是Python
3.9引入的函数,用于在异步代码中执行阻塞操作,避免阻塞事件循环:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import asyncio import time
def blocking_io(): print("开始读取文件...") time.sleep(5) print("文件读取完成") return "文件内容"
async def main(): print("开始异步任务") result = await asyncio.to_thread(blocking_io) print(f"读取到的文件内容: {result}") print("异步任务完成")
asyncio.run(main())
|
重要提醒:asyncio.to_thread()只是将阻塞操作放到线程池执行。但如果调用to_thread()之前有同步阻塞代码,问题依然存在。
问题根源分析
现在我们可以清楚地解释为什么服务会卡死了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| 主线程事件循环 │ ├─> 请求A进入 → evaluate() 被调用 │ │ │ └─> await self._init_ragas() ← 同步方法! │ │ │ └─> get_embedding_model() ← 尝试下载模型 │ │ │ └─> HuggingFace网络超时 │ │ │ └─> 阻塞等待 + 重试 (事件循环被卡住!) │ ├─> 请求B进入 → 等待事件循环处理... (卡住) ├─> 请求C进入 → 等待事件循环处理... (卡住) └─> 请求D进入 → 等待事件循环处理... (卡住)
|
问题本质:虽然在evaluate方法中使用了await asyncio.to_thread(evaluate, ...)来执行评估,但_init_ragas()是同步调用,它直接运行在主线程上,阻塞了事件循环。
解决方案:异步初始化
明确了问题后,解决方案就很清晰了:将_init_ragas()改为异步方法,并在其中使用asyncio.to_thread()执行真正耗时的同步初始化逻辑。
第一步:拆分为同步和异步两个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| def _init_ragas_sync(self): """ 同步RAGAS初始化(在线程池中运行)。 由于HuggingFace模型下载,这个方法可能会阻塞。 """ try: from ragas.metrics import ( faithfulness, answer_relevancy, ) from ragas.llms import LangchainLLMWrapper from ragas.embeddings import LangchainEmbeddingsWrapper provider = LLMProvider(settings.llm) base_llm = provider.create_llm(self.config.llm_type, temperature=0.0) filtered_llm = FilteredChatOpenAI(base_llm, callbacks=self._callbacks) self._llm = LangchainLLMWrapper(filtered_llm) from app.rag.embeddings.embedding_factory import get_embedding_model base_embeddings = get_embedding_model(settings.rag) self._embeddings = LangchainEmbeddingsWrapper(base_embeddings) self._metrics_map = { "faithfulness": faithfulness, "answer_relevancy": answer_relevancy, } self._metrics = [...] self._ragas_initialized = True logger.info("RAGAS initialized with metrics: %s", ...) except Exception as e: logger.error("Failed to initialize RAGAS: %s", e) raise
async def _init_ragas(self): """ 异步初始化RAGAS组件。 在线程池中运行阻塞初始化操作,避免阻塞事件循环。 """ if self._ragas_initialized: return await asyncio.to_thread(self._init_ragas_sync)
|
第二步:修改evaluate方法的调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| async def evaluate(self, query: str, context: str, response: str) -> Dict[str, float]: await self._init_ragas() try: from datasets import Dataset from ragas import evaluate dataset = Dataset.from_dict({ "question": [query], "answer": [response], "contexts": [context], }) result = await asyncio.to_thread( evaluate, dataset, metrics=self._metrics, ) return scores except Exception as e: logger.error("Evaluation failed: %s", e, exc_info=True) raise
|
修复后的执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| 主线程事件循环 │ ├─> 请求A进入 → evaluate() 被调用 │ │ │ └─> await self._init_ragas() │ │ │ └─> await asyncio.to_thread(_init_ragas_sync) │ │ │ └─> 在线程池中执行初始化 │ │ │ └─> HuggingFace网络超时 → 重试 (不影响主线程!) │ ├─> 请求B进入 → 事件循环处理 ✓ ├─> 请求C进入 → 事件循环处理 ✓ └─> 请求D进入 → 事件循环处理 ✓
|
现在,即使HuggingFace网络超时重试,主线程的事件循环仍然可以正常处理其他请求。
经验总结与最佳实践
这次bug调试让我深刻认识到异步编程中的几个关键点:
1. 警惕同步阻塞
任何同步的I/O操作都可能成为异步程序的性能瓶颈:
- 网络请求 -
requests.get(),
HuggingFace模型下载
- 文件读写 -
open(),
pandas.read_csv()
- CPU密集型操作 - 大量计算、数据处理
- 睡眠操作 -
time.sleep()(应使用asyncio.sleep())
最佳实践:在异步代码中,如果需要执行上述操作,必须使用:
- asyncio.to_thread() - 对于阻塞的I/O操作 -
asyncio.get_running_loop().run_in_executor() - 同样效果 -
异步版本的库(如aiohttp代替requests)
2.
理解asyncio.to_thread()的工作原理
asyncio.to_thread()内部使用ThreadPoolExecutor来执行阻塞函数。它只是将阻塞操作”移出”了主线程,但并不能消除阻塞本身。
重要:如果调用to_thread()之前有同步代码,这些代码仍然会阻塞事件循环。
3. 初始化操作要特别注意
懒加载(Lazy
Loading)是常用的性能优化模式,但在异步程序中需要注意:
- 确保所有初始化操作都是非阻塞的
- 或者将阻塞的初始化操作放到
asyncio.to_thread()中执行
- 使用异步锁(如
asyncio.Lock())保护初始化过程,避免竞态条件
4. 善用调试工具
Python提供了调试异步代码的工具:
1 2
| asyncio.run(main(), debug=True)
|
当事件循环被阻塞时,会输出警告信息:
1 2
| Executing <Task finished name='Task-1' coro=<slow_coroutine done ...> took 0.506 seconds
|
此外,还有一些专门的工具可以检测阻塞操作: - BlockBuster
- 检测事件循环阻塞的工具 - pytest-asyncio -
测试异步代码时检测泄漏的任务和阻塞
5. 理解阻塞的传播路径
在排查异步问题时,需要沿着调用链追踪:
1 2 3 4
| evaluate() └─> _init_ragas() ← 同步调用! └─> get_embedding_model() └─> HuggingFace下载 ← 阻塞点
|
每一个环节都可能成为性能瓶颈。不要只看表面使用了async/await,要深入检查每个方法的实际执行逻辑。