Spark 架构分析
常青
2 个月前
2 个月前

SQL 解析

解析阶段将 SQL 文本转换为未解析的逻辑计划。Spark SQL 使用 ANTLR4 解析 SQL 语句,生成抽象语法树(AST)。具体的流程为:

  1. SqlBaseLexer 分词
  2. SqlBaseParser SQL 解析
  3. AstBuilder Ast 语法树构建

SqlBaseLexer 分词

SqlBaseLexer:SqlBaseLexer

SqlBaseLexer 对 SQL 文本执行词法分析,将其分解为诸如关键字、标识符、文本和运算符等标记。它是从 SqlBaseLexer.g4 中的语法定义生成的。

主要特点:

  1. 检测关键字和保留字
  2. 文本(字符串、数字等)的标记识别
  3. 特殊字符和运算符的处理
  4. 支持注释和空格
  5. 词法分析器包含用于处理边缘情况(如十进制文本)和检测未闭合括号注释的特殊方法。

SqlBaseParser SQL 解析器

SqlBaseParser 通过根据语法规则将标记组织到分层结构中来执行句法分析。它是根据 SqlBaseParser.g4 中的语法定义生成的。

主要特点:

  1. 解析 DML 语句(SELECT、INSERT、UPDATE、DELETE)
  2. 解析 DDL 语句(CREATE、ALTER、DROP)
  3. 支持表达式、联接类型、窗口函数等。
  4. 子查询和 CTE(公共表表达式)的处理
  5. 解析器生成表示 SQL 语句结构的解析树(抽象语法树或 AST)。

AstBuilder Ast 语法树构建

AstBuilder 是 ANTLR 解析树和 Spark 的逻辑计划表示之间的桥梁。它使用访问者模式遍历解析树并构建相应的逻辑计划。

重要方法:

  1. visitSingleStatement():解析单个 SQL 语句的入口点
  2. visitQuery():处理 SELECT 查询
  3. visitInsertInto():处理 INSERT 语句
  4. visitExpression():将 SQL 表达式转换为 Catalyst 表达式
  5. AstBuilder 会创建未解析的逻辑计划,其中对表、列和函数的引用尚未连接到实际数据或实现。

SQL 分析

Spark的分析阶段(Analysis Phase)是Spark SQL查询处理流程中的核心环节,负责将用户提交的未解析逻辑计划(Unresolved LogicalPlan)转换为已解析的逻辑计划(Analyzed LogicalPlan)。这一过程通过多批次规则(Batches of Rules)逐步解决逻辑计划中的未绑定元素(如表名、字段名、类型等),最终生成一个语义明确、可执行的逻辑计划。

未解析逻辑计划的来源

用户提交的SQL或DataFrame操作会经过解析器(Parser)生成语法树,此时树中的表、字段等信息尚未与真实数据源关联。例如:

  • 表名可能未与Catalog中的元数据匹配;
  • 字段的类型或存在性未被验证;
  • 函数名未绑定到实际实现方法

分析阶段的核心流程

分析阶段通过**规则引擎(RuleExecutor)**逐批应用规则,逐步“绑定”未解析元素。关键步骤如下:

(1)规则批次(Batches)的划分

Analyzer中预定义了多个规则批次,每个批次包含一组相关规则,按顺序执行。常见批次包括:

  • Hints处理:处理如/*+ BROADCASTJOIN */等优化提示。
  • 简单检查(Simple Sanity Check) :验证函数是否存在、语法合法性等。
  • 解析(Resolution) :核心批次,绑定表名、字段名、分配唯一ID等。
  • 后处理(Post-Hoc Resolution) :用户自定义的扩展规则 36 28。

(2)规则应用方式

  • 自顶向下或自底向上遍历:例如,ResolveRelations规则通过遍历语法树,将未解析的表名(如UnresolvedRelation)替换为Catalog中注册的实际数据源逻辑计划 14。
  • 模式匹配(Pattern Matching) :规则通过Scala的模式匹配机制匹配特定子树结构。例如,匹配Filter节点并下推过滤条件 23。
  • 迭代执行:某些规则需多次应用直至达到“固定点”(FixedPoint),即逻辑计划不再变化 28。

