异步编程中的同步阻塞陷阱:从一次RAG评估bug说起
Zhongjun Qiu 元婴开发者

核心问题

在异步代码中调用同步方法,就像在高速公路上突然停车——整个系统都会被你堵住。本文将带你从一次真实的RAG评估Bug出发,深入理解异步编程中的同步阻塞陷阱,以及如何正确使用asyncio.to_thread()来避免这类问题。

问题初现:页面卡死,请求堆积

2026年1月16日晚上7点左右,我正在进行前后端联调工作。当时我正在测试RAG(检索增强生成)功能的评估模块,这个模块使用RAGAS框架来评估回答的质量。

一切看似正常,我点击了”开始评估”按钮。然而,接下来发生的事情让我困惑不已:

  1. 前端页面卡住不动 - 任何页面切换、数据加载都无响应
  2. 后端日志停止更新 - 没有新的请求日志输出
  3. 网络恢复后突然爆发 - 等了大约十几秒后,后端日志突然输出一大堆之前积压的请求

更诡异的是,当我查看后端日志时,发现了大量这样的警告:

1
2
3
4
5
2026-01-16 19:49:12,830 - huggingface_hub.utils._http - WARNING - 
'(ReadTimeoutError("HTTPSConnectionPool(host='huggingface.co', port=443):
Read timed out. (read timeout=10)")' thrown while requesting HEAD
https://huggingface.co/BAAI/bge-small-zh-v1.5/resolve/main/./sentence_bert_config.json
Retrying in 1s [Retry 1/5].

从日志可以看出,HuggingFace模型下载时网络超时,系统正在进行重试。但问题是:为什么一次网络超时会导致整个后端服务”假死”?

第一次分析:初步猜测与错误方向

发现问题后,我的第一反应是:“这肯定是异步代码哪里写错了。”

我首先检查了conversation_service.py中调用评估的代码:

1
2
3
4
5
6
7
async def save_response(self, ...):
...
# 异步执行评估
if self.config.evaluation_enabled:
asyncio.create_task(
evaluation_service.schedule_evaluation(...)
)

这里看起来没问题,使用asyncio.create_task将评估任务放到后台执行,不会阻塞当前请求。

然后我检查了schedule_evaluation方法:

1
2
3
4
5
6
7
8
9
10
11
12
async def schedule_evaluation(self, ...):
...
# 创建评估记录
evaluation = await evaluation_repository.create(...)

# 异步运行评估
if self.config.async_mode:
asyncio.create_task(
self._run_evaluation(...)
)
else:
await self._run_evaluation(...)

这里也看起来没问题,如果async_mode为True,评估会在后台运行。

但是问题依然存在。 无论我怎么检查代码,看起来逻辑都是正确的。服务就是会卡住。

第二次分析:深入日志追踪

既然从代码表面看不出问题,我决定从日志入手,追踪完整的请求流程。

我仔细分析了评估服务的核心方法evaluate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
async def evaluate(self, query: str, context: str, response: str) -> Dict[str, float]:
self._init_ragas()

try:
from datasets import Dataset
from ragas import evaluate

# 准备数据集
dataset = Dataset.from_dict({
"question": [query],
"answer": [response],
"contexts": [contexts],
})

# 运行评估
result = await asyncio.to_thread(
evaluate,
dataset,
metrics=self._metrics,
)
...
except Exception as e:
logger.error("Evaluation failed: %s", e, exc_info=True)
raise

等等!我发现了一个可疑的地方:await self._init_ragas()

这个_init_ragas()方法是在evaluate方法开头被调用的,而且是同步调用(没有用await)。虽然后面用了asyncio.to_thread来执行评估,但初始化阶段呢?

真相大白:罪魁祸首——同步的_init_ragas()

带着这个疑问,我深入查看了_init_ragas()方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def _init_ragas(self):
"""Lazily initialize RAGAS components."""
if self._ragas_initialized:
return

try:
from ragas.metrics import (
faithfulness,
answer_relevancy,
)
from ragas.llms import LangchainLLMWrapper
from ragas.embeddings import LangchainEmbeddingsWrapper

# 获取LLM配置
provider = LLMProvider(settings.llm)
base_llm = provider.create_llm(self.config.llm_type, temperature=0.0)
filtered_llm = FilteredChatOpenAI(base_llm, callbacks=self._callbacks)
self._llm = LangchainLLMWrapper(filtered_llm)

# 获取embeddings配置
from app.rag.embeddings.embedding_factory import get_embedding_model
base_embeddings = get_embedding_model(settings.rag)
self._embeddings = LangchainEmbeddingsWrapper(base_embeddings)

# 构建指标列表
self._metrics_map = {
"faithfulness": faithfulness,
"answer_relevancy": answer_relevancy,
}
self._metrics = [...]

