多客科技 发表于 2025-6-26 23:39

AI 搜索会是什么样?——百度 AI 搜索新范式(3)

作者:微信文章

“运筹帷幄之中,决胜千里之外。智周万物而道济,变应千机自圆融。”

上回书说到,AI 搜索之所以效果更好,是因为技术范式发生了巨大飞跃,架构从“检索”演变到了“推理”。AI 搜索已经是一个具备规划、分解、协作、反思能力的智能化系统,能够真正像一个助理一样,为用户完成复杂的信息处理和问题解决任务。

本讲开始,具体说说论文里提到的这几个 Agent,以及给我们的启发,即便我们不做 AI 搜索,只要是 Agent,都会有些价值。


AI 搜索范式概览Master Agent

图中最左边的 Master Agent 是整个 AI 搜索系统的"大脑中枢"和"总指挥",它的设计是整个系统能否高效运转的关键。我们详细看下 Master Agent 的设计原理、协同机制和实现难点等。为清晰表达逻辑,用了一些伪代码,注意原论文不包括以下代码,此处仅用于示例。
职责概览

论文第 2 节中讲了 Master Agent 的四大核心职责:
"""注:本文所有代码仅为示意,并非原论文的真实实现。
"""
class MasterAgent:    """    Master Agent 的四大核心职责:    1. 查询复杂度分析 (Query Complexity Analysis)    2. 动态组队决策 (Dynamic Team Configuration)   3. 执行监控 (Execution Monitoring)    4. 反思与重规划 (Reflection & Re-planning)    """
    def __init__(self):      self.complexity_analyzer = ComplexityAnalyzer()      self.team_configurator = TeamConfigurator()      self.execution_monitor = ExecutionMonitor()      self.reflection_engine = ReflectionEngine()职责1: 查询复杂度分析

Master 首先需要准确判断用户查询的内在复杂度,这是整个系统选择处理策略的基础。
class ComplexityAnalyzer:    """    复杂度分析器 - Master的核心组件之一    """
    def __init__(self):      self.classifier_llm = DeepSeekR1Model()# 强大的分类器 LLM      self.complexity_features = ComplexityFeatureExtractor()
    def analyze_complexity(self, query: str) -> ComplexityLevel:      """      多维度复杂度分析      """      # 1. 静态特征提取      static_features = self.extract_static_features(query)
      # 2. LLM 驱动的语义分析          semantic_analysis = self.semantic_complexity_analysis(query)
      # 3. 综合判断      complexity_score = self.compute_complexity_score(            static_features, semantic_analysis      )
      return self.score_to_level(complexity_score)
    def extract_static_features(self, query: str) -> Dict:      """      提取静态复杂度特征      """      return {            "query_length": len(query.split()),            "question_words": self.count_question_words(query),            "entity_count": self.count_named_entities(query),            "comparison_indicators": self.detect_comparison_patterns(query),            "calculation_requirements": self.detect_calculation_needs(query),            "multi_step_indicators": self.detect_multi_step_patterns(query)      }
    def semantic_complexity_analysis(self, query: str) -> Dict:      """      使用 LLM 进行语义复杂度分析      """      analysis_prompt = f"""      分析以下查询的复杂度,考虑以下维度:      1. 是否需要多步推理?      2. 是否需要外部信息?      3. 是否需要工具调用?      4. 是否需要信息对比或计算?
      查询: "{query}"
      请返回 JSON 格式的分析结果。      """
      response = self.classifier_llm.generate(analysis_prompt)      return self.parse_semantic_analysis(response)
    def compute_complexity_score(self, static_features: Dict, semantic_analysis: Dict) -> float:      """      综合计算复杂度分数 (0-1 之间)      """      # 基于静态特征的分数      static_score = (            static_features["query_length"] * 0.1 +            static_features["question_words"] * 0.2 +            static_features["entity_count"] * 0.15 +            static_features["comparison_indicators"] * 0.3 +            static_features["calculation_requirements"] * 0.4 +            static_features["multi_step_indicators"] * 0.5      ) / 6
      # 基于语义分析的分数      semantic_score = (            semantic_analysis.get("multi_step_reasoning", 0) * 0.4 +            semantic_analysis.get("external_info_needed", 0) * 0.3 +            semantic_analysis.get("tool_usage_required", 0) * 0.3 +            semantic_analysis.get("comparison_calculation", 0) * 0.5      ) / 4
      # 加权平均      final_score = 0.4 * static_score + 0.6 * semantic_score      return min(final_score, 1.0)
    def score_to_level(self, score: float) -> ComplexityLevel:      """      分数转换为复杂度等级      """      if score <= 0.3:            return ComplexityLevel.SIMPLE      elif score <= 0.7:            return ComplexityLevel.MODERATE          else:            return ComplexityLevel.COMPLEX职责2: 动态组队决策

