Spark 架构分析
SQL 解析
解析阶段将 SQL 文本转换为未解析的逻辑计划。Spark SQL 使用 ANTLR4 解析 SQL 语句,生成抽象语法树(AST)。具体的流程为:
- SqlBaseLexer 分词
- SqlBaseParser SQL 解析
- AstBuilder Ast 语法树构建
SqlBaseLexer 分词
SqlBaseLexer:SqlBaseLexer
SqlBaseLexer 对 SQL 文本执行词法分析,将其分解为诸如关键字、标识符、文本和运算符等标记。它是从 SqlBaseLexer.g4 中的语法定义生成的。
主要特点:
- 检测关键字和保留字
- 文本(字符串、数字等)的标记识别
- 特殊字符和运算符的处理
- 支持注释和空格
- 词法分析器包含用于处理边缘情况(如十进制文本)和检测未闭合括号注释的特殊方法。
SqlBaseParser SQL 解析器
SqlBaseParser 通过根据语法规则将标记组织到分层结构中来执行句法分析。它是根据 SqlBaseParser.g4 中的语法定义生成的。
主要特点:
- 解析 DML 语句(SELECT、INSERT、UPDATE、DELETE)
- 解析 DDL 语句(CREATE、ALTER、DROP)
- 支持表达式、联接类型、窗口函数等。
- 子查询和 CTE(公共表表达式)的处理
- 解析器生成表示 SQL 语句结构的解析树(抽象语法树或 AST)。
AstBuilder Ast 语法树构建
AstBuilder 是 ANTLR 解析树和 Spark 的逻辑计划表示之间的桥梁。它使用访问者模式遍历解析树并构建相应的逻辑计划。
重要方法:
- visitSingleStatement():解析单个 SQL 语句的入口点
- visitQuery():处理 SELECT 查询
- visitInsertInto():处理 INSERT 语句
- visitExpression():将 SQL 表达式转换为 Catalyst 表达式
- AstBuilder 会创建未解析的逻辑计划,其中对表、列和函数的引用尚未连接到实际数据或实现。
SQL 分析
Spark的分析阶段(Analysis Phase)是Spark SQL查询处理流程中的核心环节,负责将用户提交的未解析逻辑计划(Unresolved LogicalPlan)转换为已解析的逻辑计划(Analyzed LogicalPlan)。这一过程通过多批次规则(Batches of Rules)逐步解决逻辑计划中的未绑定元素(如表名、字段名、类型等),最终生成一个语义明确、可执行的逻辑计划。
未解析逻辑计划的来源
用户提交的SQL或DataFrame操作会经过解析器(Parser)生成语法树,此时树中的表、字段等信息尚未与真实数据源关联。例如:
- 表名可能未与Catalog中的元数据匹配;
- 字段的类型或存在性未被验证;
- 函数名未绑定到实际实现方法
分析阶段的核心流程
分析阶段通过**规则引擎(RuleExecutor)**逐批应用规则,逐步“绑定”未解析元素。关键步骤如下:
(1)规则批次(Batches)的划分
Analyzer中预定义了多个规则批次,每个批次包含一组相关规则,按顺序执行。常见批次包括:
- Hints处理:处理如
/*+ BROADCASTJOIN */
等优化提示。 - 简单检查(Simple Sanity Check) :验证函数是否存在、语法合法性等。
- 解析(Resolution) :核心批次,绑定表名、字段名、分配唯一ID等。
- 后处理(Post-Hoc Resolution) :用户自定义的扩展规则 36 28。
(2)规则应用方式
- 自顶向下或自底向上遍历:例如,
ResolveRelations
规则通过遍历语法树,将未解析的表名(如UnresolvedRelation
)替换为Catalog中注册的实际数据源逻辑计划 14。 - 模式匹配(Pattern Matching) :规则通过Scala的模式匹配机制匹配特定子树结构。例如,匹配
Filter
节点并下推过滤条件 23。 - 迭代执行:某些规则需多次应用直至达到“固定点”(FixedPoint),即逻辑计划不再变化 28。
(3)关键解析规则举例
- ResolveRelations:解析表名,检查Catalog是否存在对应表或视图,并替换为具体数据源的逻辑计划。
- ResolveReferences:绑定字段名到具体列,验证字段是否存在及类型是否兼容。
- ResolveFunctions:验证函数是否存在,并绑定到实际实现。
- ImplicitTypeCasts:自动插入类型转换(如将
String
隐式转为Date
)
案例
假设 person
表已注册到 Spark 的 Catalog 中(例如通过 spark.sql("CREATE TABLE person (id INT, name STRING)")
)然后以如下的 Sql 语句来说明
select id from person
通过分词以及解析后,得到了一个未解析的逻辑计划
== Unresolved Logical Plan ==
'Project ['id]
+- 'UnresolvedRelation [person] // 未解析的表名
- 表名未绑定:
UnresolvedRelation("person")
尚未关联到 Catalog 中的实际数据源。 - 字段未验证:
'id
是UnresolvedAttribute
,尚未绑定到具体表的列。
应用ResolveRelations 规则
ResolveRelations:解析表名,检查Catalog是否存在对应表或视图,并替换为具体数据源的逻辑计划。
规则目标:将 UnresolvedRelation
绑定到 Catalog 中注册的表或视图。
- Analyzer 遍历逻辑计划,发现
UnresolvedRelation("person")
。 - 查询 Catalog(如 Hive Metastore 或 Spark 内置 Catalog),发现
person
表存在,其 Schema 为(id INT, name STRING)
。 - 将
UnresolvedRelation
替换为具体的LogicalRelation
(表示数据源的逻辑结构)。
更新后的逻辑计划:
== Analyzed Logical Plan (Partial) ==
'Project ['id]
+- SubqueryAlias person // 绑定到具体表的逻辑计划
+- LogicalRelation (id INT, name STRING) // 数据源的逻辑表示
- 表名已绑定:
person
表已被替换为SubqueryAlias
和LogicalRelation
。 - 字段仍处于未解析状态:
'id
仍然是UnresolvedAttribute
。
应用ResolveReferences
规则
规则目标:将 UnresolvedAttribute
绑定到具体表的列。
- Analyzer 遍历逻辑计划,发现
'id
是未解析字段。 - 向上查找表的 Schema(从父节点
SubqueryAlias person
的LogicalRelation
中获取 Schema)。 - 检查
person
表的 Schema 是否包含id
列:
- 如果存在,将
'id
替换为AttributeReference
(包含唯一 ID 和数据类型)。 - 如果不存在,抛出
AnalysisException
(如Column 'id' not found
)。
更新后的逻辑计划:
== Analyzed Logical Plan ==
Project [id#10] // 绑定到具体列(id INT)
+- SubqueryAlias person
+- LogicalRelation (id#10 INT, name#11 STRING)
此时:
- 字段已绑定:
id
对应到LogicalRelation
中的id#10
(AttributeReference
)。 - 分配唯一 ID:
id#10
是全局唯一的表达式 ID(ExprId
),避免名称冲突。 - 数据类型明确:
id
的类型确定为INT
。
扩展
Spark 允许用户注入自定义规则。例如,定义一个规则 ResolveSpecialColumns
,将 SELECT special_id FROM person
中的 special_id
映射到 id
:
object ResolveSpecialColumns extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case Project(projectList, child) =>
Project(projectList.map {
case UnresolvedAttribute("special_id") => child.output.find(_.name == "id").get
case other => other
}, child)
}
}
// 注册自定义规则到 Analyzer
spark.sessionState.analyzer.addResolutionRule(ResolveSpecialColumns)
封装为 jar 包,注册到 Spark 中。
逻辑优化
当 Spark 分析完成后,生成了对应的已经分析后的逻辑计划,然后通过Catalyst 优化器对解析的逻辑计划进行优化,从而将计划转换为更高效的形式,即解析后的逻辑计划->优化后的逻辑计划,很多的优化方式都是 SQL 中常用的优化方式。
Apache Spark的Catalyst优化器通过多种优化方法提升查询性能,结合规则优化(Rule-Based)和成本优化(Cost-Based)策略,其常用优化方法及具体实例如下:
谓词下推
将过滤条件尽可能下推到数据源附近,减少后续处理的数据量。例如,在读取Parquet或Hive表时,直接跳过不满足条件的行。
SELECT * FROM table WHERE age > 30 AND country = 'US'
优化后,Spark可能直接在数据源层面应用country = 'US'
过滤,减少数据加载。
列裁剪(Column Pruning)
仅读取查询中涉及的列,忽略无关列以减少I/O和内存开销。
示例:查询SELECT name FROM employees
时,即使表中包含age
或salary
等字段,Catalyst会跳过这些列的读取。
常量折叠(Constant Folding)
在编译阶段计算常量表达式,避免运行时重复计算。
示例:SELECT salary * 0.1 * 12
中的0.1 * 12
会被提前计算为1.2
,运行时直接计算salary * 1.2
。
操作符合并与简化
- CombineFilters:合并连续的过滤条件。例如,
Filter(age > 30, Filter(age < 50, child))
合并为Filter(age > 30 AND age < 50, child)
,减少多次扫描。 - CollapseRepartition:合并相邻的
repartition
操作,避免不必要的Shuffle 。 - LikeSimplification:将
LIKE 'abc%'
转换为更高效的StartsWith("abc")
。
案例
SELECT
o.order_id,
u.name,
o.amount
FROM
orders o
JOIN
users u
ON
o.user_id = u.user_id
WHERE
u.country = 'US'
AND o.amount > 100
AND o.order_date >= '2024-01-01'
Spark 解析 SQL 后生成的原始逻辑计划如下
== Parsed Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Filter (country#25 = US AND amount#12 > 100 AND order_date#13 >= 2024-01-01)
+- Join Inner (user_id#11 = user_id#23)
:- SubqueryAlias o
: +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
+- SubqueryAlias u
+- Relation[user_id#23, name#24, age#25, country#26] parquet
- 按照 SQL 的书写顺序生成计划:先执行
JOIN
,然后执行Filter
,最后执行Project
。 - 所有过滤条件(
country
,amount
,order_date
)均位于JOIN
之后,可能导致JOIN
处理大量冗余数据。
Catalyst 应用优化规则后,逻辑计划变为:
== Optimized Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Join Inner (user_id#11 = user_id#23)
:- Filter (amount#12 > 100 AND order_date#13 >= 2024-01-01)
: +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
+- Filter (country#26 = US)
+- Relation[user_id#23, name#24, age#25, country#26] parquet
以下通过一个具体的 SQL 查询案例,对比解析后的原始逻辑计划和优化后的逻辑计划,详细说明 Catalyst 优化器在逻辑计划层面的优化过程。
案例背景
假设我们有两个表:
orders
表(字段:order_id
,user_id
,amount
,order_date
)users
表(字段:user_id
,name
,age
,country
)
执行如下查询:
SELECT
o.order_id,
u.name,
o.amount
FROM
orders o
JOIN
users u
ON
o.user_id = u.user_id
WHERE
u.country = 'US'
AND o.amount > 100
AND o.order_date >= '2024-01-01'
1. 解析后的原始逻辑计划
Spark 解析 SQL 后生成的原始逻辑计划如下(简化表示):
== Parsed Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Filter (country#25 = US AND amount#12 > 100 AND order_date#13 >= 2024-01-01)
+- Join Inner (user_id#11 = user_id#23)
:- SubqueryAlias o
: +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
+- SubqueryAlias u
+- Relation[user_id#23, name#24, age#25, country#26] parquet
特点:
- 按照 SQL 的书写顺序生成计划:先执行
JOIN
,然后执行Filter
,最后执行Project
。 - 所有过滤条件(
country
,amount
,order_date
)均位于JOIN
之后,可能导致JOIN
处理大量冗余数据。
2. 优化后的逻辑计划
Catalyst 应用优化规则后,逻辑计划变为:
== Optimized Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Join Inner (user_id#11 = user_id#23)
:- Filter (amount#12 > 100 AND order_date#13 >= 2024-01-01)
: +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
+- Filter (country#26 = US)
+- Relation[user_id#23, name#24, age#25, country#26] parquet
关键优化步骤:
- 谓词下推(Predicate Pushdown)
- 将
Filter (country = 'US')
下推到users
表的数据源读取阶段,直接过滤掉非美国用户的数据。 - 将
Filter (amount > 100 AND order_date >= '2024-01-01')
下推到orders
表的数据源读取阶段,减少参与JOIN
的订单数据量。 - 优化后,
JOIN
操作仅处理过滤后的数据。
- 列裁剪(Column Pruning)
orders
表仅保留order_id
,user_id
,amount
,order_date
(查询中引用的字段)。若表中还有其他字段(如product_id
),会被自动裁剪。users
表仅保留user_id
,name
,country
,忽略age
字段。
- 常量折叠(Constant Folding) 表达式
order_date >= '2024-01-01'
中的字符串'2024-01-01'
会被转换为日期类型常量(如2024-01-01 00:00:00
),避免运行时重复解析。
物理算子
了解物理计划前,要先明白什么是物理算子。
- ((20250527205757-b6b6n9h '物理算子是什么?是某种框架的 API 吗?'))
- ((20250527205757-x2qz44i '为什么要使用物理算子,而不是直接使用优化后的逻辑算子'))】
- ((20250527205757-0zchep1 '物理算子有哪些?分别有什么样的作用呢?'))】
- 只有 SparkSQL 用到了物理算子吗?还是用 RDD 算子也使用到了物理算子呢?
物理算子是什么?是某种框架的 API 吗?
在 Spark 中,物理算子(Physical Operators) 是为最终执行设计的底层操作单元,负责将逻辑计划转换为具体的、可执行的步骤。它们并非某种语言的 API,而是 Spark 内部执行引擎的组成部分,直接对应到分布式计算任务的具体实现。
- 用户不直接调用物理算子:用户通过 DataFrame、SQL 或 RDD 等高级 API 编写逻辑(例如
df.filter()
或SELECT * FROM table
),由 Catalyst 优化器将这些逻辑转换为物理算子。 - 内部实现:物理算子是 Spark 执行引擎的内部组件,例如
FileSourceScanExec
类是 Spark 源码中的一个 Scala 类,负责生成文件扫描的 RDD。 - 查看物理计划:用户可以通过
df.explain(mode="formatted")
或 Spark UI 查看物理计划中的物理算子,但通常不需要直接操作它们。
为什么要使用物理算子,而不是直接使用优化后的逻辑算子
每个物理算子会生成对应的 RDD 操作链:
物理算子树 → 转换为 RDD DAG → 分解为 Stage 和 Task → 在 Executor 上执行
物理算子有哪些?分别有什么样的作用呢?
以下是一些常见的物理算子及其作用:
物理算子 | 作用 |
---|---|
FileSourceScanExec | 从文件(Parquet、CSV 等)中读取数据 |
FilterExec | 对数据行按条件过滤 |
ProjectExec | 选择或计算特定列(类似 SELECT 语句) |
SortMergeJoinExec | 基于排序的 Join 操作(大表 Join) |
BroadcastHashJoinExec | 广播小表实现 Hash Join(小表 Join) |
HashAggregateExec | 内存中的哈希聚合(如 GROUP BY) |
SortExec | 对数据排序 |
ShuffleExchangeExec | 数据重分区(Shuffle) |
代码生成
在 Spark 中,物理执行计划生成后,代码生成(Code Generation)是将物理算子树转换为可执行的字节码的关键步骤,这是通过 Tungsten 优化引擎 实现的。这一过程大幅减少了虚函数调用和中间数据结构的开销,显著提高了 CPU 效率。
1. 代码生成的核心机制
Spark 使用 Whole-Stage Code Generation(全阶段代码生成) 技术,将多个物理算子合并为一个 Java 方法,直接生成高度优化的代码。以下是具体流程:
步骤 1:物理算子的遍历
- 物理执行计划树由多个物理算子(如
FilterExec
、ProjectExec
、HashAggregateExec
)组成。 - Spark 遍历物理算子树,识别可以合并为单个代码生成阶段的连续算子(称为一个 "Stage" )。
步骤 2:生成代码片段
-
对每个 Stage 内的算子,Spark 动态生成 Java 代码:
- 例如,
Filter(condition)
和Project(expressions)
两个连续算子会被合并为一个循环:
- 例如,
for (Row row : inputRows) {
if (row.getInt(0) > 30) { // Filter 的 condition
outputRow.setInt(0, row.getInt(0) * 2); // Project 的表达式
emit(outputRow);
}
}
- 代码直接操作二进制数据(Tungsten 内存格式),跳过 Spark 内部的 Row 对象封装。
步骤 3:编译字节码
- 生成的 Java 代码通过 Janino 编译器 编译为 JVM 字节码。
- 编译后的类通过反射加载到 JVM 中,最终生成一个实现了
RDD[InternalRow]
的计算逻辑的类。
步骤 4:执行生成的代码
- 生成的代码直接操作
UnsafeRow
(基于堆外内存的二进制格式),避免序列化/反序列化开销。 - 最终将生成的代码逻辑绑定到 RDD 的
compute()
方法中,由 Executor 执行。所以只有 RDD 的动作算子才会真正的运行。
常用操作
显示所有阶段的执行计划
使用explain方法查看所有阶段,可以通过explain(extended=True)
一次性查看所有阶段的计划
df = spark.sql("SELECT * FROM range(10) WHERE id > 5")
display(df)
print("=== Parsed Logical Plan ===")
df.explain(extended=True)
=== Parsed Logical Plan ===
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('id > 5)
+- 'UnresolvedTableValuedFunction [range], [10], false
== Analyzed Logical Plan ==
id: bigint
Project [id#72430L]
+- Filter (id#72430L > cast(5 as bigint))
+- Range (0, 10, step=1, splits=None)
== Optimized Logical Plan ==
Filter (id#72430L > 5)
+- Range (0, 10, step=1, splits=None)
== Physical Plan ==
*(1) Filter (id#72430L > 5)
+- *(1) Range (0, 10, step=1, splits=8)
问题与回答
为什么 RDD 的动作算子才会执行,而逻辑算子并不会执行?
在最终将生成的代码逻辑绑定到 RDD 的 compute() 方法中,由 Executor 执行。所以只有 RDD 的动作算子才会真正的运行。
通过物理计划生成代码后,将代码绑定到RDD 的 compute()
方法中,再由Executor 执行。所以只有 RDD 的动作算子才会真正的运行。
DAG 通过 Shuffle 进行划分 Stage,本质是什么样子的呢?
我们都知道 DAG Schedular 通过 Shuffle 将 DAG 划分为 Stage,那么本质上是如何实现的呢?
首先,我们需要知道Stage 的划分是在 DAG Scheduler 处理物理执行计划时完成的,属于任务调度阶段。 物理执行计划中有ShuffleExchangeExec
这个物理算子,所以很显然,通过物理执行计划中的ShuffleExchangeExec
算子,将 DAG 划分为 Stage。
- Stage 分为两种类型:
│ └─ FileSourceScanExec └─ ShuffleExchangeExec (右表分区) └─ ProjectExec └─ FileSourceScanExec
- **Stage 划分**:
- 每个 `ShuffleExchangeExec` 会生成一个独立的 **ShuffleMapStage**。
- `SortMergeJoinExec` 和后续操作(如有)会生成 **ResultStage**。