0%

跟着😺NanoBot学AI智能体设计和开发5:当我们问“使用子智能体做xxx”时nanobot做了啥

深入源码:Nanobot Agent 如何处理“使用子智能体”的交互指令

在多智能体(Multi-Agent)架构中,主智能体如何理解用户意图并动态派发任务给子智能体,是一个非常核心的设计。本文将以 Nanobot Agent 为例,详细拆解当用户在终端输入交互指令:“请使用子智能体调研一下hermes agent” 时,系统在底层是如何一步步流转的。

1. 接收输入:构造 InboundMessage

一切的起点始于用户在交互界面(例如 CLI 终端)输入文本。Nanobot 使用统一的事件总线(Message Bus)来解耦各个渠道和核心 Agent 逻辑。

当用户敲下回车后,CLI 渠道会捕获这段文本,并将其封装为一个标准化的 InboundMessage 对象。这个数据类定义在 events.py 中:

1
2
3
4
5
6
7
@dataclass
class InboundMessage:
channel: str # 例如 "cli"
sender_id: str # 用户标识,例如 "user"
chat_id: str # 会话标识,例如 "direct"
content: str # 消息文本内容
# ...

在 CLI 交互模式下,具体的构造逻辑位于 commands.py 中:

1
2
3
4
5
6
7
await bus.publish_inbound(InboundMessage(
channel=cli_channel,
sender_id="user",
chat_id=cli_chat_id,
content=user_input, # 即 "请使用子智能体调研一下hermes agent"
metadata={"_wants_stream": True},
))

随后,该消息被推入消息总线,等待主 Agent 循环消费。

2. 消息路由:主循环与会话加锁

主 Agent 在后台运行着一个监听循环,即 loop.py 中的 run() 方法。这个方法是整个机器人响应系统的心脏,它采用了异步和非阻塞的设计模式。我们一步步解析 run() 到底做了什么:

run() 方法的执行拆解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
async def run(self) -> None:
self._running = True
await self._connect_mcp() # 1. 启动前初始化 MCP (Model Context Protocol) 插件连接
logger.info("Agent loop started")

while self._running:
try:
# 2. 带超时的消息轮询
msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
except asyncio.TimeoutError:
continue
# ... 省略异常处理 ...

raw = msg.content.strip()
# 3. 优先级命令拦截
if self.commands.is_priority(raw):
ctx = CommandContext(msg=msg, session=None, key=msg.session_key, raw=raw, loop=self)
result = await self.commands.dispatch_priority(ctx)
if result:
await self.bus.publish_outbound(result)
continue

# 4. 普通消息的异步派发
task = asyncio.create_task(self._dispatch(msg))
self._active_tasks.setdefault(msg.session_key, []).append(task)
task.add_done_callback(...)
  1. MCP 插件初始化:在进入死循环之前,先通过 _connect_mcp() 建立与配置的外部插件(如本地开发工具链、数据库查询接口等)的连接。
  2. 带超时的非阻塞拉取:通过 asyncio.wait_for(..., timeout=1.0) 从总线 consume_inbound() 获取消息。设置 1 秒超时是为了让循环能周期性“醒来”检查 self._running 的状态,从而支持优雅的平滑退出(Graceful Shutdown)。
  3. 优先指令拦截(Priority Commands):如果收到像 /stop 这样紧急干预指令,self.commands.is_priority(raw) 会返回 True。此时直接 await self.commands.dispatch_priority(ctx) 并在当前协程同步执行完毕,它能立即终止对应会话下的所有活跃任务和子智能体。这正是为什么即便机器人还在长篇大论地生成或执行耗时任务,你依然可以随时用 /stop 叫停它。
  4. 异步任务派发(Task Dispatch):对于绝大多数普通对话消息(包括我们的“请使用子智能体…”),run() 方法不会在原地等待它执行完。而是通过 asyncio.create_task(self._dispatch(msg)) 创建一个后台任务,并将其注册到该会话的活跃任务字典 _active_tasks 中,以便后续可能被 /stop 取消。

通过异步派发,普通消息进入到 _dispatch(msg) 中。在这里,系统会根据 session_key 获取一把异步锁:

1
2
3
4
lock = self._session_locks.setdefault(msg.session_key, asyncio.Lock())
async with lock, gate:
# ...
response = await self._process_message(msg, ...)

这保证了同一个用户的同一个会话是串行处理的,而不同用户的会话可以并发处理。随后消息进入 _process_message 方法。

3. 意图解析与工具选择:LLM Function Calling

_process_message 中,Nanobot 并没有硬编码写死 if "子智能体" in text 的判断逻辑。相反,它构建了上下文后,直接将消息丢给了大语言模型(LLM)。

3.1 主智能体的 Prompt 构造

在每次请求大模型前,Nanobot 都会通过 ContextBuilder.build_messages()(位于 context.py)来组装发给 LLM 的上下文,其中包括:

  1. System Prompt:由 build_system_prompt() 动态生成,包含机器人的人设(identity.md)、用户的自定义规则(AGENTS.md, SOUL.md 等)、以及当前已加载的 Skills 列表。
  2. Runtime Context:通过 _build_runtime_context() 注入当前系统时间、渠道来源等元数据。
  3. 对话历史:从数据库加载该 Session 的上下文记录。
  4. 当前输入:即用户输入的“请使用子智能体调研一下hermes agent”。

3.2 发现工具:SpawnTool Schema

大模型之所以“知道”它可以使用子智能体,是因为主智能体在请求时携带了所有可用工具的 JSON Schema 定义。

