Spark 架构分析

5 天前

SQL 解析

解析阶段将 SQL 文本转换为未解析的逻辑计划。Spark SQL 使用 ANTLR4 解析 SQL 语句,生成抽象语法树(AST)。具体的流程为:

  1. SqlBaseLexer 分词
  2. SqlBaseParser SQL 解析
  3. AstBuilder Ast 语法树构建

SqlBaseLexer 分词

SqlBaseLexer:SqlBaseLexer

SqlBaseLexer 对 SQL 文本执行词法分析,将其分解为诸如关键字、标识符、文本和运算符等标记。它是从 SqlBaseLexer.g4 中的语法定义生成的。

主要特点:

  1. 检测关键字和保留字
  2. 文本(字符串、数字等)的标记识别
  3. 特殊字符和运算符的处理
  4. 支持注释和空格
  5. 词法分析器包含用于处理边缘情况(如十进制文本)和检测未闭合括号注释的特殊方法。

SqlBaseParser SQL 解析器

SqlBaseParser 通过根据语法规则将标记组织到分层结构中来执行句法分析。它是根据 SqlBaseParser.g4 中的语法定义生成的。

主要特点:

  1. 解析 DML 语句(SELECT、INSERT、UPDATE、DELETE)
  2. 解析 DDL 语句(CREATE、ALTER、DROP)
  3. 支持表达式、联接类型、窗口函数等。
  4. 子查询和 CTE(公共表表达式)的处理
  5. 解析器生成表示 SQL 语句结构的解析树(抽象语法树或 AST)。

AstBuilder Ast 语法树构建

AstBuilder 是 ANTLR 解析树和 Spark 的逻辑计划表示之间的桥梁。它使用访问者模式遍历解析树并构建相应的逻辑计划。

重要方法:

  1. visitSingleStatement():解析单个 SQL 语句的入口点
  2. visitQuery():处理 SELECT 查询
  3. visitInsertInto():处理 INSERT 语句
  4. visitExpression():将 SQL 表达式转换为 Catalyst 表达式
  5. AstBuilder 会创建未解析的逻辑计划,其中对表、列和函数的引用尚未连接到实际数据或实现。

SQL 分析

Spark的分析阶段(Analysis Phase)是Spark SQL查询处理流程中的核心环节,负责将用户提交的未解析逻辑计划(Unresolved LogicalPlan)转换为已解析的逻辑计划(Analyzed LogicalPlan)。这一过程通过多批次规则(Batches of Rules)逐步解决逻辑计划中的未绑定元素(如表名、字段名、类型等),最终生成一个语义明确、可执行的逻辑计划。

未解析逻辑计划的来源

用户提交的SQL或DataFrame操作会经过解析器(Parser)生成语法树,此时树中的表、字段等信息尚未与真实数据源关联。例如:

  • 表名可能未与Catalog中的元数据匹配;
  • 字段的类型或存在性未被验证;
  • 函数名未绑定到实际实现方法

分析阶段的核心流程

分析阶段通过**规则引擎(RuleExecutor)**逐批应用规则,逐步“绑定”未解析元素。关键步骤如下:

(1)规则批次(Batches)的划分

Analyzer中预定义了多个规则批次,每个批次包含一组相关规则,按顺序执行。常见批次包括:

  • Hints处理:处理如/*+ BROADCASTJOIN */等优化提示。
  • 简单检查(Simple Sanity Check) :验证函数是否存在、语法合法性等。
  • 解析(Resolution) :核心批次,绑定表名、字段名、分配唯一ID等。
  • 后处理(Post-Hoc Resolution) :用户自定义的扩展规则 36 28。

(2)规则应用方式

  • 自顶向下或自底向上遍历:例如,ResolveRelations规则通过遍历语法树,将未解析的表名(如UnresolvedRelation)替换为Catalog中注册的实际数据源逻辑计划 14。
  • 模式匹配(Pattern Matching) :规则通过Scala的模式匹配机制匹配特定子树结构。例如,匹配Filter节点并下推过滤条件 23。
  • 迭代执行:某些规则需多次应用直至达到“固定点”(FixedPoint),即逻辑计划不再变化 28。

