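What will AI search look like? Baidu's new AI search paradigm (3)
Author: WeChat article "Plan strategies inside the command tent and win battles a thousand li away. Know all things and let the Way prevail; adapt to a thousand changes and stay whole."
Last time, we saw that AI search works better because its technical paradigm has made a major leap: the architecture has evolved from "retrieval" to "reasoning". AI search is now an intelligent system that can plan, decompose, collaborate, and reflect, and it can act like a real assistant that completes complex information-processing and problem-solving tasks for the user.
Starting with this installment, we look in detail at the agents described in the paper and at what they teach us. Even if you are not building AI search, these lessons apply to any agent system.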
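[Figure: Overview of the AI search paradigm]
Master Agent
The Master Agent on the far left of the figure is the "brain" and "commander-in-chief" of the whole AI search system, and its design largely determines whether the system runs efficiently. Below we look in detail at its design principles, its collaboration mechanisms, and the challenges of implementing it. To make the logic clear, I use some pseudocode. Note that this code does not appear in the original paper and is for illustration only.
Overview of responsibilities
Section 2 of the paper describes four core responsibilities of the Master Agent: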
```python
"""Note: all code in this article is illustrative only and is not the
actual implementation from the original paper."""


class MasterAgent:
    """
    The four core responsibilities of the Master Agent:
    1. Query complexity analysis
    2. Dynamic team configuration
    3. Execution monitoring
    4. Reflection & re-planning
    """

    def __init__(self):
        self.complexity_analyzer = ComplexityAnalyzer()
        self.team_configurator = TeamConfigurator()
        self.execution_monitor = ExecutionMonitor()
        self.reflection_engine = ReflectionEngine()
```

Responsibility 1: Query complexity analysis
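The Master must first judge the intrinsic complexity of the user's query accurately, because the whole system chooses its processing strategy on that basis.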
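The pseudocode that follows relies on an LLM and several undefined helpers, so the scoring step cannot be run as-is. Here is a minimal runnable sketch of just that step, assuming the static features and semantic signals have already been extracted and normalized to [0, 1]. The weights, the 40/60 blend, and the 0.3 / 0.7 thresholds are copied from the article's pseudocode; dividing by the sum of the weights (so the score stays in [0, 1]) is an adjustment made for this sketch, and `classify` is a hypothetical name.

```python
from enum import Enum


class ComplexityLevel(Enum):
    SIMPLE = 1
    MODERATE = 2
    COMPLEX = 3


# Weights taken from the article's pseudocode
STATIC_WEIGHTS = {
    "query_length": 0.1, "question_words": 0.2, "entity_count": 0.15,
    "comparison_indicators": 0.3, "calculation_requirements": 0.4,
    "multi_step_indicators": 0.5,
}
SEMANTIC_WEIGHTS = {
    "multi_step_reasoning": 0.4, "external_info_needed": 0.3,
    "tool_usage_required": 0.3, "comparison_calculation": 0.5,
}


def weighted_average(features: dict, weights: dict) -> float:
    """Weighted mean of features clipped to [0, 1]; missing features count as 0."""
    total = sum(w * min(max(float(features.get(k, 0.0)), 0.0), 1.0)
                for k, w in weights.items())
    return total / sum(weights.values())


def classify(static_features: dict, semantic_signals: dict) -> ComplexityLevel:
    """Blend static (40%) and semantic (60%) scores, then apply the thresholds."""
    score = (0.4 * weighted_average(static_features, STATIC_WEIGHTS)
             + 0.6 * weighted_average(semantic_signals, SEMANTIC_WEIGHTS))
    if score <= 0.3:
        return ComplexityLevel.SIMPLE
    if score <= 0.7:
        return ComplexityLevel.MODERATE
    return ComplexityLevel.COMPLEX
```

For example, a query with no signals at all maps to `SIMPLE`, while a query whose semantic signals are all set but whose surface features are all zero scores 0.6 and lands in `MODERATE`. Giving the semantic view the larger weight means a short query that nonetheless needs multi-step reasoning can still escalate.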
```python
from __future__ import annotations

from typing import Dict


class ComplexityAnalyzer:
    """Complexity analyzer: one of the Master's core components."""

    # Weights from the original pseudocode. Dividing by the weight sum
    # (rather than by the feature count) keeps each partial score in [0, 1].
    STATIC_WEIGHTS = {
        "query_length": 0.1,
        "question_words": 0.2,
        "entity_count": 0.15,
        "comparison_indicators": 0.3,
        "calculation_requirements": 0.4,
        "multi_step_indicators": 0.5,
    }
    SEMANTIC_WEIGHTS = {
        "multi_step_reasoning": 0.4,
        "external_info_needed": 0.3,
        "tool_usage_required": 0.3,
        "comparison_calculation": 0.5,
    }

    def __init__(self):
        self.classifier_llm = DeepSeekR1Model()  # a strong LLM used as the classifier
        self.complexity_features = ComplexityFeatureExtractor()

    def analyze_complexity(self, query: str) -> ComplexityLevel:
        """Multi-dimensional complexity analysis."""
        # 1. Extract static features
        static_features = self.extract_static_features(query)
        # 2. LLM-driven semantic analysis
        semantic_analysis = self.semantic_complexity_analysis(query)
        # 3. Combine both into a single judgment
        complexity_score = self.compute_complexity_score(static_features, semantic_analysis)
        return self.score_to_level(complexity_score)

    def extract_static_features(self, query: str) -> Dict:
        """Extract static complexity features."""
        return {
            # Hypothetical normalization: character count capped at 50, which also
            # works for Chinese text, where split() finds no spaces
            "query_length": min(len(query) / 50, 1.0),
            "question_words": self.count_question_words(query),
            "entity_count": self.count_named_entities(query),
            "comparison_indicators": self.detect_comparison_patterns(query),
            "calculation_requirements": self.detect_calculation_needs(query),
            "multi_step_indicators": self.detect_multi_step_patterns(query),
        }

    def semantic_complexity_analysis(self, query: str) -> Dict:
        """Use the LLM to analyze semantic complexity."""
        analysis_prompt = f"""
Analyze the complexity of the following query along these dimensions:
1. Does it require multi-step reasoning?
2. Does it require external information?
3. Does it require tool calls?
4. Does it require comparing or computing over information?

Query: "{query}"

Return the analysis as JSON.
"""
        response = self.classifier_llm.generate(analysis_prompt)
        return self.parse_semantic_analysis(response)

    def compute_complexity_score(self, static_features: Dict, semantic_analysis: Dict) -> float:
        """Combine both views into a complexity score between 0 and 1."""
        def weighted_mean(features: Dict, weights: Dict) -> float:
            # Clip each feature to [0, 1] so raw counts cannot dominate the score
            total = sum(w * min(float(features.get(name, 0)), 1.0)
                        for name, w in weights.items())
            return total / sum(weights.values())

        static_score = weighted_mean(static_features, self.STATIC_WEIGHTS)
        semantic_score = weighted_mean(semantic_analysis, self.SEMANTIC_WEIGHTS)
        # Weighted average: the semantic analysis counts for 60%
        return 0.4 * static_score + 0.6 * semantic_score

    def score_to_level(self, score: float) -> ComplexityLevel:
        """Map the score to a complexity level."""
        if score <= 0.3:
            return ComplexityLevel.SIMPLE
        elif score <= 0.7:
            return ComplexityLevel.MODERATE
        else:
            return ComplexityLevel.COMPLEX
```

Responsibility 2: Dynamic team configuration
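Based on the complexity analysis, the Master dynamically decides which agents and resources to activate. A simple query may need only the Writer, or the Executor plus the Writer; a complex query also needs the Planner. The paper calls these the Writer-Only Configuration, the Executor-Inclusive Configuration, and the Planner-Enhanced Configuration.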
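The mapping from complexity level to team can be sketched as a small, runnable function. This is a minimal sketch, not the paper's code: the `TeamConfig` fields mirror the article's pseudocode (minus the cost and latency estimates), `configure_team` takes a pre-ranked list of candidate tools instead of running COLT, and the tool names in the usage example are hypothetical.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List


class ComplexityLevel(Enum):
    SIMPLE = 1
    MODERATE = 2
    COMPLEX = 3


@dataclass
class TeamConfig:
    agents: List[str]
    strategy: str
    max_iterations: int
    tools: List[str] = field(default_factory=list)
    parallel_execution: bool = False


def configure_team(complexity: ComplexityLevel, candidate_tools: List[str]) -> TeamConfig:
    """Pick one of the three configurations; candidate_tools is assumed pre-ranked."""
    if complexity is ComplexityLevel.SIMPLE:
        # Writer-Only: answer from the LLM's internal knowledge, no tools
        return TeamConfig(agents=["writer"], strategy="direct_answer", max_iterations=1)
    if complexity is ComplexityLevel.MODERATE:
        # Executor-Inclusive: one retrieval step with at most three tools
        return TeamConfig(agents=["executor", "writer"], strategy="single_step_retrieval",
                          max_iterations=2, tools=candidate_tools[:3])
    # Planner-Enhanced: a multi-step plan whose independent tasks may run in parallel
    return TeamConfig(agents=["planner", "executor", "writer"], strategy="multi_step_planning",
                      max_iterations=5, tools=list(candidate_tools), parallel_execution=True)
```

With `["web_search", "calculator", "weather", "stock"]` as candidates, a `MODERATE` query gets only the first three tools, while a `COMPLEX` query gets all four plus the Planner. The point of the design is that cheap queries never pay for planning.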
```python
from __future__ import annotations

from typing import List


class TeamConfigurator:
    """Team configurator: implements the three dynamic configurations from the paper."""

    def configure_team(self, query: str, complexity: ComplexityLevel) -> TeamConfig:
        """Pick the team configuration that matches the query's complexity."""
        if complexity == ComplexityLevel.SIMPLE:
            return self.writer_only_config(query)
        elif complexity == ComplexityLevel.MODERATE:
            return self.executor_inclusive_config(query)
        else:
            return self.planner_enhanced_config(query)

    def writer_only_config(self, query: str) -> TeamConfig:
        """
        Configuration 1: Writer-Only.
        For simple factual queries that the LLM's internal knowledge can answer.
        """
        return TeamConfig(
            agents=["writer"],
            strategy="direct_answer",
            max_iterations=1,
            tools=[],
            estimated_cost=0.01,    # lowest cost
            estimated_latency=0.5,  # fastest response
        )

    def executor_inclusive_config(self, query: str) -> TeamConfig:
        """
        Configuration 2: Executor-Inclusive.
        For queries that need external information but no complex reasoning.
        """
        relevant_tools = self.select_relevant_tools(query)
        return TeamConfig(
            agents=["executor", "writer"],
            strategy="single_step_retrieval",
            max_iterations=2,
            tools=relevant_tools,
            estimated_cost=0.05,
            estimated_latency=2.0,
        )

    def planner_enhanced_config(self, query: str) -> TeamConfig:
        """
        Configuration 3: Planner-Enhanced.
        For complex queries that need multi-step reasoning and planning.
        """
        tool_ecosystem = self.analyze_required_tool_ecosystem(query)
        return TeamConfig(
            agents=["planner", "executor", "writer"],
            strategy="multi_step_planning",
            max_iterations=5,
            tools=tool_ecosystem,
            estimated_cost=0.20,
            estimated_latency=10.0,
            parallel_execution=True,  # enable parallel execution
        )

    def select_relevant_tools(self, query: str) -> List:
        """Select relevant tools for a moderately complex query."""
        # Select tools with the COLT method (semantic matching)
        semantic_match = self.semantic_tool_matching(query)
        return semantic_match[:3]  # cap the number of tools to stay lightweight

    def analyze_required_tool_ecosystem(self, query: str) -> List:
        """Work out the full set of tools a complex query needs."""
        # Use COLT's collaborative dimension
        return self.collaborative_tool_analysis(query)
```

Responsibility 3: Execution monitoring
The Master monitors the entire execution process in real time so that it can catch problems as soon as they occur.
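Failure detection, the core of the monitor below, can be tried on its own. This minimal sketch assumes a simplified, hypothetical `ExecutionContext` that records failed tool names, a time budget, and per-task outputs; an empty output stands in for the article's "incomplete result" check. The `now` parameter is an addition so the timeout can be tested deterministically.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Failure:
    type: str         # "tool_error", "timeout", or "incomplete_result"
    component: str    # the tool, agent, or task where it happened
    description: str


@dataclass
class ExecutionContext:
    start_time: float                 # taken from time.monotonic()
    max_allowed_time: float           # time budget in seconds
    tool_failures: List[str] = field(default_factory=list)           # tools that errored
    results: Dict[str, Optional[str]] = field(default_factory=dict)  # task_id -> output


def detect_failures(ctx: ExecutionContext, now: Optional[float] = None) -> List[Failure]:
    """Collect tool errors, a timeout, and empty intermediate results, in that order."""
    now = time.monotonic() if now is None else now
    failures = [Failure("tool_error", tool, f"{tool} call failed") for tool in ctx.tool_failures]
    if now - ctx.start_time > ctx.max_allowed_time:
        failures.append(Failure("timeout", "executor", "Execution exceeded time limit"))
    for task_id, output in ctx.results.items():
        if not output:  # None or "" counts as an incomplete result
            failures.append(Failure("incomplete_result", task_id, "empty intermediate result"))
    return failures
```

Using a monotonic clock avoids false timeouts when the wall clock is adjusted mid-run; the returned list is what a reflection step would consume.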
```python
from __future__ import annotations

from typing import List


class ExecutionMonitor:
    """Execution monitor: the Master's real-time monitoring system."""

    def __init__(self):
        self.execution_state = ExecutionState()
        self.failure_detector = FailureDetector()
        self.performance_tracker = PerformanceTracker()

    def monitor_execution(self, team_config: TeamConfig,
                          execution_context: ExecutionContext) -> MonitorResult:
        """Monitor the execution process in real time."""
        monitor_result = MonitorResult()
        # 1. Track the execution state
        monitor_result.current_state = self.track_execution_state(execution_context)
        # 2. Detect failures
        monitor_result.failures = self.detect_failures(execution_context)
        # 3. Track performance
        monitor_result.performance = self.track_performance(execution_context)
        # 4. Assess the quality of intermediate results
        monitor_result.quality_issues = self.assess_intermediate_quality(execution_context)
        return monitor_result

    def detect_failures(self, context: ExecutionContext) -> List:
        """Detect failures along several dimensions."""
        failures = []

        # Failed tool calls
        if context.tool_failures:
            failures.extend(self.analyze_tool_failures(context.tool_failures))

        # Timeout
        if context.execution_time > context.max_allowed_time:
            failures.append(Failure(
                type="timeout",
                component="executor",
                description="Execution exceeded time limit",
            ))

        # Abnormal result quality
        if context.intermediate_results:
            failures.extend(self.detect_quality_failures(context.intermediate_results))

        return failures

    def assess_intermediate_quality(self, context: ExecutionContext) -> List:
        """Assess the quality of intermediate results."""
        issues = []
        for result in context.intermediate_results:
            # Is the result complete?
            if not self.is_result_complete(result):
                issues.append(QualityIssue(
                    type="incomplete_result",
                    severity="medium",
                    affected_task=result.task_id,
                ))
            # Is the result consistent with earlier results?
            if not self.is_result_consistent(result, context.previous_results):
                issues.append(QualityIssue(
                    type="inconsistent_result",
                    severity="high",
                    affected_task=result.task_id,
                ))
        return issues
```

Responsibility 4: Reflection and re-planning
This is the Master's most complex and most critical capability.
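In the pseudocode below, an LLM analyzes the failures and chooses one of four actions: abort, retry, replan, or continue. To show the shape of that decision without an LLM, here is a rule-based stand-in. The policy is entirely hypothetical: transient failure types (`TRANSIENT`) get one plain retry, anything else or a repeated failure triggers re-planning, and exhausting the attempt budget aborts.

```python
from typing import List

# Hypothetical: failure types that are plausibly fixed by simply trying again
TRANSIENT = {"timeout", "tool_error"}


def decide(failure_types: List[str], attempt: int, max_attempts: int = 3) -> str:
    """Rule-based stand-in for the LLM-driven reflection decision."""
    if not failure_types:
        return "continue"   # nothing went wrong
    if attempt >= max_attempts:
        return "abort"      # attempt budget exhausted
    if attempt == 0 and all(t in TRANSIENT for t in failure_types):
        return "retry"      # first transient failure: retry unchanged
    return "replan"         # structural or repeated failure: change the plan
```

A rule table like this is cheap and predictable; the article's design instead delegates the judgment to an LLM, which can reason about root causes the rules do not anticipate, at the cost of an extra model call.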
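```python
from __future__ import annotations

import json
from typing import List, Optional


class ReflectionEngine:
    """Reflection engine: the Master's higher-level cognitive capability."""

    def __init__(self):
        self.reflection_llm = DeepSeekR1Model()
        self.replanning_strategies = ReplanningStrategies()

    def reflect_and_replan(self, monitor_result: MonitorResult, original_query: str,
                           current_plan: Optional[DAG]) -> ReflectionResult:
        """Reflect on the monitoring result and re-plan if needed."""
        # 1. Analyze the failures
        failure_analysis = self.analyze_failures(monitor_result.failures)

        # 2. Decide how to react
        reflection_decision = self.make_reflection_decision(failure_analysis, monitor_result)

        # 3. Apply the chosen strategy
        if reflection_decision.action == "abort":
            return self.create_abort_result(reflection_decision.reason)
        elif reflection_decision.action == "retry":
            return self.create_retry_result(reflection_decision.retry_strategy)
        elif reflection_decision.action == "replan":
            return self.execute_replanning(original_query, current_plan, failure_analysis)
        else:
            return self.create_continue_result()

    def analyze_failures(self, failures: List) -> FailureAnalysis:
        """In-depth failure analysis."""
        # default=str lets json serialize failure objects it cannot handle natively
        failures_json = json.dumps(failures, default=str, indent=2)
        analysis_prompt = f"""
Analyze the following execution failures and determine the root cause and possible fixes:

Failures: {failures_json}

Please analyze:
1. What is the root cause?
2. Can it be fixed by retrying?
3. Does the plan need to change?
4. Does a tool need to be replaced?

Return the analysis as JSON.
"""
        response = self.reflection_llm.generate(analysis_prompt)
        return FailureAnalysis.from_json(response)

    def execute_replanning(self, original_query: str, current_plan: Optional[DAG],
                           failure_analysis: FailureAnalysis) -> ReflectionResult:
        """Re-plan based on the failures."""
        # current_plan may be None (e.g. the failure happened before any plan existed)
        plan_json = current_plan.to_json() if current_plan is not None else "(none)"
        replanning_prompt = f"""
Original query: {original_query}

Current plan: {plan_json}

Failure analysis: {failure_analysis.to_json()}

Re-plan based on these failures:
1. Keep the parts that succeeded
2. Fix the parts that failed
3. Consider alternative tools and strategies
4. Make the new plan more robust

Return a new DAG plan.
"""
        new_plan_response = self.reflection_llm.generate(replanning_prompt)
        new_plan = DAG.from_json(new_plan_response)
        return ReflectionResult(
            action="replan",
            new_plan=new_plan,
            reasoning=failure_analysis.recommended_changes,
        )
```

How the Master collaborates with the other agents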
Implementing the complete collaboration flow
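In the Planner-Enhanced flow that follows, the Executor repeatedly takes the tasks whose dependencies are already satisfied (`get_ready_tasks()`) and runs them as one batch, which is where `parallel_execution=True` pays off. A minimal runnable sketch of that wave-by-wave DAG execution with `asyncio.gather` is below; `run_dag` and the `deps` dictionary format (task id mapped to the ids it depends on) are assumptions for illustration, not the paper's interface.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def run_dag(deps: Dict[str, List[str]],
                  run_task: Callable[[str, Dict[str, str]], Awaitable[str]]) -> Dict[str, str]:
    """Execute a task DAG in waves: all tasks whose dependencies have finished
    run concurrently, and their outputs become visible to the next wave."""
    results: Dict[str, str] = {}
    while len(results) < len(deps):
        ready = [task for task, parents in deps.items()
                 if task not in results and all(p in results for p in parents)]
        if not ready:
            # No task can make progress: a cycle or a dependency on a missing task
            raise ValueError("dependency cycle or missing task in plan")
        outputs = await asyncio.gather(*(run_task(task, results) for task in ready))
        results.update(zip(ready, outputs))
    return results
```

For a query such as "compare today's weather in Beijing and Shanghai", a plan like `{"a": [], "b": [], "c": ["a", "b"]}` runs the two lookups `a` and `b` concurrently in the first wave and the comparison `c` alone in the second. Raising on a stalled wave, rather than looping forever, is the natural hook for handing control to the reflection step.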
```python
from __future__ import annotations

import time


class MasterAgent:
    """Full implementation of the Master Agent."""

    def __init__(self):
        self.complexity_analyzer = ComplexityAnalyzer()
        self.team_configurator = TeamConfigurator()
        self.execution_monitor = ExecutionMonitor()
        self.reflection_engine = ReflectionEngine()

        # Downstream agent instances
        self.planner = PlannerAgent()
        self.executor = ExecutorAgent()
        self.writer = WriterAgent()

    async def process_query(self, query: str) -> SearchResult:
        """The Master's main processing flow."""
        # Stage 1: complexity analysis and team configuration
        complexity = self.complexity_analyzer.analyze_complexity(query)
        team_config = self.team_configurator.configure_team(query, complexity)

        # Stage 2: execute under monitoring, with a bounded number of attempts
        max_attempts = 3
        attempt = 0
        while attempt < max_attempts:
            try:
                # Dispatch on the configured strategy
                if team_config.strategy == "direct_answer":
                    result = await self.execute_writer_only(query, team_config)
                elif team_config.strategy == "single_step_retrieval":
                    result = await self.execute_executor_inclusive(query, team_config)
                else:
                    result = await self.execute_planner_enhanced(query, team_config)
                # Return immediately on success
                return result
            except ExecutionException as e:
                # Stage 3: failure handling and reflection
                monitor_result = MonitorResult(failures=[e])  # record the caught exception as the failure
                reflection_result = self.reflection_engine.reflect_and_replan(
                    monitor_result, query, getattr(e, "current_plan", None)
                )
                if reflection_result.action == "abort":
                    return SearchResult.create_error(reflection_result.reasoning)
                if reflection_result.action == "replan":
                    # Update the team configuration before the next attempt
                    team_config = self.update_team_config(team_config, reflection_result)
                attempt += 1

        # Maximum number of attempts exceeded
        return SearchResult.create_error("Maximum retry attempts exceeded")

    async def execute_planner_enhanced(self, query: str, team_config: TeamConfig) -> SearchResult:
        """Full flow for the Planner-Enhanced configuration."""
        # 1. The Planner produces a plan
        plan = await self.planner.create_plan(query, team_config.tools)

        # 2. Create the execution context
        execution_context = ExecutionContext(
            query=query, plan=plan, team_config=team_config, start_time=time.time()
        )

        # 3. Monitored execution loop
        while not execution_context.is_complete():
            # The Executor runs the next batch of ready tasks
            batch_results = await self.executor.execute_batch(execution_context.get_ready_tasks())
            # Update the execution context
            execution_context.update_results(batch_results)
            # Real-time monitoring
            monitor_result = self.execution_monitor.monitor_execution(team_config, execution_context)
            # Check whether intervention is needed
            if monitor_result.needs_intervention():
                reflection_result = self.reflection_engine.reflect_and_replan(
                    monitor_result, query, plan
                )
                if reflection_result.action == "replan":
                    # Local re-planning
                    plan = self.update_plan_with_reflection(plan, reflection_result)
                    execution_context.update_plan(plan)
                elif reflection_result.action == "abort":
                    # Hand control back to process_query's failure handling
                    raise ExecutionException(reflection_result.reasoning)

        # 4. The Writer synthesizes the final answer
        final_result = await self.writer.synthesize_answer(
            query, execution_context.get_all_results()
        )
        return final_result
```

Summary
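With the design above, the Master Agent provides:
- Intelligent decision-making: complexity analysis and dynamic resource configuration
- Real-time monitoring: tracking of the execution process and detection of problems
- Adaptive adjustment: reflection-driven decisions and re-planning
- Coordinated command: collaboration with the Planner, Executor, and Writer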
To find out how the Planner Agent works, tune in next time.
Hi everyone, I'm Kafka, founder and Chief Service Officer of 自在哪吒. Let's evolve together.