基于图结构的 Agent 编排框架,用状态机+条件路由构建可控、可观测、可调试的 AI 智能体
LangChain 提供了早期的 Agent 实现(如 AgentExecutor),但它有一些根本性的局限:
LangGraph 是一个基于图(Graph)的 Agent 编排框架,它将 Agent 的执行流程建模为有向图:
这种模型天然支持循环、分支、并行等复杂控制流。
TypedDict 定义共享状态,所有节点读写同一份状态,支持 checkpointing 持久化
原生支持循环结构,Agent 可以多次调用工具直到任务完成
根据运行时状态动态决定下一步走向,实现复杂的决策逻辑
Human-in-the-loop 机制,可在任意节点暂停等待人工审批
每一步执行都可追踪,支持 LangSmith 集成进行调试和分析
支持子图(Subgraph),将复杂工作流拆分为可复用的模块
State 是 LangGraph 中所有节点共享的数据结构,使用 Python 的 TypedDict 或 Pydantic 的 BaseModel 定义。
Pythonfrom typing import TypedDict, Annotated, List
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
"""Agent 的共享状态"""
messages: Annotated[List, add_messages] # 消息列表,自动累积
query: str # 用户查询
documents: List[str] # 检索到的文档
tool_calls: int # 工具调用次数
Annotated[list, add_messages] 是 LangGraph 的特殊语法——add_messages 告诉框架如何合并状态更新(追加而非覆盖)。Node 是一个普通 Python 函数,接收当前 State,返回状态更新(Partial State):
Pythondef llm_node(state: AgentState) -> dict:
"""LLM 节点:调用大模型生成回复"""
response = llm.invoke(state["messages"])
return {"messages": [response]}
def tool_node(state: AgentState) -> dict:
"""工具节点:执行工具调用"""
last_message = state["messages"][-1]
results = [execute_tool(tc) for tc in last_message.tool_calls]
return {"messages": results}
Edge 定义节点之间的固定连接关系,表示无条件的流转:
Python# 固定边:从 start 直接进入 llm_node
graph.add_edge("start", "llm_node")
# 固定边:从 tool_node 回到 llm_node
graph.add_edge("tool_node", "llm_node")
条件边是 LangGraph 最强大的特性——根据运行时状态动态决定下一步走向哪个节点:
Pythondef should_continue(state: AgentState) -> str:
"""判断 Agent 是否需要继续调用工具"""
last_message = state["messages"][-1]
if last_message.tool_calls:
return "tool_node" # 有工具调用 → 执行工具
return "end" # 没有工具调用 → 结束
# 添加条件边
graph.add_conditional_edges(
"llm_node", # 从哪个节点出发
should_continue, # 路由函数
{
"tool_node": "tool_node", # 返回值 → 目标节点
"end": "end",
}
)
StateGraph 是图的顶层构建器,负责组合所有节点和边:
Pythonfrom langgraph.graph import StateGraph, START, END
# 创建图
graph = StateGraph(AgentState)
# 添加节点
graph.add_node("llm_node", llm_node)
graph.add_node("tool_node", tool_node)
# 添加边
graph.add_edge(START, "llm_node")
graph.add_conditional_edges("llm_node", should_continue, ...)
graph.add_edge("tool_node", "llm_node")
# 编译为可执行的图
app = graph.compile()
构建一个 LangGraph 应用分为 4 个步骤:定义→添加→编译→运行。
编译后的图支持三种运行模式:
| 方法 | 说明 | 返回值 |
|---|---|---|
app.invoke(state) |
同步执行,等待全部完成后返回最终状态 | 完整的 State |
app.stream(state) |
同步流式执行,每完成一个节点就 yield 一次 | Generator[chunk] |
app.astream(state) |
异步流式执行,适合 FastAPI 等 async 环境 | AsyncGenerator[chunk] |
Python — 运行示例# 方式 1: 同步调用(等待完整结果)
result = app.invoke({"messages": ["北京今天天气如何?"]})
print(result["messages"][-1].content)
# 方式 2: 流式输出(逐步获取每个节点的输出)
for chunk in app.stream({"messages": ["北京今天天气如何?"]}):
print(chunk) # 每个节点的输出
# 方式 3: 异步流式(适合 Web 服务)
async for chunk in app.astream({"messages": ["北京今天天气如何?"]}):
print(chunk)
ReAct(Reasoning + Acting)是最经典的 Agent 模式。以下是一个完整的、可直接运行的 LangGraph ReAct Agent 实现。
Python — react_agent.pyfrom typing import TypedDict, Annotated, Literal
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
# ═══ 1. 定义 State ═══
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
# ═══ 2. 定义工具 ═══
@tool
def search(query: str) -> str:
"""搜索互联网获取最新信息"""
# 实际项目中接入搜索引擎 API
return f"搜索结果:关于 '{query}' 的最新信息..."
@tool
def calculator(expression: str) -> str:
"""计算数学表达式"""
try:
result = eval(expression)
return str(result)
except Exception as e:
return f"计算错误: {e}"
tools = [search, calculator]
llm = ChatOpenAI(model="gpt-4o").bind_tools(tools)
# ═══ 3. 创建 LLM 节点 ═══
def llm_node(state: AgentState) -> dict:
"""调用 LLM 生成回复或决定使用工具"""
response = llm.invoke(state["messages"])
return {"messages": [response]}
# ═══ 4. 创建工具节点 ═══
tool_node = ToolNode(tools)
# ═══ 5. 创建条件边 ═══
def should_continue(
state: AgentState,
) -> Literal["tools", "end"]:
"""判断是否继续调用工具"""
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools"
return "end"
# ═══ 6. 构建图 ═══
graph = StateGraph(AgentState)
graph.add_node("agent", llm_node)
graph.add_node("tools", tool_node)
graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", should_continue, {
"tools": "tools",
"end": END,
})
graph.add_edge("tools", "agent") # 工具执行完回到 LLM
# ═══ 7. 编译并运行 ═══
app = graph.compile()
# 运行 Agent
result = app.invoke({
"messages": [HumanMessage(content="搜索 Python 最新版本,然后计算 123 * 456")]
})
print(result["messages"][-1].content)
ToolNode 和 tools_condition,在 langgraph.prebuilt 模块中,可以简化工具节点和条件边的创建。使用 Annotated[list, add_messages] 让消息列表自动累积,每个节点只返回新增的消息。
llm_node 接收完整消息历史,调用 LLM 获取回复(可能包含 tool_calls)。
ToolNode 自动执行 LLM 返回的 tool_calls,并将结果封装为 ToolMessage。
should_continue 检查最后一条消息是否有 tool_calls,有则进入工具节点,无则结束。
工具节点通过 add_edge("tools", "agent") 回到 LLM 节点,形成"思考→行动→观察"的循环。
与简单的 ReAct Agent 不同,RAG Agent 需要先判断问题是否需要检索知识库,再决定执行路径。
Python — rag_agent.pyfrom typing import TypedDict, Annotated, Literal, List
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
# ═══ 1. 定义 State ═══
class RAGState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
query: str
rewritten_query: str
documents: List[str]
# ═══ 2. 初始化组件 ═══
llm = ChatOpenAI(model="gpt-4o")
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.load_local("./faiss_index", embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
# ═══ 3. 路由节点:判断是否需要检索 ═══
def route_query(state: RAGState) -> Literal["rewrite_query", "generate"]:
"""判断问题是需要检索知识库,还是可以直接回答"""
query = state["messages"][-1].content
response = llm.invoke([
"系统": "判断以下问题是否需要检索知识库。只需回答 YES 或 NO。",
"用户": query,
])
if "YES" in response.content:
return "rewrite_query"
return "generate"
# ═══ 4. 重写查询节点 ═══
def rewrite_query(state: RAGState) -> dict:
"""优化查询以提高检索质量"""
query = state["messages"][-1].content
response = llm.invoke([
"系统": "将以下问题改写为更适合向量检索的关键词查询。只输出改写后的查询。",
"用户": query,
])
return {"rewritten_query": response.content}
# ═══ 5. 检索节点 ═══
def retrieve(state: RAGState) -> dict:
"""从向量数据库检索相关文档"""
query = state.get("rewritten_query", state["messages"][-1].content)
docs = retriever.invoke(query)
return {"documents": [doc.page_content for doc in docs]}
# ═══ 6. 生成节点 ═══
def generate(state: RAGState) -> dict:
"""基于检索结果(如有)生成最终回答"""
query = state["messages"][-1].content
docs = state.get("documents", [])
context = "\n\n".join(docs) if docs else "无相关文档。"
response = llm.invoke([
"系统": f"基于以下上下文回答用户问题。\n\n上下文:\n{context}",
"用户": query,
])
return {"messages": [response]}
# ═══ 7. 构建图 ═══
graph = StateGraph(RAGState)
graph.add_node("rewrite_query", rewrite_query)
graph.add_node("retrieve", retrieve)
graph.add_node("generate", generate)
graph.add_conditional_edges(START, route_query, {
"rewrite_query": "rewrite_query",
"generate": "generate",
})
graph.add_edge("rewrite_query", "retrieve")
graph.add_edge("retrieve", "generate")
graph.add_edge("generate", END)
# ═══ 8. 编译并运行 ═══
app = graph.compile()
result = app.invoke({
"messages": [HumanMessage(content="什么是 RAG?")],
"query": "",
"rewritten_query": "",
"documents": [],
})
print(result["messages"][-1].content)
route_query 节点用 LLM 判断问题类型,实际项目中也可以用分类器或关键词匹配来降低延迟和成本。LangGraph 支持在任意节点暂停执行,等待人工审批后再继续。这对于关键决策场景至关重要。
Pythonfrom langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command
# 在工具执行前加入人工审批
def human_review_node(state: AgentState) -> dict:
last_message = state["messages"][-1]
if last_message.tool_calls:
# interrupt() 暂停执行,等待人工输入
human_response = interrupt({
"question": "请审批以下工具调用:",
"tool_calls": last_message.tool_calls,
})
if human_response["approved"]:
return {"messages": [last_message]} # 继续执行
else:
return {"messages": [AIMessage(content="工具调用被拒绝。")]}
return {}
# 编译时启用 checkpointer
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer, interrupt_before=["tools"])
# 运行 — 会在 tools 节点前暂停
config = {"configurable": {"thread_id": "user-123"}}
for event in app.stream(input_state, config):
print(event)
# 人工审批后继续执行
app.invoke(Command(resume={"approved": True}), config)
interrupt_before=["tools"] 表示在进入 tools 节点前暂停;也可以用 interrupt_after 在节点执行后暂停。LangGraph 的 Checkpointing 机制会在每个节点执行后保存完整状态,支持恢复中断的执行和实现对话记忆。
Python — Checkpointer 类型# 1. 内存存储(开发调试用)
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
# 2. SQLite 存储(轻量级持久化)
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("./checkpoints.db")
# 3. Postgres 存储(生产环境推荐)
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/db"
)
# 编译时传入 checkpointer
app = graph.compile(checkpointer=checkpointer)
# 每次调用传入 thread_id,同一 thread 自动恢复状态
config = {"configurable": {"thread_id": "conversation-1"}}
app.invoke({"messages": ["你好"]}, config)
app.invoke({"messages": ["继续上次的对话"]}, config) # 自动加载历史
MemorySaver 数据存储在内存中,重启后丢失。生产环境请使用 PostgresSaver 或 SqliteSaver。子图允许将复杂的图拆分为可复用的模块,类似于编程中的"函数封装"。
Python — 子图示例# 定义子图 State
class ResearchState(TypedDict):
query: str
results: List[str]
# 构建子图
research_graph = StateGraph(ResearchState)
research_graph.add_node("search", search_node)
research_graph.add_node("summarize", summarize_node)
research_graph.add_edge(START, "search")
research_graph.add_edge("search", "summarize")
research_graph.add_edge("summarize", END)
research_app = research_graph.compile()
# 在主图中将子图作为节点
def research_node(state: MainState) -> dict:
result = research_app.invoke({"query": state["query"]})
return {"research_results": result["results"]}
main_graph = StateGraph(MainState)
main_graph.add_node("research", research_node) # 子图作为节点
main_graph.add_node("generate", generate_node)
main_graph.add_edge(START, "research")
main_graph.add_edge("research", "generate")
main_graph.add_edge("generate", END)
add_node("name", subgraph_app),无需包装函数。LangGraph 提供多种流式模式,实时获取每个节点的输出和 Token 级别的流式响应。
Python — 流式输出# 模式 1: 流式获取每个节点的输出
async for event in app.astream_events(
{"messages": ["解释量子计算"]},
config,
version="v2",
):
kind = event["event"]
if kind == "on_chat_model_stream":
# Token 级别的流式输出
print(event["data"]["chunk"].content, end="")
elif kind == "on_tool_start":
print(f"\n[工具] {event['name']}")
elif kind == "on_chain_end":
print(f"\n[节点完成] {event['name']}")
# 模式 2: 按节点流式获取
async for node_output in app.astream(input_state, config):
# node_output 是 {"node_name": state_update}
for node_name, update in node_output.items():
print(f"节点 {node_name} 完成")
| 流式模式 | 粒度 | 适用场景 |
|---|---|---|
.stream() |
节点级别 | 监控每个节点的输出 |
.astream_events() |
Token / 事件级别 | 实现打字机效果、实时 UI 更新 |
.astream_nodes() |
节点级别(带元数据) | 需要节点执行时间和元信息 |
| 维度 | LangChain AgentExecutor | LangGraph |
|---|---|---|
| 架构模型 | 线性循环(Think→Act→Observe) | 有向图(任意拓扑结构) |
| 控制流 | 固定的 ReAct 循环 | 支持循环、分支、并行、回退等 |
| 状态管理 | 仅 ConversationBuffer/Memory | TypedDict 自定义,支持 Checkpointing |
| 人机交互 | 不支持中途暂停 | 原生 Human-in-the-loop |
| 可观测性 | 依赖回调系统 | 内置 streaming + LangSmith 深度集成 |
| 可组合性 | Agent 嵌套困难 | Subgraph 子图复用 |
| 学习曲线 | 较低,开箱即用 | 中等,需理解图概念 |
| 灵活性 | 低,扩展需要 hack | 高,完全可控 |
| 生产就绪 | 原型验证 | 生产推荐 ✅ |
| 适用场景 | 简单的单步/少步任务 | 复杂多步、多分支、多人协作任务 |
综合运用 LangGraph 的各项能力,构建一个具备多轮对话、知识库检索、工单创建能力的智能客服系统。
Python — customer_service_agent.pyfrom typing import TypedDict, Annotated, Literal, List
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
# ═══ State ═══
class CustomerServiceState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
intent: str # 用户意图
satisfaction: bool # 用户满意度
ticket_id: str # 工单号
conversation_history: List[dict] # 多轮对话记录
llm = ChatOpenAI(model="gpt-4o")
# ═══ 1. 意图识别 ═══
def classify_intent(state: CustomerServiceState) -> dict:
"""识别用户意图:FAQ / 知识检索 / 工单创建 / 转人工"""
last_msg = state["messages"][-1].content
response = llm.invoke([
{"role": "system", "content": """
将用户消息分类为以下意图之一,只输出意图标签:
- faq: 常见问题(退换货、物流、价格等常见咨询)
- knowledge: 需要检索知识库的专业问题
- ticket: 需要创建工单的问题(投诉、技术故障等)
- human: 明确要求转人工或无法自动处理
"""},
{"role": "user", "content": last_msg},
])
intent = response.content.strip().lower()
return {"intent": intent}
# ═══ 2. FAQ 回答 ═══
def faq_answer(state: CustomerServiceState) -> dict:
"""基于 FAQ 知识库回答常见问题"""
faqs = {
"退货": "您可以在订单完成后 7 天内申请退货,请前往「我的订单」→「申请退货」。",
"物流": "您可以在「我的订单」中查看物流状态,一般 3-5 个工作日送达。",
"价格": "我们的价格会根据活动进行调整,请关注首页的优惠信息。",
}
query = state["messages"][-1].content
# 简单匹配,实际项目中用向量检索
for keyword, answer in faqs.items():
if keyword in query:
return {"messages": [AIMessage(content=answer)]}
return {"messages": [AIMessage(content="抱歉,我暂时无法回答这个问题,为您转接人工客服。")]}
# ═══ 3. 知识库检索 ═══
def knowledge_search(state: CustomerServiceState) -> dict:
"""从知识库检索并生成回答"""
query = state["messages"][-1].content
# 实际项目:使用 retriever 检索
docs = retriever.invoke(query)
context = "\n".join([doc.page_content for doc in docs])
response = llm.invoke([
{"role": "system", "content": f"基于以下知识库内容回答用户问题:\n{context}"},
{"role": "user", "content": query},
])
return {"messages": [response]}
# ═══ 4. 创建工单 ═══
def create_ticket(state: CustomerServiceState) -> dict:
"""创建客服工单"""
query = state["messages"][-1].content
# 实际项目:调用工单系统 API
ticket_id = f"TK-{datetime.now().strftime('%Y%m%d%H%M%S')}"
response = llm.invoke([
{"role": "system", "content": "你是一个客服助手。告知用户工单已创建,并提供工单号。"},
{"role": "user", "content": query},
])
return {
"messages": [response],
"ticket_id": ticket_id,
}
# ═══ 5. 满意度检查 ═══
def check_satisfaction(state: CustomerServiceState) -> Literal["end", "escalate"]:
"""检查用户是否满意回答"""
last_msg = state["messages"][-1].content
response = llm.invoke([
{"role": "system", "content": "判断用户对上一个回答是否满意。只回答 satisfied 或 unsatisfied。"},
{"role": "user", "content": last_msg},
])
if "satisfied" in response.content.lower():
return "end"
return "escalate"
# ═══ 6. 路由函数 ═══
def route_intent(state: CustomerServiceState) -> str:
intent = state["intent"]
routing = {
"faq": "faq_answer",
"knowledge": "knowledge_search",
"ticket": "create_ticket",
"human": "escalate",
}
return routing.get(intent, "faq_answer")
# ═══ 7. 构建图 ═══
graph = StateGraph(CustomerServiceState)
graph.add_node("classify_intent", classify_intent)
graph.add_node("faq_answer", faq_answer)
graph.add_node("knowledge_search", knowledge_search)
graph.add_node("create_ticket", create_ticket)
graph.add_node("check_satisfaction", check_satisfaction)
graph.add_edge(START, "classify_intent")
graph.add_conditional_edges("classify_intent", route_intent, {
"faq_answer": "faq_answer",
"knowledge_search": "knowledge_search",
"create_ticket": "create_ticket",
"escalate": "check_satisfaction",
})
graph.add_edge("faq_answer", "check_satisfaction")
graph.add_edge("knowledge_search", "check_satisfaction")
graph.add_edge("create_ticket", "check_satisfaction")
graph.add_conditional_edges("check_satisfaction", check_satisfaction, {
"end": END,
"escalate": "escalate", # 可接入人工客服节点
})
# ═══ 8. 编译运行 ═══
app = graph.compile(checkpointer=MemorySaver())
# 多轮对话
config = {"configurable": {"thread_id": "customer-001"}}
print(app.invoke({
"messages": [HumanMessage(content="我想退货")],
"intent": "", "satisfaction": False, "ticket_id": "",
"conversation_history": [],
}, config))
结合 Twilio/Vonage API,实现语音客服
统计意图分布、满意度、平均响应时间
自动识别语言,切换对应知识库
FAQ 答案缓存、语义缓存减少 LLM 调用