(3)关键解析规则举例

  • ResolveRelations:解析表名,检查Catalog是否存在对应表或视图,并替换为具体数据源的逻辑计划。
  • ResolveReferences:绑定字段名到具体列,验证字段是否存在及类型是否兼容。
  • ResolveFunctions:验证函数是否存在,并绑定到实际实现。
  • ImplicitTypeCasts:自动插入类型转换(如将String隐式转为Date

案例

假设 person 表已注册到 Spark 的 Catalog 中(例如通过 spark.sql("CREATE TABLE person (id INT, name STRING)"))然后以如下的 Sql 语句来说明

select id from person

通过分词以及解析后,得到了一个未解析的逻辑计划

== Unresolved Logical Plan ==
'Project ['id]
+- 'UnresolvedRelation [person]  // 未解析的表名
  • 表名未绑定UnresolvedRelation("person") 尚未关联到 Catalog 中的实际数据源。
  • 字段未验证'idUnresolvedAttribute,尚未绑定到具体表的列。

应用ResolveRelations 规则

ResolveRelations:解析表名,检查Catalog是否存在对应表或视图,并替换为具体数据源的逻辑计划。

规则目标:将 UnresolvedRelation 绑定到 Catalog 中注册的表或视图。

  1. Analyzer 遍历逻辑计划,发现 UnresolvedRelation("person")
  2. 查询 Catalog(如 Hive Metastore 或 Spark 内置 Catalog),发现 person 表存在,其 Schema 为 (id INT, name STRING)
  3. UnresolvedRelation 替换为具体的 LogicalRelation(表示数据源的逻辑结构)。

更新后的逻辑计划

== Analyzed Logical Plan (Partial) ==
'Project ['id]
+- SubqueryAlias person  // 绑定到具体表的逻辑计划
   +- LogicalRelation (id INT, name STRING)  // 数据源的逻辑表示
  • 表名已绑定person 表已被替换为 SubqueryAliasLogicalRelation
  • 字段仍处于未解析状态'id 仍然是 UnresolvedAttribute

应用ResolveReferences 规则

规则目标:将 UnresolvedAttribute 绑定到具体表的列。

  1. Analyzer 遍历逻辑计划,发现 'id 是未解析字段。
  2. 向上查找表的 Schema(从父节点 SubqueryAlias personLogicalRelation 中获取 Schema)。
  3. 检查 person 表的 Schema 是否包含 id 列:
  • 如果存在,将 'id 替换为 AttributeReference(包含唯一 ID 和数据类型)。
  • 如果不存在,抛出 AnalysisException(如 Column 'id' not found)。

更新后的逻辑计划

== Analyzed Logical Plan ==
Project [id#10]  // 绑定到具体列(id INT)
+- SubqueryAlias person
+- LogicalRelation (id#10 INT, name#11 STRING)

此时:

  • 字段已绑定id 对应到 LogicalRelation 中的 id#10AttributeReference)。
  • 分配唯一 IDid#10 是全局唯一的表达式 ID(ExprId),避免名称冲突。
  • 数据类型明确id 的类型确定为 INT

扩展

Spark 允许用户注入自定义规则。例如,定义一个规则 ResolveSpecialColumns,将 SELECT special_id FROM person 中的 special_id 映射到 id

object ResolveSpecialColumns extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case Project(projectList, child) =>
      Project(projectList.map {
        case UnresolvedAttribute("special_id") => child.output.find(_.name == "id").get
        case other => other
      }, child)
  }
}
 
// 注册自定义规则到 Analyzer
spark.sessionState.analyzer.addResolutionRule(ResolveSpecialColumns)

封装为 jar 包,注册到 Spark 中。

逻辑优化

