一、前言
LangChain 的 | 管道在"固定链路"场景下很优雅——一个 Prompt 接一个 LLM,输出接一个 Parser,一条直线走到底。
但现实中的 Agent 不是直线:
根据工具返回结果,可能需要重新思考 而不是继续执行下一步
多轮对话需要记忆状态 在节点之间流转
分支条件(if 工具报错就走回退策略)是常态不是例外
LangGraph 就是干这个的——把 Agent 执行流程画成一张有向图 :节点是计算单元(LLM 调用、工具执行、条件判断),边是数据流向。LangChain 的 Chain 其实就是这张图的特例——只有一个节点的情况下。
本文不讲抽象概念,从一段"必须用图才能优雅解决"的代码开始,带你画出一个完整的 Agent 状态机。
二、最小图
2.1 安装
LangGraph 是独立包,不依赖 LangChain 全家桶,但跟 LangChain 的 Runnable 天然互通。
2.2 两节点一循环
还是先跑通最小链路——一个会"思考-行动-再思考"的简单 Agent:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 from typing import TypedDict, Literal from langgraph.graph import StateGraph, ENDfrom langgraph.checkpoint.memory import MemorySaverfrom langchain_openai import ChatOpenAIfrom langchain_core.tools import toolclass AgentState (TypedDict ): messages: list next_action: str @tool def search (query: str ) -> str : """模拟搜索引擎""" return f"关于 '{query} ' 的搜索结果:找到 3 条相关记录。" @tool def calculator (expr: str ) -> str : """模拟计算器""" return str (eval (expr)) tools = [search, calculator] llm = ChatOpenAI(model="gpt-4o" , temperature=0 ).bind_tools(tools) def decide (state: AgentState ) -> AgentState: response = llm.invoke(state["messages" ]) return {"messages" : [response]} def execute (state: AgentState ) -> AgentState: last_msg = state["messages" ][-1 ] if not last_msg.tool_calls: return {"next_action" : "end" } results = [] for tc in last_msg.tool_calls: tool_map = {"search" : search, "calculator" : calculator} result = tool_map[tc["name" ]].invoke(tc["args" ]) results.append(result) return {"messages" : [{"role" : "tool" , "content" : r} for r in results]} def should_continue (state: AgentState ) -> Literal ["execute" , "decide" , "__end__" ]: last_msg = state["messages" ][-1 ] if not last_msg.tool_calls: return "__end__" return "execute" builder = StateGraph(AgentState) builder.add_node("decide" , decide) builder.add_node("execute" , execute) builder.set_entry_point("decide" ) builder.add_conditional_edges("decide" , should_continue) builder.add_edge("execute" , "decide" ) graph = builder.compile (checkpointer=MemorySaver()) config = {"configurable" : {"thread_id" : "1" }} result = graph.invoke( {"messages" : [{"role" : "human" , "content" : "计算 2024 年全年天数乘以圆周率" }]}, config, ) print (result["messages" ][-1 ].content)
执行时,LangGraph 内部发生的事情:
1 2 3 4 5 decide → LLM 决定需要搜索"2024 年天数" → tool_calls=[search] execute → 调用 search,返回结果 decide → LLM 得到搜索结果:366 天 → 决定调用 calculator("366 * 3.14159") execute → 调用 calculator,返回 1149.82 decide → LLM 组装最终答案 → 没有 tool_calls → END
每个节点只做一件事,状态通过 AgentState 在节点间传递,不需要手动维护上下文。
三、核心概念
3.1 State — 数据在节点间怎么传
图里的每个节点都需要读写同一份数据。LangGraph 用一个 TypedDict(或者 Pydantic 模型)来管这件事,叫 State :
1 2 3 4 5 6 7 from typing import Annotated, Sequence from langgraph.graph.message import add_messagesclass State (TypedDict ): messages: Annotated[list , add_messages] user_info: dict retry_count: int
关键在于那个 Annotated —— 它带了一个 reducer ,决定字段冲突时怎么合并。不写 reducer 就直接覆盖,这是默认行为。写了 add_messages,那每次更新就是在列表后面追加,不会丢掉前面的消息。
1 2 3 4 5 6 def cap_retry (current: int , update: int ) -> int : return min (current + update, 3 ) class State (TypedDict ): retry_count: Annotated[int , cap_retry]
总结一下几个内置 reducer 的行为:
reducer
行为
add_messages
追加消息列表,不会覆盖已有消息
operator.add
列表拼接
默认
直接覆盖旧值
3.2 Node — 节点里写什么
节点就是个普通 Python 函数,收 State 返 dict,没别的花活:
1 2 3 4 5 6 7 8 9 10 11 def validate_input (state: State ) -> dict : if not state["messages" ]: return {"error" : "输入不能为空" } return {"validated" : True } def call_llm (state: State ) -> dict : response = llm.invoke(state["messages" ]) return {"messages" : [response]} def post_process (state: State ) -> dict : return {"result" : format_output(state["messages" ][-1 ])}
返回的 dict 不直接替换 State——它会跟当前 State 合并,每个字段走自己的 reducer。所以你不用担心一个节点不小心把别的字段搞丢了。
3.3 Edge — 节点之间怎么走
边就三种,很好记:
普通边 ——没得选,A 完事一定走 B:
1 builder.add_edge("step1" , "step2" )
条件边 ——根据当前 State 决定走哪条路:
1 2 3 4 5 6 7 8 9 def route (state: State ) -> Literal ["tool_call" , "respond" , "__end__" ]: last = state["messages" ][-1 ] if hasattr (last, "tool_calls" ) and last.tool_calls: return "tool_call" if state.get("need_human_confirm" ): return "respond" return "__end__" builder.add_conditional_edges("decide" , route)
记住一点:条件边返回的字符串必须对应一个已经注册的节点名 ,或者 END。写错了它不会默默忽略——直接抛异常。
入口边 ——告诉图从哪里开始:
1 builder.set_entry_point("decide" )
3.4 Checkpointer — 对话记忆和断点续传
每次对话的状态快照可以存起来。开发时用 MemorySaver 就够了,放内存里:
1 2 3 4 5 6 7 from langgraph.checkpoint.sqlite import SqliteSavercheckpointer = SqliteSaver.from_conn_string("checkpoints.db" ) graph = builder.compile (checkpointer=checkpointer) config = {"configurable" : {"thread_id" : "user_001" }}
有了 checkpointer 之后能干的事:
1 2 3 4 5 6 7 8 9 graph.invoke({"messages" : [{"role" : "human" , "content" : "继续" }]}, config) state_history = list (graph.get_state_history(config)) checkpoint = state_history[-2 ] graph.update_state(config, checkpoint.values)
四、实战:客服 Agent 带人工兜底
4.1 需求
一个客服 Agent,流程如下:
用户提问 → LLM 判断能否自动回复
能 → 直接回复用户
不能 → 转人工
转人工需要触发通知,并等待人工处理结果
4.2 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 from typing import Literal from pydantic import BaseModel, Fieldfrom langgraph.graph import StateGraph, ENDfrom langgraph.checkpoint.memory import MemorySaverfrom langchain_openai import ChatOpenAIclass SupportState (BaseModel ): messages: list = [] auto_reply: str = "" need_human: bool = False human_resolved: bool = False def classify (state: SupportState ) -> dict : classifier = ChatOpenAI(model="gpt-4o-mini" , temperature=0 ) schema = { "type" : "object" , "properties" : { "can_auto" : {"type" : "boolean" }, "reply" : {"type" : "string" }, }, "required" : ["can_auto" ], } result = classifier.with_structured_output(schema).invoke( f"判断以下用户问题能否自动回复:\n{state.messages[-1 ]['content' ]} " ) if result["can_auto" ]: return {"auto_reply" : result["reply" ]} return {"need_human" : True } def auto_reply (state: SupportState ) -> dict : return {"messages" : [{"role" : "assistant" , "content" : state.auto_reply}]} def escalate (state: SupportState ) -> dict : print (f"[ESCALATE] 工单已创建,等待人工处理" ) return {"messages" : [{"role" : "assistant" , "content" : "已转接人工客服,请稍候。" }]} def route (state: SupportState ) -> Literal ["auto_reply" , "escalate" , "__end__" ]: if state.auto_reply: return "auto_reply" if state.need_human: return "escalate" return "__end__" builder = StateGraph(SupportState) builder.add_node("classify" , classify) builder.add_node("auto_reply" , auto_reply) builder.add_node("escalate" , escalate) builder.set_entry_point("classify" ) builder.add_conditional_edges("classify" , route) builder.add_edge("auto_reply" , END) graph = builder.compile (checkpointer=MemorySaver()) result = graph.invoke({ "messages" : [{"role" : "human" , "content" : "我的订单 12345 还没发货,帮我查一下" }] }) print (result["messages" ])
4.3 人工接管后的恢复
LangGraph 支持中断执行 ,人工处理后恢复图继续执行:
1 2 3 4 5 6 7 8 9 10 11 graph = builder.compile ( checkpointer=MemorySaver(), interrupt_before=["escalate" ], ) graph.update_state( config, {"human_resolved" : True , "messages" : [{"role" : "human" , "content" : "已处理完成" }]}, )
这解决了 Agent 落地的核心痛点:AI 兜不住的,转人工;人工处理完,AI 继续 。
五、对比 LangChain
维度
LangChain Chain
LangGraph
流程形态
线性管道 \|
有向图(支持循环、分支)
状态管理
隐式,靠 Runnable 传参
显式 State + Reducer
多轮对话
需要手动拼接 history
内置 add_messages reducer
分支条件
用 RunnableBranch,有限
条件边,任意路由
持久化
无内置
Checkpointer 插拔
中断/恢复
不支持
interrupt_before / update_state
简单规则:如果你的 Agent 只有一条直线链路,用 Chain。一旦出现"根据结果决定下一步",换 LangGraph。
六、总结
概念
作用
关键 API
State
图共享状态,TypedDict + Reducer
add_messages / 自定义 reducer
Node
计算单元,输入 state 输出 dict
add_node(name, fn)
Edge
连接 Node,决定流向
add_edge / add_conditional_edges
Checkpointer
状态持久化,支持中断恢复
MemorySaver / SqliteSaver
Compile
把图定义编译成可执行对象
builder.compile()
LangGraph 最核心的认知转变是:不要写 Agent,要画 Agent 。
每个节点只负责一件事,状态图就是你的业务流程图,条件边就是你的 if/else。代码结构和业务逻辑结构一一对应——读代码就是读流程,改流程就是改代码。
画一张图,编译它,你的 Agent 就开始跑了。