(3)关键解析规则举例

  • ResolveRelations:解析表名,检查Catalog是否存在对应表或视图,并替换为具体数据源的逻辑计划。
  • ResolveReferences:绑定字段名到具体列,验证字段是否存在及类型是否兼容。
  • ResolveFunctions:验证函数是否存在,并绑定到实际实现。
  • ImplicitTypeCasts:自动插入类型转换(如将String隐式转为Date

案例

假设 person 表已注册到 Spark 的 Catalog 中(例如通过 spark.sql("CREATE TABLE person (id INT, name STRING)"))然后以如下的 Sql 语句来说明

select id from person

通过分词以及解析后,得到了一个未解析的逻辑计划

== Unresolved Logical Plan ==
'Project ['id]
+- 'UnresolvedRelation [person]  // 未解析的表名
  • 表名未绑定UnresolvedRelation("person") 尚未关联到 Catalog 中的实际数据源。
  • 字段未验证'idUnresolvedAttribute,尚未绑定到具体表的列。

应用ResolveRelations 规则

ResolveRelations:解析表名,检查Catalog是否存在对应表或视图,并替换为具体数据源的逻辑计划。

规则目标:将 UnresolvedRelation 绑定到 Catalog 中注册的表或视图。

  1. Analyzer 遍历逻辑计划,发现 UnresolvedRelation("person")
  2. 查询 Catalog(如 Hive Metastore 或 Spark 内置 Catalog),发现 person 表存在,其 Schema 为 (id INT, name STRING)
  3. UnresolvedRelation 替换为具体的 LogicalRelation(表示数据源的逻辑结构)。

更新后的逻辑计划

== Analyzed Logical Plan (Partial) ==
'Project ['id]
+- SubqueryAlias person  // 绑定到具体表的逻辑计划
   +- LogicalRelation (id INT, name STRING)  // 数据源的逻辑表示
  • 表名已绑定person 表已被替换为 SubqueryAliasLogicalRelation
  • 字段仍处于未解析状态'id 仍然是 UnresolvedAttribute

应用ResolveReferences 规则

规则目标:将 UnresolvedAttribute 绑定到具体表的列。

  1. Analyzer 遍历逻辑计划,发现 'id 是未解析字段。
  2. 向上查找表的 Schema(从父节点 SubqueryAlias personLogicalRelation 中获取 Schema)。
  3. 检查 person 表的 Schema 是否包含 id 列:
  • 如果存在,将 'id 替换为 AttributeReference(包含唯一 ID 和数据类型)。
  • 如果不存在,抛出 AnalysisException(如 Column 'id' not found)。

更新后的逻辑计划

== Analyzed Logical Plan ==
Project [id#10]  // 绑定到具体列(id INT)
+- SubqueryAlias person
+- LogicalRelation (id#10 INT, name#11 STRING)

此时:

  • 字段已绑定id 对应到 LogicalRelation 中的 id#10AttributeReference)。
  • 分配唯一 IDid#10 是全局唯一的表达式 ID(ExprId),避免名称冲突。
  • 数据类型明确id 的类型确定为 INT

扩展

Spark 允许用户注入自定义规则。例如,定义一个规则 ResolveSpecialColumns,将 SELECT special_id FROM person 中的 special_id 映射到 id

object ResolveSpecialColumns extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case Project(projectList, child) =>
      Project(projectList.map {
        case UnresolvedAttribute("special_id") => child.output.find(_.name == "id").get
        case other => other
      }, child)
  }
}
 
// 注册自定义规则到 Analyzer
spark.sessionState.analyzer.addResolutionRule(ResolveSpecialColumns)

封装为 jar 包,注册到 Spark 中。

逻辑优化