当 Spark 分析完成后,生成了对应的已经分析后的逻辑计划,然后通过Catalyst 优化器对解析的逻辑计划进行优化,从而将计划转换为更高效的形式,即解析后的逻辑计划->优化后的逻辑计划,很多的优化方式都是 SQL 中常用的优化方式

Apache Spark的Catalyst优化器通过多种优化方法提升查询性能,结合规则优化(Rule-Based)和成本优化(Cost-Based)策略,其常用优化方法及具体实例如下:

谓词下推

将过滤条件尽可能下推到数据源附近,减少后续处理的数据量。例如,在读取Parquet或Hive表时,直接跳过不满足条件的行。

SELECT * FROM table WHERE age > 30 AND country = 'US'

优化后,Spark可能直接在数据源层面应用country = 'US'过滤,减少数据加载。

列裁剪(Column Pruning)

仅读取查询中涉及的列,忽略无关列以减少I/O和内存开销。

示例:查询SELECT name FROM employees时,即使表中包含agesalary等字段,Catalyst会跳过这些列的读取。

常量折叠(Constant Folding)

在编译阶段计算常量表达式,避免运行时重复计算。

示例SELECT salary * 0.1 * 12中的0.1 * 12会被提前计算为1.2,运行时直接计算salary * 1.2

操作符合并与简化

  • CombineFilters:合并连续的过滤条件。例如,Filter(age > 30, Filter(age < 50, child))合并为Filter(age > 30 AND age < 50, child),减少多次扫描。
  • CollapseRepartition:合并相邻的repartition操作,避免不必要的Shuffle 。
  • LikeSimplification:将LIKE 'abc%'转换为更高效的StartsWith("abc")

案例

SELECT 
  o.order_id, 
  u.name, 
  o.amount 
FROM 
  orders o 
JOIN 
  users u 
ON 
  o.user_id = u.user_id 
WHERE 
  u.country = 'US' 
  AND o.amount > 100 
  AND o.order_date >= '2024-01-01'

Spark 解析 SQL 后生成的原始逻辑计划如下