主智能体在初始化时,将 SpawnTool 注册进了工具集(见 loop.py)。在与 LLM 交互时(runner.py_request_model 方法),Nanobot 会通过 self.tools.get_definitions() 获取工具定义。

SpawnTool 的描述位于 spawn.py。最终转换为 OpenAI 标准的 Function Schema 发送给大模型,其结构大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"type": "function",
"function": {
"name": "spawn",
"description": "Spawn a subagent to handle a task in the background. Use this for complex or time-consuming tasks that can run independently...",
"parameters": {
"type": "object",
"properties": {
"task": {"type": "string", "description": "The task for the subagent to complete"},
"label": {"type": "string", "description": "Optional short label for the task (for display)"}
},
"required": ["task"]
}
}
}

由于用户的输入明确要求“请使用子智能体…”,LLM 读取到 spawn 工具的 description 匹配该意图,于是决定触发 Function Calling,输出类似 {"name": "spawn", "arguments": {"task": "调研一下hermes agent"}} 的指令。

4. 派发任务:SpawnToolSubagentManager

AgentRunner 接收到 LLM 返回的 Tool Call 请求后,会执行对应的工具代码。对于 spawn 工具,其执行体 spawn.py 仅仅是一个简单的转发层:

1
2
3
4
5
6
7
8
async def execute(self, task: str, label: str | None = None, **kwargs: Any) -> str:
return await self._manager.spawn(
task=task,
label=label,
origin_channel=self._origin_channel,
origin_chat_id=self._origin_chat_id,
session_key=self._session_key,
)

SubagentManager.spawn 方法(subagent.py)接管后,会做两件事:

  1. 启动后台任务:通过 asyncio.create_task(self._run_subagent(...)) 将耗时的调研任务丢到后台执行。
  2. 立即回复主模型:返回类似 "Subagent [...] started (id: ...). I'll notify you when it completes." 的字符串。这使得主智能体能立刻给用户一个响应,告知任务已在后台开启,而不必让用户在此处干等。

5. 子智能体执行:专用的运行沙盒

后台的 _run_subagent 任务(subagent.py)代表了子智能体的实际运行过程。它的设计有几个关键特点:

  • 隔离的工具集:它创建了一个全新的 ToolRegistry,包含了文件读写、Grep/Glob 检索,甚至 Web 搜索工具。但刻意去掉了发消息给用户的工具和再次生成子智能体的 spawn 工具,以防无限递归。
  • 独立的 System Prompt:在调用 LLM 之前,系统通过 self._build_subagent_prompt() 方法为子智能体构造了专属的人设。在 subagent_system.md 模板中,系统提示词明确规定:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # Subagent

    {{ time_ctx }}

    You are a subagent spawned by the main agent to complete a specific task.
    Stay focused on the assigned task. Your final response will be reported back to the main agent.

    ## Workspace
    {{ workspace }}

    这段提示词极大地压缩了主智能体复杂的闲聊属性,使得子智能体仅仅作为一个纯粹的“打工人”,去执行传入的 task 参数内容(即 {"role": "user", "content": "调研一下hermes agent"})。

  • 有限的迭代:它使用独立的 AgentRunner 跑一个上限为 15 轮的小迭代,防止子智能体在遇到死胡同时无限制地消耗 Tokens。

在这个环节,子智能体带着上述提示词,会在代码库里检索 hermes agent,收集到相关结论后生成内部的总结报告。

6. 结果回注:巧妙的 system 消息通道

子智能体执行完毕后,并不能直接调用 API 发消息给用户。为了将结果带回给主智能体,它调用了 _announce_result 方法(subagent.py)。

这里使用了非常巧妙的事件驱动设计:将结果伪装成一条来自 system 渠道的 InboundMessage 重新发布到消息总线上

1
2
3
4
5
6
7
msg = InboundMessage(
channel="system",
sender_id="subagent",
chat_id=f"{origin['channel']}:{origin['chat_id']}",
content=announce_content,
)
await self.bus.publish_inbound(msg)

announce_content 的渲染模板 subagent_announce.md 中还包含了一条隐藏指令:“Summarize this naturally for the user. Keep it brief… Do not mention technical details like ‘subagent’ or task IDs.”

7. 二次转述:主智能体的最终回应

这条带有系统通知的 InboundMessage 再次进入了主循环的 _process_message

loop.pysystem 消息专用分支中,系统会从 chat_id 中解析出原始的 CLI 会话。由于消息的发送者是 subagent,系统会将其标记为 current_role = "assistant",代表这是助手自身完成的后台任务结果:

1
2
3
current_role = "assistant" if msg.sender_id == "subagent" else "user"
messages = self.context.build_messages(..., current_role=current_role)
final_content, _, all_msgs = await self._run_agent_loop(messages, ...)

带着之前历史上下文和这份子智能体的详细报告,主智能体(LLM)再次被唤醒。根据模板中的隐藏指令,LLM 会将繁杂的技术报告转化为对用户友好的自然语言摘要,最终封装为 OutboundMessage 返回给 CLI 渠道展示在屏幕上。

总结

Nanobot 对“使用子智能体”这一请求的处理,优雅地展示了现代 Agent 框架的解耦美学:

  1. 渠道层:负责标准化 InboundMessage
  2. 意图层:不靠正则匹配,全权交由 LLM Function Calling 决策。
  3. 执行层:后台异步调度,主从沙盒隔离。
  4. 闭环层:利用事件总线回注系统消息,触发主智能体二次总结。

这种基于事件总线和工具抽象的机制,赋予了系统极高的扩展性。