Spark 分析完成后,生成了对应的已经分析后的逻辑计划,然后通过Catalyst 优化器对解析的逻辑计划进行优化,从而将计划转换为更高效的形式,即解析后的逻辑计划->优化后的逻辑计划,很多的优化方式都是 SQL 中常用的优化方式

Apache Spark的Catalyst优化器通过多种优化方法提升查询性能,结合规则优化(Rule-Based)和成本优化(Cost-Based)策略,其常用优化方法及具体实例如下:

谓词下推

将过滤条件尽可能下推到数据源附近,减少后续处理的数据量。例如,在读取Parquet或Hive表时,直接跳过不满足条件的行。

SELECT * FROM table WHERE age > 30 AND country = 'US'

优化后,Spark可能直接在数据源层面应用country = 'US'过滤,减少数据加载。

列裁剪(Column Pruning)

仅读取查询中涉及的列,忽略无关列以减少I/O和内存开销。

示例:查询SELECT name FROM employees时,即使表中包含agesalary等字段,Catalyst会跳过这些列的读取。

常量折叠(Constant Folding)

在编译阶段计算常量表达式,避免运行时重复计算。

示例SELECT salary * 0.1 * 12中的0.1 * 12会被提前计算为1.2,运行时直接计算salary * 1.2

操作符合并与简化

  • CombineFilters:合并连续的过滤条件。例如,Filter(age > 30, Filter(age < 50, child))合并为Filter(age > 30 AND age < 50, child),减少多次扫描。
  • CollapseRepartition:合并相邻的repartition操作,避免不必要的Shuffle 。
  • LikeSimplification:将LIKE 'abc%'转换为更高效的StartsWith("abc")

案例

SELECT 
  o.order_id, 
  u.name, 
  o.amount 
FROM 
  orders o 
JOIN 
  users u 
ON 
  o.user_id = u.user_id 
WHERE 
  u.country = 'US' 
  AND o.amount > 100 
  AND o.order_date >= '2024-01-01'

Spark 解析 SQL 后生成的原始逻辑计划如下

