在AI技术飞速迭代的当下,单一大型语言模型(LLM)在面对复杂业务场景时逐渐显得力不从心。无论是需要多步骤协作的任务处理,还是跨系统的数据交互,单一模型的“单打独斗”模式都难以兼顾效率与准确性。而多智能体(Multi-Agent)系统通过将复杂任务拆解给不同专业子智能体,再由“主管”统一协调,恰好解决了这一痛点,成为工业级AI应用的新方向。
我将以智能工厂设备运维助手为实际场景,手把手教你用LangGraph构建功能完备的多智能体系统,同时结合LangFuse实现系统的可观测与量化评估。从架构设计到代码落地,再到性能优化,形成一套完整的工业级开发闭环,让你不仅能掌握技术细节,更能理解多智能体系统的设计思维。

在现代化智能工厂中,设备运维是保障生产连续性的核心环节。一线工程师常常需要同时处理“查询设备温度”“分析故障缘由”“创建维保工单”等多类任务,而传统的人工响应模式效率低、易出错。我们需要一个AI运维助手,能理解自然语言请求,自动调用不同系统完成任务,这正是多智能体系统的用武之地。
为了实现任务的高效拆分与协作,我们设计“主管(Supervisor)+子Agent”的分层架构,各角色分工明确:
当用户发送请求(如“P-205加压泵告警了,帮我安排检修”)时,系统流程如下:
在动手编码前,我们需要先搭建开发环境,并理解LangGraph的核心组件,这是构建多智能体系统的“地基”。
第一安装所需依赖库,这些工具将覆盖模型调用、图计算、可观测性等核心需求:
pip install langchain langchain-openai langgraph langfuse typing_extensions python-dotenv
接着在项目根目录创建.env文件,存储API密钥(敏感信息不直接写在代码中,是工业级开发的基本规范):
# OpenAI API密钥,用于调用大语言模型
OPENAI_API_KEY="your-openai-api-key"
# LangFuse相关配置,用于系统观测与评估
LANGFUSE_SECRET_KEY="sk-lf-xxxxxxxxxxxxxxxxxx"
LANGFUSE_PUBLIC_KEY="pk-lf-xxxxxxxxxxxxxxxxxx"
LANGFUSE_HOST="https://cloud.langfuse.com" # 自托管部署可替换为私有地址
LangGraph基于图计算思想设计,核心是“状态流转”,理解这三个概念就能掌握其工作原理:
工业级系统需要“短期记忆”和“长期记忆”协同工作:
初始化记忆系统的代码如下:
import os
from dotenv import load_dotenv
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.store.memory import InMemoryStore
# 加载.env文件中的环境变量
load_dotenv()
# 短期记忆:SQLite检查点,存储单次对话状态
short_term_memory = SqliteSaver.from_conn_string(":memory:")
# 长期记忆:内存存储,生产环境提议替换为Redis
long_term_memory = InMemoryStore()
接下来我们逐步构建系统的每个组件,先从两个子Agent入手,再实现主管Agent的协调逻辑。
这个Agent需要完成“理解用户查询→调用工具查数据→返回结果”的流程,属于典型的ReAct(推理与行动)风格Agent。
第一设计该Agent的状态字典,明确需要传递的数据:
from typing import TypedDict, Annotated, List
from langgraph.graph.message import AnyMessage, add_messages
class AgentState(TypedDict):
# 对话历史,add_messages会自动聚合新旧消息
messages: Annotated[list[AnyMessage], add_messages]
# 设备历史故障记录(从长期记忆加载)
device_history: str
# 当前操作的设备ID
device_id: str
# 防止无限循环的安全计数器
remaining_steps: int
Agent通过工具与外部系统交互,这里模拟工厂IoT数据库的查询工具:
from langchain_core.tools import tool
import random
# 模拟设备数据库,生产环境需替换为真实IoT数据库连接
DEVICE_DB = {
"C-101": {"name": "1号数控车床", "status": "正常", "temp": 65.5, "pressure": 1.2},
"P-205": {"name": "5号加压泵", "status": "告警", "temp": 92.1, "pressure": 2.5},
"M-308": {"name": "8号铣床", "status": "正常", "temp": 72.3, "pressure": 1.0}
}
@tool
def get_device_status(device_id: str) -> dict:
"""根据设备ID查询实时运行状态,包括名称、状态、温度、压力"""
print(f"[工具调用] get_device_status: 设备ID={device_id}")
return DEVICE_DB.get(device_id, {"error": "未找到该设备"})
@tool
def find_high_temp_devices() -> list[str]:
"""查询所有温度超过80度的设备ID列表"""
print(f"[工具调用] find_high_temp_devices")
high_temp_list = [dev_id for dev_id, data in DEVICE_DB.items() if data["temp"] > 80.0]
return high_temp_list
# 工具集合,后续绑定给LLM
status_tools = [get_device_status, find_high_temp_devices]
一个ReAct Agent需要两个核心节点:“思考节点”(决定是否调用工具)和“工具执行节点”(执行工具调用):
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from langgraph.prebuilt import ToolNode
# 初始化LLM,并绑定工具(让模型知道有哪些工具可用)
llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)
llm_with_tools = llm.bind_tools(status_tools)
# 1. 思考节点:根据对话历史决定下一步行动
def status_agent_think(state: AgentState):
"""设备状态查询Agent的思考逻辑,判断是否需要调用工具或直接回答"""
system_prompt = """
你是专业的设备状态分析员,负责用工具查询工厂设备实时数据。
若用户未提供设备ID但有具体问题(如“哪些设备温度过高”),需主动调用find_high_temp_devices;
若已获取工具返回结果,直接整理成简洁明了的中文回答,无需额外解释。
"""
# 调用LLM生成思考结果
response = llm_with_tools.invoke(
[SystemMessage(content=system_prompt)] + state["messages"]
)
return {"messages": [response], "remaining_steps": state["remaining_steps"] - 1}
# 2. 工具执行节点:LangGraph预置的ToolNode,自动执行工具调用
status_tool_executor = ToolNode(status_tools)
将节点和边组合成可运行的图,定义流程逻辑:
from langgraph.graph import StateGraph, START, END
def should_continue(status: AgentState):
"""条件边判断函数:若最后一条消息有工具调用,则继续;否则结束"""
last_msg = status["messages"][-1]
if last_msg.tool_calls and status["remaining_steps"] > 0:
return "continue"
return "end"
# 创建图构建器
graph_builder = StateGraph(AgentState)
# 添加节点
graph_builder.add_node("think", status_agent_think) # 思考节点
graph_builder.add_node("tool", status_tool_executor) # 工具执行节点
# 定义流程:开始→思考节点,思考节点根据条件判断走向
graph_builder.add_edge(START, "think")
graph_builder.add_conditional_edges(
"think",
should_continue,
{"continue": "tool", "end": END}
)
# 工具执行后返回思考节点,继续判断是否需要进一步操作
graph_builder.add_edge("tool", "think")
# 编译图,启用检查点(短期记忆)
status_agent_graph = graph_builder.compile(checkpointer=short_term_memory)
为了展示开发灵活性,这个Agent我们用LangGraph的create_react_agent预构建函数快速实现,专注于工单创建功能。
@tool
def create_maintenance_ticket(device_id: str, issue_description: str) -> dict:
"""为故障设备创建维保工单,需传入设备ID和问题描述"""
print(f"[工具调用] create_maintenance_ticket: 设备ID={device_id}, 问题={issue_description}")
# 模拟生成工单ID,生产环境需对接真实工单系统API
ticket_id = random.randint(1000, 9999)
return {
"status": "success",
"ticket_id": ticket_id,
"message": f"已为设备{device_id}创建维保工单,工单ID:{ticket_id}"
}
maintenance_tools = [create_maintenance_ticket]
用预构建函数简化开发,只需传入LLM、工具和提示词:
from langgraph.prebuilt import create_react_agent
maintenance_agent_prompt = """
你是工厂维保任务调度员,负责根据设备故障信息创建维保工单。
创建工单时必须包含设备ID和清晰的问题描述,若信息不全需追问用户;
收到工具返回的工单结果后,直接整理成中文反馈给用户。
"""
# 用预构建函数创建Agent,自动集成思考-工具执行流程
maintenance_agent = create_react_agent(
llm=llm,
tools=maintenance_tools,
messages_modifier=maintenance_agent_prompt,
checkpointer=short_term_memory
)
主管Agent不直接操作工具,而是作为“调度员”,解析用户请求并分配给合适的子Agent。其核心是“路由逻辑”——判断该调用哪个子Agent。
主管通过“路由工具”标识需要调用的子Agent(工具函数体为空,仅作为路由标记):
@tool
def route_to_status_agent(query: str):
"""路由工具:处理设备状态、温度、压力等实时数据查询请求"""
pass
@tool
def route_to_maintenance_agent(device_id: str, issue: str):
"""路由工具:处理维保工单创建、检修安排等请求"""
pass
# 子Agent注册表,主管通过此表找到对应Agent
agent_registry = {
"status_agent": {"instance": status_agent_graph, "router": route_to_status_agent},
"maintenance_agent": {"instance": maintenance_agent, "router": maintenance_agent_router}
}
主管的核心是调用LLM分析用户请求,输出路由工具调用,再根据工具调用结果转发任务:
from langchain_core.tools import render_text_description
def supervisor_decision(state: AgentState):
"""主管Agent的决策逻辑:解析请求并路由给对应子Agent"""
# 生成工具描述,让LLM知道如何路由
tool_descriptions = render_text_description([
route_to_status_agent, route_to_maintenance_agent
])
system_prompt = f"""
你是智能工厂运维主管,负责协调团队完成用户请求。你的团队成员及分工:
- status_agent:查询设备实时状态,需用route_to_status_agent路由;
- maintenance_agent:创建维保工单,需用route_to_maintenance_agent路由,且必须传入device_id和issue。
工具列表:
{tool_descriptions}
处理流程:
1. 分析用户请求,判断需要调用哪个子Agent;
2. 若子Agent返回结果后仍需后续操作(如查完状态需创建工单),继续路由;
3. 所有任务完成后,整理结果反馈用户。
"""
# 调用LLM生成路由决策
response = llm.bind_tools([route_to_status_agent, route_to_maintenance_agent]).invoke(
[SystemMessage(content=system_prompt)] + state["messages"]
)
return {"messages": [response], "remaining_steps": state["remaining_steps"] - 1}
# 实现路由执行逻辑:根据主管决策调用对应子Agent
def execute_route(state: AgentState):
"""根据主管的工具调用结果,执行子Agent路由"""
last_msg = state["messages"][-1]
if not last_msg.tool_calls:
return state
# 解析工具调用信息
tool_call = last_msg.tool_calls[0]
agent_name = None
agent_input = tool_call["args"]
# 匹配对应的子Agent
if tool_call["name"] == "route_to_status_agent":
agent_name = "status_agent"
elif tool_call["name"] == "route_to_maintenance_agent":
agent_name = "maintenance_agent"
if not agent_name:
return {"messages": state["messages"] + ["未找到对应处理Agent"]}
# 调用子Agent
agent_instance = agent_registry[agent_name]["instance"]
agent_result = agent_instance.invoke({
"messages": [HumanMessage(content=str(agent_input))],
"device_history": state["device_history"],
"device_id": agent_input.get("device_id", ""),
"remaining_steps": 3
})
# 将子Agent结果加入状态
return {
"messages": state["messages"] + agent_result["messages"],
"device_history": state["device_history"],
"device_id": agent_input.get("device_id", ""),
"remaining_steps": state["remaining_steps"] - 1
}
将主管决策节点和路由执行节点组合,形成完整的系统入口:
# 创建主管系统图
supervisor_graph_builder = StateGraph(AgentState)
# 添加节点
supervisor_graph_builder.add_node("supervisor_decision", supervisor_decision)
supervisor_graph_builder.add_node("execute_route", execute_route)
# 定义流程:开始→主管决策→路由执行→回到决策节点判断是否继续
supervisor_graph_builder.add_edge(START, "supervisor_decision")
supervisor_graph_builder.add_conditional_edges(
"supervisor_decision",
lambda s: "execute_route" if s["messages"][-1].tool_calls else "end",
{"execute_route": "execute_route", "end": END}
)
supervisor_graph_builder.add_edge("execute_route", "supervisor_decision")
# 编译主管系统,启用检查点
final_system_graph = supervisor_graph_builder.compile(checkpointer=short_term_memory)
工业场景中,“绝对自动化”往往存在风险,列如关键设备停机检修需要人工确认;同时,设备的历史故障记录对故障分析至关重大。这就需要加入“人机回环”和“长期记忆”功能。
通过LangGraph的GraphInterrupt实现流程中断,让人工介入关键操作:
from langgraph.errors import GraphInterrupt
def critical_operation_check(state: AgentState):
"""检查是否为关键操作,若是则中断流程等待人工确认"""
# 假设“创建涉及停机的维保工单”为关键操作
last_msg = state["messages"][-1].content
if "停机" in last_msg or "加压泵" in last_msg: # 加压泵为关键设备
raise GraphInterrupt("关键维保任务需停机,输入'yes'确认执行:")
return state
# 将检查节点加入主管系统流程(在路由执行前)
supervisor_graph_builder.add_node("critical_check", critical_operation_check)
supervisor_graph_builder.add_edge("supervisor_decision", "critical_check")
supervisor_graph_builder.add_edge("critical_check", "execute_route")
# 编译更新后的系统
final_system_graph = supervisor_graph_builder.compile(checkpointer=short_term_memory)
当系统执行到critical_operation_check节点时,若检测到关键操作,会抛出中断并提示用户确认。用户输入“yes”后,系统继续执行后续流程,避免误操作。
实现设备历史故障记录的加载与更新,让Agent“记住”过往信息:
def load_device_history(state: AgentState):
"""从长期记忆加载设备历史故障记录"""
device_id = state.get("device_id")
if not device_id:
return {"device_history": "未指定设备ID,无历史记录"}
# 从长期记忆存储中获取数据
history = long_term_memory.get(f"device:{device_id}:fault_history")
return {"device_history": history or "该设备无历史故障记录"}
def update_device_history(state: AgentState):
"""若检测到新故障,更新长期记忆"""
last_msg = state["messages"][-1].content
device_id = state.get("device_id")
if not device_id:
return state
# 简单判断是否有新故障(实际场景可通过LLM分析)
if "告警" in last_msg or "温度过高" in last_msg:
new_fault = f"[{os.getenv('DATE')}] {last_msg}"
# 读取现有历史并更新
existing_history = long_term_memory.get(f"device:{device_id}:fault_history") or ""
updated_history = existing_history + ";" + new_fault if existing_history else new_fault
long_term_memory.put(f"device:{device_id}:fault_history", updated_history)
return state
# 将记忆节点加入流程:开始→加载记忆→主管决策
supervisor_graph_builder.add_node("load_memory", load_device_history)
supervisor_graph_builder.add_node("update_memory", update_device_history)
supervisor_graph_builder.add_edge(START, "load_memory")
supervisor_graph_builder.add_edge("load_memory", "supervisor_decision")
supervisor_graph_builder.add_edge("execute_route", "update_memory")
supervisor_graph_builder.add_edge("update_memory", "supervisor_decision")
# 最终编译系统
final_system_graph = supervisor_graph_builder.compile(checkpointer=short_term_memory)
工业级系统不仅要“能用”,还要“好用”且“可控”。LangFuse作为开源可观测性平台,能协助我们追踪系统运行、评估性能、优化迭代。
在系统中集成LangFuse,追踪Agent的每一步执行:
from langfuse import Langfuse
from langfuse.decorators import traceable
# 初始化LangFuse客户端(自动读取.env中的密钥)
langfuse = Langfuse()
@traceable(name="smart_factory_system_run")
def run_system(user_query: str, device_id: str = ""):
"""系统执行入口,用@traceable装饰器开启LangFuse追踪"""
config = {"configurable": {"thread_id": f"user_{random.randint(1000,9999)}"}} # 唯一对话ID
result = final_system_graph.invoke(
{
"messages": [HumanMessage(content=user_query)],
"device_id": device_id,
"device_history": "",
"remaining_steps": 5
},
config=config
)
return result["messages"][-1].content
当调用run_system("P-205加压泵告警了")时,LangFuse会自动记录:
在LangFuse中创建测试用例,用于系统评估:
# 创建数据集(也可在LangFuse UI中手动创建)
langfuse.create_dataset(name="factory_system_test_set")
# 添加测试用例
test_cases = [
{
"input": "查询1号数控车床(C-101)的温度",
"expected_output": "1号数控车床当前温度为65.5度,状态正常",
"device_id": "C-101"
},
{
"input": "P-205加压泵告警,帮我创建维保工单",
"expected_output": "已为设备P-205创建维保工单",
"device_id": "P-205"
},
{
"input": "哪些设备温度超过80度",
"expected_output": "温度超过80度的设备有P-205",
"device_id": ""
}
]
# 将测试用例添加到数据集
for idx, case in enumerate(test_cases):
langfuse.create_example(
dataset_name="factory_system_test_set",
name=f"test_case_{idx}",
input={"query": case["input"], "device_id": case["device_id"]},
expected_output=case["expected_output"]
)
用LLM作为“评估员”,自动判断系统输出是否符合预期:
from langfuse.model import CreateScore
from langfuse.evaluation import Evaluation
@traceable(name="system_correctness_evaluator")
def evaluate_correctness(actual_output: str, expected_output: str):
"""评估系统输出的正确性:1分正确,0分错误"""
prompt = f"""
你是严格的评估员,判断实际输出是否包含预期输出的核心信息。
输出格式:先给分数(1或0),再用一句话说明理由。
预期输出:{expected_output}
实际输出:{actual_output}
"""
eval_result = llm.invoke([HumanMessage(content=prompt)]).content
# 解析分数(简单逻辑,实际可优化)
score = 1 if "1" in eval_result.split("
")[0] else 0
reason = "
".join(eval_result.split("
")[1:]).strip()
return CreateScore(name="correctness", value=score, comment=reason)
# 运行评估任务
evaluation = Evaluation(
name="factory_system_v1_evaluation",
dataset_name="factory_system_test_set",
model=lambda example: run_system(example["input"]["query"], example["input"]["device_id"]),
scorers=[evaluate_correctness]
)
# 执行评估
evaluation.run()
print("评估完成!前往LangFuse UI查看结果:https://cloud.langfuse.com")
评估完成后,在LangFuse仪表板能看到:
通过智能工厂运维助手的案例,完整演示了用LangGraph和LangFuse构建工业级多智能体系统的流程。从架构设计到代码实现,再到可观测性建设,我们形成了一套“开发-测试-优化”的闭环。
在实际生产落地时,还需要注意以下几点:
多智能体系统正在成为复杂AI应用的主流架构,而LangGraph和LangFuse的组合为我们提供了从快速原型到工业级部署的完整工具链。希望本文的实战经验能协助你在多智能体领域少走弯路,打造出更高效、更可靠的AI系统。