self._ragas_initialized = True
logger.info("RAGAS initialized with metrics: %s", ...)

except Exception as e:
logger.error("Failed to initialize RAGAS: %s", e)
raise

这就是问题所在!

_init_ragas()是一个同步方法,它执行了以下阻塞操作:

  1. 导入RAGAS模块 - 需要加载依赖库
  2. 创建LLM实例 - 可能涉及API调用
  3. 创建Embeddings模型 - 这里调用了get_embedding_model(settings.rag)
  4. 从HuggingFace下载模型 - 当本地没有缓存时,会从HuggingFace下载模型文件

关键问题就在第4步!当get_embedding_model需要下载HuggingFace模型时,如果网络不稳定,就会触发多次重试:

1
2
3
4
Retrying in 1s [Retry 1/5]
Retrying in 1s [Retry 2/5]
Retrying in 1s [Retry 3/5]
...

_init_ragas()是同步执行的,它会直接阻塞整个事件循环! 这意味着在重试期间,Python的主线程完全被这个同步方法占用,无法处理其他任何协程或请求。

异步编程基础知识回顾

在理解为什么这会导致问题之前,让我们回顾一下Python asyncio的核心概念。这些知识点是解决这个问题的关键。

事件循环(Event Loop)

事件循环是异步编程的核心。它负责:

  1. 执行协程 - 调度和运行使用async def定义的异步函数
  2. 处理I/O事件 - 当I/O操作完成时唤醒等待的协程
  3. 定时任务 - 执行asyncio.sleep()等定时操作
  4. 回调处理 - 执行注册在Future上的回调函数

关键点:事件循环是单线程的。在任何时刻,只有一个协程在执行。当一个协程执行await时,事件循环会切换到其他等待的协程。但如果一个协程执行了同步阻塞操作,事件循环就会被卡住,无法切换。

协程(Coroutines)

协程是使用async def定义的函数,可以在执行过程中暂停和恢复:

1
2
3
4
async def my_coroutine():
print("开始")
await asyncio.sleep(1) # 暂停执行,切换到其他任务
print("结束")

阻塞操作与非阻塞操作

阻塞操作会暂停当前线程的执行,直到操作完成:

  • time.sleep(5) - 睡眠5秒
  • requests.get(url) - 等待HTTP响应
  • open(filename).read() - 等待文件读取
  • HuggingFace模型下载 - 等待网络下载

非阻塞操作允许在等待期间执行其他任务: - await asyncio.sleep(5) - 非阻塞睡眠 - await aiohttp.get(url) - 异步HTTP请求 - await asyncio.to_thread(blocking_io) - 在线程池中执行阻塞I/O

asyncio.to_thread()的正确用法

asyncio.to_thread()是Python 3.9引入的函数,用于在异步代码中执行阻塞操作,避免阻塞事件循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
import time

def blocking_io():
print("开始读取文件...")
time.sleep(5) # 阻塞5秒
print("文件读取完成")
return "文件内容"

async def main():
print("开始异步任务")
# 使用to_thread()在单独的线程中执行阻塞的I/O操作
result = await asyncio.to_thread(blocking_io)
print(f"读取到的文件内容: {result}")
print("异步任务完成")

asyncio.run(main())

重要提醒asyncio.to_thread()只是将阻塞操作放到线程池执行。但如果调用to_thread()之前有同步阻塞代码,问题依然存在。

问题根源分析

现在我们可以清楚地解释为什么服务会卡死了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
主线程事件循环

├─> 请求A进入 → evaluate() 被调用
│ │
│ └─> await self._init_ragas() ← 同步方法!
│ │
│ └─> get_embedding_model() ← 尝试下载模型
│ │
│ └─> HuggingFace网络超时
│ │
│ └─> 阻塞等待 + 重试 (事件循环被卡住!)

├─> 请求B进入 → 等待事件循环处理... (卡住)
├─> 请求C进入 → 等待事件循环处理... (卡住)
└─> 请求D进入 → 等待事件循环处理... (卡住)

问题本质:虽然在evaluate方法中使用了await asyncio.to_thread(evaluate, ...)来执行评估,但_init_ragas()同步调用,它直接运行在主线程上,阻塞了事件循环。

解决方案:异步初始化

明确了问题后,解决方案就很清晰了:_init_ragas()改为异步方法,并在其中使用asyncio.to_thread()执行真正耗时的同步初始化逻辑

第一步:拆分为同步和异步两个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def _init_ragas_sync(self):
"""
同步RAGAS初始化(在线程池中运行)。
由于HuggingFace模型下载,这个方法可能会阻塞。
"""
try:
from ragas.metrics import (
faithfulness,
answer_relevancy,
)
from ragas.llms import LangchainLLMWrapper
from ragas.embeddings import LangchainEmbeddingsWrapper