== Parsed Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Filter (country#25 = US AND amount#12 > 100 AND order_date#13 >= 2024-01-01)
   +- Join Inner (user_id#11 = user_id#23)
      :- SubqueryAlias o
      :  +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
      +- SubqueryAlias u
         +- Relation[user_id#23, name#24, age#25, country#26] parquet
  • 按照 SQL 的书写顺序生成计划:先执行 JOIN,然后执行 Filter,最后执行 Project
  • 所有过滤条件(country, amount, order_date)均位于 JOIN 之后,可能导致 JOIN 处理大量冗余数据。

Catalyst 应用优化规则后,逻辑计划变为:

== Optimized Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Join Inner (user_id#11 = user_id#23)
   :- Filter (amount#12 > 100 AND order_date#13 >= 2024-01-01)
   :  +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
   +- Filter (country#26 = US)
      +- Relation[user_id#23, name#24, age#25, country#26] parquet

以下通过一个具体的 SQL 查询案例,对比解析后的原始逻辑计划优化后的逻辑计划,详细说明 Catalyst 优化器在逻辑计划层面的优化过程。

案例背景

假设我们有两个表:

  • orders 表(字段:order_id, user_id, amount, order_date
  • users 表(字段:user_id, name, age, country

执行如下查询:

SELECT 
  o.order_id, 
  u.name, 
  o.amount 
FROM 
  orders o 
JOIN 
  users u 
ON 
  o.user_id = u.user_id 
WHERE 
  u.country = 'US' 
  AND o.amount > 100 
  AND o.order_date >= '2024-01-01'

1. 解析后的原始逻辑计划

Spark 解析 SQL 后生成的原始逻辑计划如下(简化表示):

== Parsed Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Filter (country#25 = US AND amount#12 > 100 AND order_date#13 >= 2024-01-01)
   +- Join Inner (user_id#11 = user_id#23)
      :- SubqueryAlias o
      :  +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
      +- SubqueryAlias u
         +- Relation[user_id#23, name#24, age#25, country#26] parquet

特点

  • 按照 SQL 的书写顺序生成计划:先执行 JOIN,然后执行 Filter,最后执行 Project
  • 所有过滤条件(country, amount, order_date)均位于 JOIN 之后,可能导致 JOIN 处理大量冗余数据。

2. 优化后的逻辑计划

Catalyst 应用优化规则后,逻辑计划变为:

== Optimized Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Join Inner (user_id#11 = user_id#23)
   :- Filter (amount#12 > 100 AND order_date#13 >= 2024-01-01)
   :  +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
   +- Filter (country#26 = US)
      +- Relation[user_id#23, name#24, age#25, country#26] parquet

关键优化步骤

  1. 谓词下推(Predicate Pushdown)
  • Filter (country = 'US') 下推到 users 表的数据源读取阶段,直接过滤掉非美国用户的数据。
  • Filter (amount > 100 AND order_date >= '2024-01-01') 下推到 orders 表的数据源读取阶段,减少参与 JOIN 的订单数据量。
  • 优化后,JOIN 操作仅处理过滤后的数据。
  1. 列裁剪(Column Pruning)
  • orders 表仅保留 order_id, user_id, amount, order_date(查询中引用的字段)。若表中还有其他字段(如 product_id),会被自动裁剪。
  • users 表仅保留 user_id, name, country,忽略 age 字段。
  1. 常量折叠(Constant Folding) 表达式 order_date >= '2024-01-01' 中的字符串 '2024-01-01' 会被转换为日期类型常量(如 2024-01-01 00:00:00),避免运行时重复解析。

物理算子

了解物理计划前,要先明白什么是物理算子。

  1. ((20250527205757-b6b6n9h '物理算子是什么?是某种框架的 API 吗?'))
  2. ((20250527205757-x2qz44i '为什么要使用物理算子,而不是直接使用优化后的逻辑算子'))】
  3. ((20250527205757-0zchep1 '物理算子有哪些?分别有什么样的作用呢?'))】
  4. 只有 SparkSQL 用到了物理算子吗?还是用 RDD 算子也使用到了物理算子呢?

物理算子是什么?是某种框架的 API 吗?

Spark 中,物理算子(Physical Operators) 是为最终执行设计的底层操作单元,负责将逻辑计划转换为具体的、可执行的步骤。它们并非某种语言的 API,而是 Spark 内部执行引擎的组成部分,直接对应到分布式计算任务的具体实现。

  • 用户不直接调用物理算子:用户通过 DataFrame、SQL 或 RDD 等高级 API 编写逻辑(例如 df.filter()SELECT * FROM table),由 Catalyst 优化器将这些逻辑转换为物理算子。
  • 内部实现:物理算子是 Spark 执行引擎的内部组件,例如 FileSourceScanExec 类是 Spark 源码中的一个 Scala 类,负责生成文件扫描的 RDD。
  • 查看物理计划:用户可以通过 df.explain(mode="formatted")Spark UI 查看物理计划中的物理算子,但通常不需要直接操作它们。

为什么要使用物理算子,而不是直接使用优化后的逻辑算子

每个物理算子会生成对应的 RDD 操作链

物理算子树 → 转换为 RDD DAG → 分解为 Stage 和 Task → 在 Executor 上执行

物理算子有哪些?分别有什么样的作用呢?

以下是一些常见的物理算子及其作用:

物理算子作用
FileSourceScanExec从文件(Parquet、CSV 等)中读取数据
FilterExec对数据行按条件过滤
ProjectExec选择或计算特定列(类似 SELECT 语句)
SortMergeJoinExec基于排序的 Join 操作(大表 Join)
BroadcastHashJoinExec广播小表实现 Hash Join(小表 Join)
HashAggregateExec内存中的哈希聚合(如 GROUP BY)
SortExec对数据排序
ShuffleExchangeExec数据重分区(Shuffle)

代码生成

Spark 中,物理执行计划生成后,代码生成(Code Generation)是将物理算子树转换为可执行的字节码的关键步骤,这是通过 Tungsten 优化引擎 实现的。这一过程大幅减少了虚函数调用和中间数据结构的开销,显著提高了 CPU 效率。

1. 代码生成的核心机制

Spark 使用 Whole-Stage Code Generation(全阶段代码生成) 技术,将多个物理算子合并为一个 Java 方法,直接生成高度优化的代码。以下是具体流程:

步骤 1:物理算子的遍历

  • 物理执行计划树由多个物理算子(如 FilterExecProjectExecHashAggregateExec)组成。
  • Spark 遍历物理算子树,识别可以合并为单个代码生成阶段的连续算子(称为一个 "Stage" )。

步骤 2:生成代码片段

  • 对每个 Stage 内的算子,Spark 动态生成 Java 代码

    • 例如,Filter(condition)Project(expressions) 两个连续算子会被合并为一个循环:
for (Row row : inputRows) {
    if (row.getInt(0) > 30) {          // Filter 的 condition
        outputRow.setInt(0, row.getInt(0) * 2); // Project 的表达式
        emit(outputRow);
    }
}
  • 代码直接操作二进制数据(Tungsten 内存格式),跳过 Spark 内部的 Row 对象封装。

步骤 3:编译字节码

  • 生成的 Java 代码通过 Janino 编译器 编译为 JVM 字节码
  • 编译后的类通过反射加载到 JVM 中,最终生成一个实现了 RDD[InternalRow] 的计算逻辑的类。

步骤 4:执行生成的代码

  • 生成的代码直接操作 UnsafeRow(基于堆外内存的二进制格式),避免序列化/反序列化开销。
  • 最终将生成的代码逻辑绑定到 RDD 的 compute() 方法中,由 Executor 执行。所以只有 RDD 的动作算子才会真正的运行。

常用操作

显示所有阶段的执行计划

使用explain方法查看所有阶段,可以通过explain(extended=True)一次性查看所有阶段的计划

df = spark.sql("SELECT * FROM range(10) WHERE id > 5")
display(df)
print("=== Parsed Logical Plan ===")
df.explain(extended=True)
=== Parsed Logical Plan ===
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('id > 5)
  +- 'UnresolvedTableValuedFunction [range], [10], false
 
== Analyzed Logical Plan ==
id: bigint
Project [id#72430L]
+- Filter (id#72430L > cast(5 as bigint))
  +- Range (0, 10, step=1, splits=None)
 
== Optimized Logical Plan ==
Filter (id#72430L > 5)
  +- Range (0, 10, step=1, splits=None)
 
== Physical Plan ==
*(1) Filter (id#72430L > 5)
  +- *(1) Range (0, 10, step=1, splits=8)

问题与回答

为什么 RDD 的动作算子才会执行,而逻辑算子并不会执行?

在最终将生成的代码逻辑绑定到 RDD 的 compute() 方法中,由 Executor 执行。所以只有 RDD 的动作算子才会真正的运行。

通过物理计划生成代码后,将代码绑定到RDD 的 compute() 方法中,再由Executor 执行。所以只有 RDD 的动作算子才会真正的运行。

DAG 通过 Shuffle 进行划分 Stage,本质是什么样子的呢?

我们都知道 DAG Schedular 通过 Shuffle 将 DAG 划分为 Stage,那么本质上是如何实现的呢?

首先,我们需要知道Stage 的划分是在 DAG Scheduler 处理物理执行计划时完成的,属于任务调度阶段。 物理执行计划中有ShuffleExchangeExec 这个物理算子,所以很显然,通过物理执行计划中的ShuffleExchangeExec 算子,将 DAG 划分为 Stage。

  • Stage 分为两种类型:

│ └─ FileSourceScanExec └─ ShuffleExchangeExec (右表分区) └─ ProjectExec └─ FileSourceScanExec


- **Stage 划分**:

  - 每个 `ShuffleExchangeExec` 会生成一个独立的 **ShuffleMapStage**。
  - `SortMergeJoinExec` 和后续操作(如有)会生成 **ResultStage**。
评论