在 AI
应用开发中,调用大语言模型看似简单,但当项目规模扩大后,你会发现各种问题接踵而至:配置散落各地、追踪信息需要层层传递、Token
消耗无法统一记录……今天我想分享一下我是如何在 CookHero
项目中设计一个统一的 LLM 调用框架的。
一个看似简单的问题
第一次调用 LLM 时,你可能会这样写:
1 2 3 4 5 6
| from openai import OpenAI client = OpenAI(api_key="xxx") response = client.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": "hello"}] )
|
不到 10
行代码,搞定。但当你的项目有几十个模块、需要记录使用量、要在调用链中传递用户信息、还要支持多个模型切换时……代码就会变成这样:
1 2 3 4 5 6 7 8 9 10 11 12
| client = OpenAI(api_key="xxx", base_url="...", timeout=...)
def recipe_generator(user_id, conversation_id, ...): def call_llm(module_name, user_id, conversation_id, ...): response = client.chat.completions.create(...) log_token(user_id, module_name, response) return response
|
这就是我遇到的问题,也是 CookHero LLM 模块要解决的核心痛点。
核心设计:三个关键抽象
经过反复思考,我确定了三个核心概念来组织整个框架:
| 概念 |
职责 |
一句话总结 |
| LLMProvider |
配置管理者 |
知道有哪些模型、API 地址、配置参数 |
| LLMInvoker |
调用执行者 |
封装调用逻辑,自动附加追踪 |
| LLMCallContext |
上下文携带者 |
在调用链中传递追踪信息 |
这三者的关系可以这样理解:
1 2 3 4 5
| LLMProvider (工厂) ──创建──▶ LLMInvoker (执行器) │ ├── 依赖 ──▶ LLMCallContext (contextvars) │ └── 依赖 ──▶ LLMUsageCallbackHandler (callbacks)
|
分层架构
整个设计采用清晰的分层架构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| ┌─────────────────────────────────────────────────────────────┐ │ 业务调用层 │ │ (intent_detector, recipe_generator, chat_agent 等模块) │ └─────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 封装层 (Provider) │ │ LLMProvider + LLMInvoker 统一入口 │ └─────────────────────────────────────────────────────────────┘ │ ┌───────────────────┼───────────────────┐ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ context.py │ │ callbacks.py │ │ LangChain │ │ 上下文管理 │ │ 使用量追踪 │ │ ChatOpenAI │ └─────────────────┘ └─────────────────┘ └─────────────────┘
|
难点一:追踪信息如何优雅传递?
问题
在 LLM
调用时,我们需要知道是哪个用户、哪个模块调用的。如果每个函数都加参数,代码会变得难以维护:
1 2 3 4
| async def recipe_generator(module_name, user_id, conversation_id, request_id, ...): response = await call_llm(module_name, user_id, conversation_id, ...)
|
解决方案:contextvars
Python 3.7 引入了
contextvars,专门解决这类问题。它提供了线程/协程安全的上下文管理:
1 2 3 4 5 6
| from contextvars import ContextVar
_llm_context: ContextVar[Optional[LLMCallContext]] = ContextVar( "llm_context", default=None )
|
通过一个简单的 contextmanager,我们实现了优雅的上下文传递:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @contextmanager def llm_context(module_name: str, user_id: str = None, conversation_id: str = None): ctx = LLMCallContext( request_id=str(uuid.uuid4()), module_name=module_name, user_id=user_id, conversation_id=conversation_id, ) _llm_context.set(ctx) try: yield ctx finally: clear_llm_context()
|
使用起来非常简单:
1 2 3 4
| with llm_context("recipe_generator", user_id="user123", conversation_id="conv_456"): response = await invoker.ainvoke(messages)
|
工作原理:
1 2 3 4 5 6 7 8 9 10 11 12 13
| 调用链: with llm_context("recipe_module", user_id="user123"): # 这里设置上下文到 contextvars # ---------------------------------- # 调用 deep_call() # ↓ # call_llm() # ↓ # callback 读取 contextvars # ↓ # 写入数据库(包含 user_id)
# 离开 with 块,自动清除上下文
|
优势
- 无需修改函数签名:业务代码保持干净
- 自动管理生命周期:进入 with
块自动设置,离开自动清除
- 线程/协程安全:每个请求有独立的上下文
难点二:使用量追踪如何自动执行?
问题
每次调用 LLM 后,都要手动提取 token
使用量并写入数据库,代码重复且容易遗漏:
1 2 3 4
| response = client.chat.completions.create(...) usage = response.usage.total_tokens log_to_db(user_id, module_name, usage)
|
解决方案:LangChain Callbacks
LangChain 提供了强大的回调机制,我们可以在 LLM
调用的生命周期中自动执行操作:
1 2 3 4 5 6
| class LLMUsageCallbackHandler(BaseCallbackHandler): def on_llm_end(self, response: LLMResult, **kwargs): ctx = get_llm_context() usage = self._extract_usage(response)
|
工作流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| 业务代码 │ ▼ invoker.ainvoke(messages, callbacks=[handler]) │ ▼ LangChain 执行 LLM 调用 │ ├── 开始 ──▶ on_llm_start() │ └── 结束 ──▶ on_llm_end() │ ├── 提取 usage ├── 获取上下文 └── 异步写库
|
异步写入设计
写数据库是 I/O
操作,不应该阻塞主流程。我使用独立线程的事件循环来处理:
1 2 3
| def _schedule_write(self, log_data): loop = _get_background_loop() future = asyncio.run_coroutine_threadsafe(self._write_to_db(log_data), loop)
|
为什么用独立线程?
- 写数据库是 I/O 操作,不应阻塞 LLM 调用
- 使用单独的线程 + 事件循环,避免主线程负担
- 不影响用户体验和其他请求
难点三:负载均衡
问题
同一个模型可能有多个实例(如多个 OpenAI 兼容的 API
地址),需要均匀分散请求,避免某个实例过载。
解决方案
每次调用时随机选择模型实例:
1 2 3
| def pick_model(self, llm_type) -> str: profile = self.get_profile(llm_type) return random.choice(profile.model_names)
|
1 2 3 4
| def _get_llm_with_model(self): model = self._provider.pick_model(self._llm_type) llm = self._base_llm.bind(model=model) return llm
|
优点: - 请求均匀分布到多个模型实例 -
提高系统整体吞吐量 - 某个实例故障时自动切换
难点四:多配置管理
问题
不同场景需要不同配置:快速响应用小模型,复杂任务用大模型。如果每个地方都写死配置,切换成本很高。
解决方案
通过配置文件统一管理:
1 2 3 4 5 6 7 8 9 10 11
| LLMConfig: profiles: fast: model_names: ["gpt-4o-mini", "claude-3-haiku"] temperature: 0.3 max_tokens: 1000 normal: model_names: ["gpt-4o", "claude-3-sonnet"] temperature: 0.7 max_tokens: 4000
|
调用时只需指定类型:
1 2 3 4 5
| fast_invoker = provider.create_invoker("fast")
normal_invoker = provider.create_invoker("normal")
|
优点: - 配置集中管理,一处修改处处生效 -
根据场景灵活选择模型 - 便于 A/B 测试和模型迁移
完整调用链路
创建阶段
1 2 3 4 5 6 7 8
| from app.llm import LLMProvider, llm_context from app.config import settings
provider = LLMProvider(settings.llm)
invoker = provider.create_invoker("fast")
|
调用阶段
1 2
| with llm_context("recipe_generator", user_id="user123", conversation_id="conv_456"): response = await invoker.ainvoke(messages)
|
调用链路详解:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| 业务代码 │ ▼ with llm_context(...) ───设置上下文──▶ contextvars │ ▼ invoker.ainvoke(messages) │ ├── 1. _prepare_config(kwargs) │ │ │ └── 合并所有 callbacks │ ├── 用户传入的 callbacks │ ├── config 中的 callbacks │ └── 初始化时传入的 usage callback │ └── 2. _get_llm_with_model() │ ├── pick_model() ──随机选模型──▶ "gpt-4o-mini" │ └── bind(model="gpt-4o-mini") ──▶ 新的 ChatOpenAI 实例 │ ▼ base_llm.ainvoke(messages, config={callbacks: [..., usage_handler]}) │ ▼ LangChain 执行 HTTP 请求到 LLM API │ ▼ ╔════════════════════════════════════╗ ║ LangChain 内部回调触发 ║ ╠════════════════════════════════════╣ ║ on_llm_start() ║ ║ ... ║ ║ on_llm_end() ← 响应返回 ║ ║ │ ║ ║ ├── 提取 token 使用量 ║ ║ │ (response.llm_output) ║ ║ │ ║ ║ ├── get_llm_context() ║ ║ │ (从 contextvars 读取) ║ ║ │ ║ ║ └── _schedule_write(log_data) ║ ║ │ ║ ║ └── 异步写入数据库 ║ ╚════════════════════════════════════╝
|
更多使用场景
流式输出
1 2 3 4 5 6
| invoker = provider.create_invoker("fast", streaming=True)
with llm_context("chat", user_id="user123"): messages = [{"role": "user", "content": "讲个笑话"}] async for chunk in invoker.astream(messages): print(chunk.content, end="", flush=True)
|
工具调用
1 2 3 4 5 6 7
| tools = [get_recipe, search_ingredients]
invoker = provider.create_invoker("normal")
with llm_context("recipe_agent", user_id="user123"): messages = [{"role": "user", "content": "给我红烧肉的做法"}] response = await invoker.ainvoke_with_tools(messages, tools)
|
多模块追踪
不同模块可以独立使用,自动追踪各自的调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| intent_invoker = provider.create_invoker("fast")
async def detect_intent(user_id: str, query: str): with llm_context("intent_detector", user_id=user_id): response = await intent_invoker.ainvoke([ {"role": "system", "content": "You are an intent detector."}, {"role": "user", "content": query} ]) return response.content
recipe_invoker = provider.create_invoker("normal")
async def generate_recipe(user_id: str, intent: str): with llm_context("recipe_generator", user_id=user_id): response = await recipe_invoker.ainvoke([ {"role": "system", "content": "You are a recipe generator."}, {"role": "user", "content": intent} ]) return response.content
|
如果你也在做类似的事情,希望这个设计能给你一些启发。