# 获取LLM配置
provider = LLMProvider(settings.llm)
base_llm = provider.create_llm(self.config.llm_type, temperature=0.0)
filtered_llm = FilteredChatOpenAI(base_llm, callbacks=self._callbacks)
self._llm = LangchainLLMWrapper(filtered_llm)

# 获取embeddings配置
from app.rag.embeddings.embedding_factory import get_embedding_model
base_embeddings = get_embedding_model(settings.rag)
self._embeddings = LangchainEmbeddingsWrapper(base_embeddings)

# 构建指标列表
self._metrics_map = {
"faithfulness": faithfulness,
"answer_relevancy": answer_relevancy,
}
self._metrics = [...]

self._ragas_initialized = True
logger.info("RAGAS initialized with metrics: %s", ...)

except Exception as e:
logger.error("Failed to initialize RAGAS: %s", e)
raise

async def _init_ragas(self):
"""
异步初始化RAGAS组件。
在线程池中运行阻塞初始化操作,避免阻塞事件循环。
"""
if self._ragas_initialized:
return

# 使用asyncio.to_thread将阻塞操作交给线程池执行
await asyncio.to_thread(self._init_ragas_sync)

第二步:修改evaluate方法的调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
async def evaluate(self, query: str, context: str, response: str) -> Dict[str, float]:
# 改为异步调用
await self._init_ragas()

try:
from datasets import Dataset
from ragas import evaluate

# 准备数据集
dataset = Dataset.from_dict({
"question": [query],
"answer": [response],
"contexts": [context],
})

# 评估也在线程池中执行
result = await asyncio.to_thread(
evaluate,
dataset,
metrics=self._metrics,
)

# 处理结果...
return scores

except Exception as e:
logger.error("Evaluation failed: %s", e, exc_info=True)
raise

修复后的执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
主线程事件循环

├─> 请求A进入 → evaluate() 被调用
│ │
│ └─> await self._init_ragas()
│ │
│ └─> await asyncio.to_thread(_init_ragas_sync)
│ │
│ └─> 在线程池中执行初始化
│ │
│ └─> HuggingFace网络超时 → 重试 (不影响主线程!)

├─> 请求B进入 → 事件循环处理 ✓
├─> 请求C进入 → 事件循环处理 ✓
└─> 请求D进入 → 事件循环处理 ✓

现在,即使HuggingFace网络超时重试,主线程的事件循环仍然可以正常处理其他请求。

经验总结与最佳实践

这次bug调试让我深刻认识到异步编程中的几个关键点:

1. 警惕同步阻塞

任何同步的I/O操作都可能成为异步程序的性能瓶颈:

  • 网络请求 - requests.get(), HuggingFace模型下载
  • 文件读写 - open(), pandas.read_csv()
  • CPU密集型操作 - 大量计算、数据处理
  • 睡眠操作 - time.sleep()(应使用asyncio.sleep()

最佳实践:在异步代码中,如果需要执行上述操作,必须使用: - asyncio.to_thread() - 对于阻塞的I/O操作 - asyncio.get_running_loop().run_in_executor() - 同样效果 - 异步版本的库(如aiohttp代替requests

2. 理解asyncio.to_thread()的工作原理

asyncio.to_thread()内部使用ThreadPoolExecutor来执行阻塞函数。它只是将阻塞操作”移出”了主线程,但并不能消除阻塞本身。

重要:如果调用to_thread()之前有同步代码,这些代码仍然会阻塞事件循环。

3. 初始化操作要特别注意

懒加载(Lazy Loading)是常用的性能优化模式,但在异步程序中需要注意:

  • 确保所有初始化操作都是非阻塞的
  • 或者将阻塞的初始化操作放到asyncio.to_thread()中执行
  • 使用异步锁(如asyncio.Lock())保护初始化过程,避免竞态条件

4. 善用调试工具

Python提供了调试异步代码的工具:

1
2
# 启用asyncio调试模式
asyncio.run(main(), debug=True)

当事件循环被阻塞时,会输出警告信息:

1
2
Executing <Task finished name='Task-1' coro=<slow_coroutine done ...> 
took 0.506 seconds

此外,还有一些专门的工具可以检测阻塞操作: - BlockBuster - 检测事件循环阻塞的工具 - pytest-asyncio - 测试异步代码时检测泄漏的任务和阻塞

5. 理解阻塞的传播路径

在排查异步问题时,需要沿着调用链追踪:

1
2
3
4
evaluate() 
└─> _init_ragas() ← 同步调用!
└─> get_embedding_model()
└─> HuggingFace下载 ← 阻塞点

每一个环节都可能成为性能瓶颈。不要只看表面使用了async/await,要深入检查每个方法的实际执行逻辑。

 REWARD AUTHOR
 Comments
Comment plugin failed to load
Loading comment plugin