Master 根据复杂度分析结果,动态决定启用哪些智能体和资源。简单查询可能只需要 Writer 或者 Executor 和 Writer 就可以。复杂的查询,还需要包括 Planner。论文中称为 Writer-Only Configuration、Executor-Inclusive Configuration 和 Planner-Enhanced Configuration。
class TeamConfigurator:    """    团队配置器 - 实现论文中的三种动态配置    """
    def configure_team(self, query: str, complexity: ComplexityLevel) -> TeamConfig:      """      根据复杂度选择合适的团队配置      """      if complexity == ComplexityLevel.SIMPLE:            return self.writer_only_config(query)      elif complexity == ComplexityLevel.MODERATE:            return self.executor_inclusive_config(query)          else:            return self.planner_enhanced_config(query)
    def writer_only_config(self, query: str) -> TeamConfig:      """      配置1: Writer-Only      适用于: 简单事实查询,LLM内部知识足够      """      return TeamConfig(            agents=["writer"],            strategy="direct_answer",            max_iterations=1,            tools=[],            estimated_cost=0.01,# 最低成本            estimated_latency=0.5# 最快响应      )
    def executor_inclusive_config(self, query: str) -> TeamConfig:      """      配置2: Executor-Inclusive          适用于: 需要外部信息但不需要复杂推理      """      relevant_tools = self.select_relevant_tools(query)
      return TeamConfig(            agents=["executor", "writer"],            strategy="single_step_retrieval",            max_iterations=2,            tools=relevant_tools,            estimated_cost=0.05,            estimated_latency=2.0      )
    def planner_enhanced_config(self, query: str) -> TeamConfig:      """      配置3: Planner-Enhanced      适用于: 复杂查询,需要多步推理和规划      """      tool_ecosystem = self.analyze_required_tool_ecosystem(query)
      return TeamConfig(            agents=["planner", "executor", "writer"],            strategy="multi_step_planning",            max_iterations=5,            tools=tool_ecosystem,            estimated_cost=0.20,            estimated_latency=10.0,            parallel_execution=True# 启用并行执行      )
    def select_relevant_tools(self, query: str) -> List:      """      为中等复杂度查询选择相关工具      """      # 使用COLT方法选择工具      semantic_match = self.semantic_tool_matching(query)      return semantic_match[:3]# 限制工具数量,保持轻量
    def analyze_required_tool_ecosystem(self, query: str) -> List:      """      为复杂查询分析所需的完整工具生态      """      # 使用COLT的协作维度分析      tool_ecosystem = self.collaborative_tool_analysis(query)      return tool_ecosystem职责3: 执行监控

Master 需要实时监控整个执行过程,及时发现问题。
class ExecutionMonitor:    """    执行监控器 - Master 的实时监控系统    """
    def __init__(self):      self.execution_state = ExecutionState()      self.failure_detector = FailureDetector()      self.performance_tracker = PerformanceTracker()
    def monitor_execution(self, team_config: TeamConfig,                        execution_context: ExecutionContext) -> MonitorResult:      """      实时监控执行过程      """      monitor_result = MonitorResult()
      # 1. 状态跟踪      current_state = self.track_execution_state(execution_context)      monitor_result.current_state = current_state
      # 2. 失败检测      failures = self.detect_failures(execution_context)      monitor_result.failures = failures
      # 3. 性能监控      performance = self.track_performance(execution_context)      monitor_result.performance = performance
      # 4. 质量评估      quality_issues = self.assess_intermediate_quality(execution_context)      monitor_result.quality_issues = quality_issues
      return monitor_result
    def detect_failures(self, context: ExecutionContext) -> List:      """      多维度失败检测      """      failures = []
      # 工具调用失败      if context.tool_failures:            failures.extend(self.analyze_tool_failures(context.tool_failures))
      # 超时检测      if context.execution_time > context.max_allowed_time:            failures.append(Failure(                type="timeout",                component="executor",               description="Execution exceeded time limit"            ))
      # 质量异常检测      if context.intermediate_results:            quality_failures = self.detect_quality_failures(                context.intermediate_results            )            failures.extend(quality_failures)
      return failures
    def assess_intermediate_quality(self, context: ExecutionContext) -> List:      """      中间结果质量评估      """      issues = []
      for result in context.intermediate_results:            # 检查结果完整性            if not self.is_result_complete(result):                issues.append(QualityIssue(                  type="incomplete_result",                  severity="medium",                  affected_task=result.task_id                ))
            # 检查结果一致性            if not self.is_result_consistent(result, context.previous_results):                issues.append(QualityIssue(                  type="inconsistent_result",                     severity="high",                  affected_task=result.task_id                ))
      return issues职责4: 反思与重规划

这是 Master 最复杂也最关键的能力。
class ReflectionEngine:    """    反思引擎 - Master 的高级认知能力    """
    def __init__(self):      self.reflection_llm = DeepSeekR1Model()      self.replanning_strategies = ReplanningStrategies()
    def reflect_and_replan(self, monitor_result: MonitorResult, original_query: str, current_plan: Optional) -> ReflectionResult:      """      基于监控结果进行反思并重新规划      """      # 1. 失败分析      failure_analysis = self.analyze_failures(monitor_result.failures)
      # 2. 反思决策      reflection_decision = self.make_reflection_decision(            failure_analysis, monitor_result      )
      # 3. 执行相应策略      if reflection_decision.action == "abort":            return self.create_abort_result(reflection_decision.reason)      elif reflection_decision.action == "retry":            return self.create_retry_result(reflection_decision.retry_strategy)      elif reflection_decision.action == "replan":            return self.execute_replanning(                original_query, current_plan, failure_analysis            )      else:            return self.create_continue_result()
    def analyze_failures(self, failures: List) -> FailureAnalysis:      """      深度失败分析      """      analysis_prompt = f"""      分析以下执行失败情况,判断根本原因和可能的解决方案:
      失败列表:      {json.dumps(, indent=2)}
      请分析:      1. 根本原因是什么?      2. 是否可以通过重试解决?      3. 是否需要调整计划?      4. 是否需要更换工具?
      返回JSON格式的分析结果。      """
      response = self.reflection_llm.generate(analysis_prompt)      return FailureAnalysis.from_json(response)
    def execute_replanning(self, original_query: str, current_plan: DAG,failure_analysis: FailureAnalysis) -> ReflectionResult:      """      执行重规划      """      replanning_prompt = f"""      原始查询: {original_query}
      当前计划:      {current_plan.to_json()}
      失败分析:      {failure_analysis.to_json()}
      请基于失败经验重新制定计划:      1. 保留成功的部分      2. 修复失败的部分          3. 考虑备选工具和策略      4. 确保新计划更鲁棒
      返回新的DAG计划。      """
      new_plan_response = self.reflection_llm.generate(replanning_prompt)      new_plan = DAG.from_json(new_plan_response)
      return ReflectionResult(            action="replan",            new_plan=new_plan,            reasoning=failure_analysis.recommended_changes      )Master 与其他 Agent 的协同机制

完整的协同流程实现

class MasterAgent:    """    Master Agent 的完整实现    """
    def __init__(self):      self.complexity_analyzer = ComplexityAnalyzer()      self.team_configurator = TeamConfigurator()      self.execution_monitor = ExecutionMonitor()      self.reflection_engine = ReflectionEngine()
      # 下游智能体实例      self.planner = PlannerAgent()      self.executor = ExecutorAgent()          self.writer = WriterAgent()
    async def process_query(self, query: str) -> SearchResult:      """      Master 的主要处理流程      """      # 阶段1: 复杂度分析和团队配置      complexity = self.complexity_analyzer.analyze_complexity(query)      team_config = self.team_configurator.configure_team(query, complexity)
      # 阶段2: 执行监控循环      max_attempts = 3      attempt = 0
      while attempt < max_attempts:            try:                # 根据配置执行相应策略                if team_config.strategy == "direct_answer":                  result = await self.execute_writer_only(query, team_config)                elif team_config.strategy == "single_step_retrieval":                  result = await self.execute_executor_inclusive(query, team_config)                  else:                  result = await self.execute_planner_enhanced(query, team_config)
                # 成功则返回结果                return result
            except ExecutionException as e:                # 阶段3: 失败处理和反思                monitor_result = MonitorResult(failures=)                reflection_result = self.reflection_engine.reflect_and_replan(                  monitor_result, query, getattr(e, 'current_plan', None)                )
                if reflection_result.action == "abort":                  return SearchResult.create_error(reflection_result.reasoning)                elif reflection_result.action == "replan":                  # 更新团队配置并重试                  team_config = self.update_team_config(                        team_config, reflection_result                  )                  attempt += 1                else:                  attempt += 1
      # 超过最大尝试次数      return SearchResult.create_error("Maximum retry attempts exceeded")
    async def execute_planner_enhanced(self, query: str, team_config: TeamConfig) -> SearchResult:      """      执行 Planner-Enhanced 配置的完整流程      """      # 1. Planner 生成计划      plan = await self.planner.create_plan(query, team_config.tools)
      # 2. 创建执行上下文      execution_context = ExecutionContext(            query=query,            plan=plan,            team_config=team_config,            start_time=time.time()      )
      # 3. 监控执行循环      while not execution_context.is_complete():            # Executor 执行下一批任务            batch_results = await self.executor.execute_batch(                execution_context.get_ready_tasks()            )
            # 更新执行上下文            execution_context.update_results(batch_results)
            # 实时监控            monitor_result = self.execution_monitor.monitor_execution(                team_config, execution_context            )
            # 检查是否需要干预            if monitor_result.needs_intervention():                reflection_result = self.reflection_engine.reflect_and_replan(                  monitor_result, query, plan                )
                if reflection_result.action == "replan":                  # 局部重规划                  plan = self.update_plan_with_reflection(                        plan, reflection_result                  )                  execution_context.update_plan(plan)
      # 4. Writer 生成最终答案      final_result = await self.writer.synthesize_answer(            query, execution_context.get_all_results()      )
      return final_result总结

通过以上设计,Master Agent 实现了:
智能决策: 准确的复杂度分析和动态资源配置实时监控: 高效的执行过程监控和问题检测自适应调整: 快速的反思决策和重规划能力协同指挥: 与 Planner、Executor、Writer 的无缝协作

预知 Planner Agent 如何工作,我们下回分解。

大家好,我是自在哪吒的创始人、首席服务官 Kafka。让我们一起进化吧。

页: [1]
查看完整版本: AI 搜索会是什么样?——百度 AI 搜索新范式(3)