== Parsed Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Filter (country#25 = US AND amount#12 > 100 AND order_date#13 >= 2024-01-01)
   +- Join Inner (user_id#11 = user_id#23)
      :- SubqueryAlias o
      :  +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
      +- SubqueryAlias u
         +- Relation[user_id#23, name#24, age#25, country#26] parquet
  • 按照 SQL 的书写顺序生成计划:先执行 JOIN,然后执行 Filter,最后执行 Project
  • 所有过滤条件(country, amount, order_date)均位于 JOIN 之后,可能导致 JOIN 处理大量冗余数据。

Catalyst 应用优化规则后,逻辑计划变为:

== Optimized Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Join Inner (user_id#11 = user_id#23)
   :- Filter (amount#12 > 100 AND order_date#13 >= 2024-01-01)
   :  +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
   +- Filter (country#26 = US)
      +- Relation[user_id#23, name#24, age#25, country#26] parquet

以下通过一个具体的 SQL 查询案例,对比解析后的原始逻辑计划优化后的逻辑计划,详细说明 Catalyst 优化器在逻辑计划层面的优化过程。

案例背景

假设我们有两个表:

  • orders 表(字段:order_id, user_id, amount, order_date
  • users 表(字段:user_id, name, age, country

执行如下查询:

SELECT 
  o.order_id, 
  u.name, 
  o.amount 
FROM 
  orders o 
JOIN 
  users u 
ON 
  o.user_id = u.user_id 
WHERE 
  u.country = 'US' 
  AND o.amount > 100 
  AND o.order_date >= '2024-01-01'

1. 解析后的原始逻辑计划

Spark 解析 SQL 后生成的原始逻辑计划如下(简化表示):

== Parsed Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Filter (country#25 = US AND amount#12 > 100 AND order_date#13 >= 2024-01-01)
   +- Join Inner (user_id#11 = user_id#23)
      :- SubqueryAlias o
      :  +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
      +- SubqueryAlias u
         +- Relation[user_id#23, name#24, age#25, country#26] parquet

特点

  • 按照 SQL 的书写顺序生成计划:先执行 JOIN,然后执行 Filter,最后执行 Project
  • 所有过滤条件(country, amount, order_date)均位于 JOIN 之后,可能导致 JOIN 处理大量冗余数据。

2. 优化后的逻辑计划

Catalyst 应用优化规则后,逻辑计划变为:

== Optimized Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Join Inner (user_id#11 = user_id#23)
   :- Filter (amount#12 > 100 AND order_date#13 >= 2024-01-01)
   :  +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
   +- Filter (country#26 = US)
      +- Relation[user_id#23, name#24, age#25, country#26] parquet

关键优化步骤

  1. 谓词下推(Predicate Pushdown)
  • Filter (country = 'US') 下推到 users 表的数据源读取阶段,直接过滤掉非美国用户的数据。
  • Filter (amount > 100 AND order_date >= '2024-01-01') 下推到 orders 表的数据源读取阶段,减少参与 JOIN 的订单数据量。
  • 优化后,JOIN 操作仅处理过滤后的数据。
  1. 列裁剪(Column Pruning)
  • orders 表仅保留 order_id, user_id, amount, order_date(查询中引用的字段)。若表中还有其他字段(如 product_id),会被自动裁剪。
  • users 表仅保留 user_id, name, country,忽略 age 字段。
  1. 常量折叠(Constant Folding) 表达式 order_date >= '2024-01-01' 中的字符串 '2024-01-01' 会被转换为日期类型常量(如 2024-01-01 00:00:00),避免运行时重复解析。

物理算子

了解物理计划前,要先明白什么是物理算子。

  1. ((20250527205757-b6b6n9h '物理算子是什么?是某种框架的 API 吗?'))
  2. ((20250527205757-x2qz44i '为什么要使用物理算子,而不是直接使用优化后的逻辑算子'))】
  3. ((20250527205757-0zchep1 '物理算子有哪些?分别有什么样的作用呢?'))】
  4. 只有 SparkSQL 用到了物理算子吗?还是用 RDD 算子也使用到了物理算子呢?

物理算子是什么?是某种框架的 API 吗?

在 Spark 中,物理算子(Physical Operators) 是为最终执行设计的底层操作单元,负责将逻辑计划转换为具体的、可执行的步骤。它们并非某种语言的 API,而是 Spark 内部执行引擎的组成部分,直接对应到分布式计算任务的具体实现。

  • 用户不直接调用物理算子:用户通过 DataFrame、SQL 或 RDD 等高级 API 编写逻辑(例如 df.filter()SELECT * FROM table),由 Catalyst 优化器将这些逻辑转换为物理算子。
  • 内部实现:物理算子是 Spark 执行引擎的内部组件,例如 FileSourceScanExec 类是 Spark 源码中的一个 Scala 类,负责生成文件扫描的 RDD。
  • 查看物理计划:用户可以通过 df.explain(mode="formatted") 或 Spark UI 查看物理计划中的物理算子,但通常不需要直接操作它们。

为什么要使用物理算子,而不是直接使用优化后的逻辑算子

每个物理算子会生成对应的 RDD 操作链

物理算子树 → 转换为 RDD DAG → 分解为 Stage 和 Task → 在 Executor 上执行

物理算子有哪些?分别有什么样的作用呢?

以下是一些常见的物理算子及其作用:

物理算子作用
FileSourceScanExec从文件(Parquet、CSV 等)中读取数据
FilterExec对数据行按条件过滤
ProjectExec选择或计算特定列(类似 SELECT 语句)
SortMergeJoinExec基于排序的 Join 操作(大表 Join)
BroadcastHashJoinExec广播小表实现 Hash Join(小表 Join)
HashAggregateExec内存中的哈希聚合(如 GROUP BY)
SortExec对数据排序
ShuffleExchangeExec数据重分区(Shuffle)

代码生成

在 Spark 中,物理执行计划生成后,代码生成(Code Generation)是将物理算子树转换为可执行的字节码的关键步骤,这是通过 Tungsten 优化引擎 实现的。这一过程大幅减少了虚函数调用和中间数据结构的开销,显著提高了 CPU 效率。

1. 代码生成的核心机制

Spark 使用 Whole-Stage Code Generation(全阶段代码生成) 技术,将多个物理算子合并为一个 Java 方法,直接生成高度优化的代码。以下是具体流程:

步骤 1:物理算子的遍历

  • 物理执行计划树由多个物理算子(如 FilterExecProjectExecHashAggregateExec)组成。
  • Spark 遍历物理算子树,识别可以合并为单个代码生成阶段的连续算子(称为一个 "Stage" )。

步骤 2:生成代码片段

  • 对每个 Stage 内的算子,Spark 动态生成 Java 代码

    • 例如,Filter(condition)Project(expressions) 两个连续算子会被合并为一个循环:
for (Row row : inputRows) {
    if (row.getInt(0) > 30) {          // Filter 的 condition
        outputRow.setInt(0, row.getInt(0) * 2); // Project 的表达式
        emit(outputRow);
    }
}
  • 代码直接操作二进制数据(Tungsten 内存格式),跳过 Spark 内部的 Row 对象封装。

步骤 3:编译字节码

  • 生成的 Java 代码通过 Janino 编译器 编译为 JVM 字节码
  • 编译后的类通过反射加载到 JVM 中,最终生成一个实现了 RDD[InternalRow] 的计算逻辑的类。

步骤 4:执行生成的代码

  • 生成的代码直接操作 UnsafeRow(基于堆外内存的二进制格式),避免序列化/反序列化开销。
  • 最终将生成的代码逻辑绑定到 RDD 的 compute() 方法中,由 Executor 执行。所以只有 RDD 的动作算子才会真正的运行。

常用操作

显示所有阶段的执行计划

使用explain方法查看所有阶段,可以通过explain(extended=True)一次性查看所有阶段的计划

df = spark.sql("SELECT * FROM range(10) WHERE id > 5")
display(df)
print("=== Parsed Logical Plan ===")
df.explain(extended=True)
=== Parsed Logical Plan ===
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('id > 5)
  +- 'UnresolvedTableValuedFunction [range], [10], false
 
== Analyzed Logical Plan ==
id: bigint
Project [id#72430L]
+- Filter (id#72430L > cast(5 as bigint))
  +- Range (0, 10, step=1, splits=None)
 
== Optimized Logical Plan ==
Filter (id#72430L > 5)
  +- Range (0, 10, step=1, splits=None)
 
== Physical Plan ==
*(1) Filter (id#72430L > 5)
  +- *(1) Range (0, 10, step=1, splits=8)

问题与回答

为什么 RDD 的动作算子才会执行,而逻辑算子并不会执行?

在最终将生成的代码逻辑绑定到 RDD 的 compute() 方法中,由 Executor 执行。所以只有 RDD 的动作算子才会真正的运行。

通过物理计划生成代码后,将代码绑定到RDD 的 compute() 方法中,再由Executor 执行。所以只有 RDD 的动作算子才会真正的运行。

DAG 通过 Shuffle 进行划分 Stage,本质是什么样子的呢?

我们都知道 DAG Schedular 通过 Shuffle 将 DAG 划分为 Stage,那么本质上是如何实现的呢?

首先,我们需要知道Stage 的划分是在 DAG Scheduler 处理物理执行计划时完成的,属于任务调度阶段。 物理执行计划中有ShuffleExchangeExec 这个物理算子,所以很显然,通过物理执行计划中的ShuffleExchangeExec 算子,将 DAG 划分为 Stage。

  • Stage 分为两种类型:

│ └─ FileSourceScanExec └─ ShuffleExchangeExec (右表分区) └─ ProjectExec └─ FileSourceScanExec


- **Stage 划分**:

  - 每个 `ShuffleExchangeExec` 会生成一个独立的 **ShuffleMapStage**。
  - `SortMergeJoinExec` 和后续操作(如有)会生成 **ResultStage**。